Source code for node_resample

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.

import numpy as np
import scipy
import scipy.fftpack
import scipy.interpolate
import scipy.signal

from sympathy.api import node
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust

# User-facing interpolation option names mapped to the ``kind`` value that is
# handed to ``scipy.interpolate.interp1d``.  The insertion order is the order
# in which the options are listed in the node's method selector.
INTERP_METHODS = dict([
    ('nearest', 'nearest'),
    ('(interpolate) zero', 'zero'),
    ('(interpolate) linear', 'linear'),
    ('(interpolate) linear as spline', 'slinear'),
    ('(interpolate) quadratic', 'quadratic'),
    ('(interpolate) cubic', 'cubic'),
])

# Option name that selects Fourier resampling instead of interpolation.
FOURIER_METHOD = 'fourier'


def alg_fourier_resample(time_data, n2):
    """Resample ``time_data`` to ``n2`` samples with ``scipy.signal.resample``.

    The input is extended with a reversed copy of itself, the extended signal
    is resampled to ``2 * n2`` samples, and only the first ``n2`` samples of
    that result are returned.

    NOTE(review): the mirroring presumably exists to soften the wrap-around
    discontinuity of the periodic Fourier method -- confirm with the author.

    :param time_data: sequence of samples to resample.
    :param n2: number of samples in the returned array.
    :return: array holding the ``n2`` resampled values.
    """
    mirrored = np.r_[time_data, time_data[::-1]]
    resampled = scipy.signal.resample(mirrored, n2 * 2)
    return resampled[:n2]


def _scale_resample_attributes(attrs, resample_factor):
    """Return a copy of ``attrs`` adjusted for ``resample_factor``.

    * ``'resample factor'`` is multiplied by the factor, or created with the
      factor as its value when it is missing.
    * ``'sample rate'`` is multiplied by the factor when present.
    * ``'frequency'`` is divided by the factor when present.

    Values are rebound (``a = a * f``) rather than updated in place, so that
    array-like attribute values shared with the input are never modified.

    NOTE(review): the node description states that frequency attributes are
    *multiplied* by the resample rate, while this code *divides* them.  The
    historical behaviour is kept here -- confirm which one is intended.
    """
    scaled = dict(attrs)
    if 'resample factor' in scaled:
        scaled['resample factor'] = (
            scaled['resample factor'] * resample_factor)
    else:
        scaled['resample factor'] = resample_factor
    if 'sample rate' in scaled:
        scaled['sample rate'] = scaled['sample rate'] * resample_factor
    if 'frequency' in scaled:
        scaled['frequency'] = scaled['frequency'] / resample_factor
    return scaled


class ResampleTable(node.Node):
    """
    Resample every column of a table to a new number of rows.

    The number of output rows is the number of input rows multiplied by the
    configured resample factor, rounded to the nearest integer. The factor
    that is actually applied (output rows / input rows) is written to the
    ``resample factor`` attribute of the table and of every column.

    Columns selected as time signals are always resampled with linear
    interpolation. All other columns are resampled with the selected method:
    either Fourier resampling or one of the interpolation kinds.
    """

    name = 'Resample Table'
    author = 'Mathias Broxvall'
    icon = 'resample.svg'
    description = (
        'Performs interpolation or resampling of table data. Copies any table '
        'attributes from input to output and adds a new \n'
        'attribute with the resample factor.\n'
        '\nThe selected time signals are resampled linearly, while '
        'all other signals (unselected in the menu) with the selected '
        'method.\n'
        '\nIf the input table has sample '
        'rate or frequency attributes then they are multiplied by the '
        'resample rate'
    )
    nodeid = 'com.sympathyfordata.timeseriesanalysis.resample_table'
    tags = Tags(Tag.Analysis.SignalProcessing)

    inputs = Ports([
        Port.Table('Table to resample', name='value'),
    ])
    outputs = Ports([
        Port.Custom('table', 'Result table', name='out')
    ])

    time_editor = node.editors.multilist_editor(mode=False)
    parameters = node.parameters()
    parameters.set_float(
        'resample factor', label='Resample factor', value=1.0,
        description='Multiplicative factor for resampling, eg. value 2.0 '
                    'means that twice as many datapoints will be generated in '
                    'the output'
    )
    parameters.set_string(
        'method', label='Method', value=FOURIER_METHOD,
        description='Interpolation method used. The interpolation methods '
                    'should only be used for upsampling',
        editor=node.editors.combo_editor(
            options=[FOURIER_METHOD] + list(INTERP_METHODS.keys()))
    )
    parameters.set_list(
        'time signals', label='Time signal(s)', value=[],
        description='Time signals will always be resampled using linear '
                    'interpolation',
        editor=time_editor)

    def adjust_parameters(self, node_context):
        """Update the time signal selection from the connected input table."""
        adjust(node_context.parameters['time signals'],
               node_context.input['value'])

    def execute(self, node_context):
        """Resample all input columns and write them to the output table.

        :raises SyDataError: if the input table has no rows.
        :raises SyConfigurationError: if the resample factor results in less
            than one output row, or if a column that is not a time signal
            has to be resampled with an unknown method.
        """
        in_tbl = node_context.input['value']
        out_tbl = node_context.output['out']
        resample_factor = node_context.parameters['resample factor'].value
        method = node_context.parameters['method'].value
        time_signals = node_context.parameters['time signals'].value_names

        n1 = in_tbl.number_of_rows()
        if n1 == 0:
            raise SyDataError("Empty table")

        n2 = int(round(n1 * resample_factor))
        if n2 < 1:
            # Without this guard the effective factor below becomes zero (or
            # negative), which ends in a division by zero for 'frequency'
            # attributes or an obscure error from the resampling routines.
            raise SyConfigurationError(
                'Resample factor {} gives {} output rows for an input of {} '
                'rows, at least one output row is required.'
                .format(resample_factor, n2, n1))

        # Use the factor that is really applied after rounding to whole rows.
        resample_factor = n2 / float(n1)

        out_tbl.set_table_attributes(_scale_resample_attributes(
            in_tbl.get_table_attributes(), resample_factor))

        # The sample grids are the same for every interpolated column.
        old_positions = np.arange(n1)
        new_positions = np.arange(n2) / resample_factor

        for col in in_tbl.cols():
            # ``kind`` is the interp1d kind, or None for Fourier resampling.
            if col.name in time_signals:
                kind = 'linear'
            elif method == FOURIER_METHOD:
                kind = None
            elif method in INTERP_METHODS:
                kind = INTERP_METHODS[method]
            else:
                raise SyConfigurationError('Invalid resampling method: "{}"'
                                           .format(method))

            if kind is None:
                data = alg_fourier_resample(col.data, n2)
            else:
                fn = scipy.interpolate.interp1d(
                    old_positions, col.data, kind=kind,
                    fill_value='extrapolate', copy=False, assume_sorted=True)
                data = fn(new_positions)

            out_tbl.set_column_from_array(col.name, data)
            # The scaled attributes always contain 'resample factor', so
            # there is always something to write.
            out_tbl.set_column_attributes(
                col.name,
                _scale_resample_attributes(col.attrs, resample_factor))