Source code for node_frequency

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import scipy.signal
from scipy import fft

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sylib.machinelearning.utility import table_to_array


_transform_options = {
    'fourier': 'Discrete Fourier Transform (FFT)',
    'DCT-I': 'Discrete Cosine Transform (type I)',
    'DCT-II': 'Discrete Cosine Transform (type II)',
    'DCT-III': 'Discrete Cosine Transform (type III)',
    'DST-I': 'Discrete Sine Transform (type I)',
    'DST-II': 'Discrete Sine Transform (type II)',
    'DST-III': 'Discrete Sine Transform (type III)',
}


def _ifftfreq(freq):
    """
    Attempt to recreate a time array from the frequencies from fftfreq.
    The returned time array will always start at 0.
    """
    if len(freq) <= 1:
        return np.arange(0, len(freq))

    # fftfreq documentation states that the last bin will be -1/(n*d).
    d = -1/(freq[-1]*len(freq))
    return np.linspace(0, (len(freq) - 1)*d, len(freq))


[docs] class FrequencyTransform(node.Node): """ The Discrete Fourier Transform takes a signal in the time domain and decomposes it into its constituent components in the frequency domain. This is useful throughout many different fields including signal processing and image processing. For more information, see https://en.wikipedia.org/wiki/Discrete_Fourier_transform. All different transforms in this node are variants of DFT, and they all use the Fast Fourier Transform (FFT) algorithm under the hood. Discrete Cosine Transform (DCT) assumes that the signal is symmetric at one or two of its end points, with different assumptions being used for different types of DCT. Conversely the Discrete Sine Transform (DST) assumes that the signal is antisymmetric at one or two of its end points. Two-dimensional transform ========================= If "Use 2D transform" is checked the whole input table is treated as a single two-dimensional array, and the output is likewise a single two-dimensional array. Inverse transform ================= To inverse a transform you select that same transform and check "Inverse", also making sure to keep the "Center" option the same as when the transform was first applied. """ name = 'Frequency transform' author = 'Mathias Broxvall' icon = 'fourier.svg' description = ( "Apply Discrete Fourier Transform to input, " "decomposing it into frequency components." ) nodeid = 'com.sympathyfordata.timeseriesanalysis.frequency_transform' tags = Tags(Tag.Analysis.SignalProcessing) related = [ 'com.sympathyfordata.timeseriesanalysis.frequency_transform', 'com.sympathyfordata.timeseriesanalysis.spectral_transform', 'com.sympathyfordata.imageanalysis.frequency_transform_image', ] parameters = node.parameters() parameters.set_string( 'operation', label='Operation', value='fourier', description='Transform to apply.', editor=node.editors.combo_editor(options=_transform_options)) parameters.set_boolean( 'center', label='Center', value=True, description=( 'If checked all frequencies in the output will be in increasing ' 'order. If unchecked positive frequencies will come before ' 'negative frequencies.')) parameters.set_boolean( '2d', label='Use 2D transform', value=False, description=( 'If checked the whole input table is treated as a single two-' 'dimensional array. If unchecked each column is treated as a ' 'separate one-dimensional signal.')) parameters.set_boolean( 'inverse', label='Inverse transformation', value=False, description=( 'Compute the inverse of the selected transform')) parameters.set_string( 'time_col', label='Time column', value='', description=( 'Time column is not transformed but is instead used to ' 'calculate frequencies'), editor=node.editors.combo_editor(edit=True)) inputs = Ports([Port.Table('time domain', name='time domain')]) outputs = Ports([Port.Table('frequency domain', name='frequency domain')]) controllers = node.controller( when=node.field('2d', 'checked'), action=node.field('time_col', 'disabled')) def adjust_parameters(self, node_context): adjust(node_context.parameters['time_col'], node_context.input['time domain']) def execute(self, node_context): as_2d = node_context.parameters['2d'].value inverse = node_context.parameters['inverse'].value center = node_context.parameters['center'].value op = node_context.parameters['operation'].value time_tbl = node_context.input['time domain'] out_tbl = node_context.output['frequency domain'] time_col = node_context.parameters['time_col'].value if op == 'fourier': if as_2d: trans = fft.ifft2 if inverse else fft.fft2 else: trans = fft.ifft if inverse else fft.fft type_ = None elif op in ['DCT-I', 'DCT-II', 'DCT-III']: trans = fft.idct if inverse else fft.dct type_ = {'DCT-I': 1, 'DCT-II': 2, 'DCT-III': 3}[op] elif op in ['DST-I', 'DST-II', 'DST-III']: trans = fft.idst if inverse else fft.dst type_ = {'DST-I': 1, 'DST-II': 2, 'DST-III': 3}[op] else: raise SyConfigurationError( 'Invalid value for parameter operation.') if time_col and time_col not in time_tbl.column_names(): raise SyConfigurationError( 'Selected time column not found in data.') if as_2d: time = table_to_array(time_tbl) if center and inverse: time = fft.ifftshift(time) if type_ is None: freq = trans(time) else: # dct/dst transforms need to be made 2-d manually freq = trans(trans(time.T, type=type_).T, type=type_) if center and not inverse: freq = fft.fftshift(freq) for i, col in enumerate(time_tbl.cols()): out_tbl.set_column_from_array(col.name, freq[:, i]) else: for col in time_tbl.cols(): attrs = {} name = col.name time = col.data if center and inverse: time = fft.ifftshift(time) if col.name == time_col: if inverse: freq = _ifftfreq(time) name = col.attrs.get('time_name', name) else: freq = fft.fftfreq( len(time), d=np.mean(np.diff(time))) name = 'frequency' attrs = {'time_name': col.name} else: if type_ is None: freq = trans(time) else: freq = trans(time, type=type_) if center and not inverse: freq = fft.fftshift(freq) out_tbl.set_column_from_array(name, freq, attributes=attrs)
[docs] class SpectralTransform(node.Node): """ Spectral transforms are important when we want to analyze the frequency content of non-stationary signals over time. For more information, see https://en.wikipedia.org/wiki/Short-time_Fourier_transform Methods ======= Short-time Fourier Transform (STFT) is a fourier releated transform used to determine the frequency content of local sections of time. More specifically, STFT splits the whole signal to shorter segment of equal length and computes the fourier transform on each segment. Spectogram is a visual representation of non stationary signal frequency content over time. It is the absolute square of the STFT To invert the transform select the istft method, also making sure to pass the output of stft. If "Add time column" is checked, it drops the first column which is considered as time column before applying the istft function. For more information about perfectly reconstructing a signal, see: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.check_COLA.html Window functions ================ For more information on the different window functions, see `Window Function Catalog <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.get_window.html#scipy.signal.get_window>`_ """ name = 'Spectral (short time fourier) transform' author = 'Mathias Broxvall' icon = 'fourier.svg' description = ( 'Computes a discrete short-time fourier transform on successive ' 'segments of the input data. Each analyzed frequency becomes one ' 'column and each segment becomes one row.') nodeid = 'com.sympathyfordata.timeseriesanalysis.spectral_transform' tags = Tags(Tag.Analysis.SignalProcessing) related = [ 'com.sympathyfordata.timeseriesanalysis.frequency_transform', 'com.sympathyfordata.timeseriesanalysis.spectral_transform', 'com.sympathyfordata.imageanalysis.frequency_transform_image', 'com.sympathyfordata.timeseriesanalysis.continous_wavelet_transform', ] parameters = node.parameters() parameters.set_string( 'column', label='Select column', value='', description='Input data column to transform', editor=node.editors.combo_editor()) parameters.set_string( 'method', label='Method', value='spectrogram', description='Select among fourier methods:\n' '\n "spectrogram" : spectral analysis (real valued),\n' '\n "stft" : short-time fourier transform (complex),\n' '\n "istft" : inverse short-time fourier transform ' '(complex to real)', editor=node.editors.combo_editor( options=['spectrogram', 'stft', 'istft'])) parameters.set_string( 'window', label='Window', value='boxcar', description=( 'Window function applied to each segment, for details please ' 'refer to the node documentation.\n'), editor=node.editors.combo_editor( options=['boxcar', 'triang', 'blackman', 'hamming', 'hann', 'bartlett', 'flattop', 'parzen', 'bohman', 'blackmanharris', 'nuttall', 'barthann'])) parameters.set_string( 'detrend', label='Detrend', value='none', description='Performs detrending on each subset of the data before ' 'FFT. \n' '\n "constant" : the mean of the subset will be ' 'removed. \n \n "linear" : a least square linear ' 'expression is subtracted', editor=node.editors.combo_editor( options=['none', 'constant', 'linear'])) parameters.set_string( 'boundary', label='Boundary', value='none', description='Specifies how to extend borders of the data so that the ' 'first segment is centered on the first data point', editor=node.editors.combo_editor( options=['none', 'zeros', 'even', 'odd', 'constant'])) parameters.set_integer( 'nperseg', label='Segment length', value=256, description='Length of each subset') parameters.set_integer( 'overlap', label='Overlap between segments', value=0, description='Number of points to overlap between segments. Use ' 'segment length / 2 if transform should be invertible') parameters.set_float( 'fs', label='Sample rate', value=1.0, description='Number of samples corresponding to one second') parameters.set_boolean( 'descname', label='Descriptive names', value=True, description='Generates output column names containing the frequency. ' 'Otherwise simple indices is used for each column') parameters.set_boolean( 'segment_times', label='Add time column', value=True, description='If checked, add a column with segment times. The time ' 'column will be the first column of the output table.') inputs = Ports([ Port.Table('time domain', name='time domain') ]) outputs = Ports([ Port.Table('frequency domain', name='frequency domain') ]) def adjust_parameters(self, node_context): adjust(node_context.parameters['column'], node_context.input['time domain']) def execute(self, node_context): column = node_context.parameters['column'].value boundary = node_context.parameters['boundary'].value descname = node_context.parameters['descname'].value detrend = node_context.parameters['detrend'].value fs = node_context.parameters['fs'].value method = node_context.parameters['method'].value nperseg = node_context.parameters['nperseg'].value overlap = node_context.parameters['overlap'].value window = node_context.parameters['window'].value time_tbl = node_context.input['time domain'] out_tbl = node_context.output['frequency domain'] time_seg_col = node_context.parameters['segment_times'].value if not column: raise SyDataError("No column selected") col = time_tbl.col(column) if time_tbl.number_of_rows() == 0: raise SyDataError("Empty table") if method != 'istft': time = col.data if detrend == 'none': detrend = False if boundary == 'none': boundary = None kwargs = {} kwargs['nperseg'] = nperseg kwargs['fs'] = fs kwargs['window'] = window kwargs['noverlap'] = overlap if method == 'istft': if time_seg_col: time = table_to_array(time_tbl[:, 1:]).T else: time = table_to_array(time_tbl).T kwargs['boundary'] = boundary t, res = scipy.signal.istft(time, **kwargs) if '@' in col.name: name = '@'.join(col.name.split('@')[:-1]) else: name = col.name if time_seg_col: out_tbl.set_column_from_array('time', t) out_tbl.set_column_from_array(name, res) elif method == 'spectrogram': kwargs['detrend'] = detrend f, t, res = scipy.signal.spectrogram(time, **kwargs) elif method == 'stft': kwargs['detrend'] = detrend kwargs['boundary'] = boundary f, t, res = scipy.signal.stft(time, **kwargs) else: raise SyConfigurationError('Invalid method in node configuration') if method != 'istft': if time_seg_col: out_tbl.set_column_from_array('time', t) for freq in range(res.shape[0]): if descname: name = "{}@{} hz".format(col.name, f[freq]) else: name = "{}@{}".format(col.name, freq) out_tbl.set_column_from_array(name, res[freq, :])