Source code for node_frequency
# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import scipy.signal
from scipy import fft
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sylib.machinelearning.utility import table_to_array
_transform_options = {
'fourier': 'Discrete Fourier Transform (FFT)',
'DCT-I': 'Discrete Cosine Transform (type I)',
'DCT-II': 'Discrete Cosine Transform (type II)',
'DCT-III': 'Discrete Cosine Transform (type III)',
'DST-I': 'Discrete Sine Transform (type I)',
'DST-II': 'Discrete Sine Transform (type II)',
'DST-III': 'Discrete Sine Transform (type III)',
}
def _ifftfreq(freq):
"""
Attempt to recreate a time array from the frequencies from fftfreq.
The returned time array will always start at 0.
"""
if len(freq) <= 1:
return np.arange(0, len(freq))
# fftfreq documentation states that the last bin will be -1/(n*d).
d = -1/(freq[-1]*len(freq))
return np.linspace(0, (len(freq) - 1)*d, len(freq))
[docs]
class FrequencyTransform(node.Node):
"""
The Discrete Fourier Transform takes a signal in the time domain and
decomposes it into its constituent components in the frequency domain. This
is useful throughout many different fields including signal processing and
image processing. For more information, see
https://en.wikipedia.org/wiki/Discrete_Fourier_transform.
All different transforms in this node are variants of DFT, and they all use
the Fast Fourier Transform (FFT) algorithm under the hood.
Discrete Cosine Transform (DCT) assumes that the signal is symmetric at one
or two of its end points, with different assumptions being used for
different types of DCT. Conversely the Discrete Sine Transform (DST)
assumes that the signal is antisymmetric at one or two of its end points.
Two-dimensional transform
=========================
If "Use 2D transform" is checked the whole input table is treated as a
single two-dimensional array, and the output is likewise a single
two-dimensional array.
Inverse transform
=================
To inverse a transform you select that same transform and check "Inverse",
also making sure to keep the "Center" option the same as when the transform
was first applied.
"""
name = 'Frequency transform'
author = 'Mathias Broxvall'
icon = 'fourier.svg'
description = (
"Apply Discrete Fourier Transform to input, "
"decomposing it into frequency components."
)
nodeid = 'com.sympathyfordata.timeseriesanalysis.frequency_transform'
tags = Tags(Tag.Analysis.SignalProcessing)
related = [
'com.sympathyfordata.timeseriesanalysis.frequency_transform',
'com.sympathyfordata.timeseriesanalysis.spectral_transform',
'com.sympathyfordata.imageanalysis.frequency_transform_image',
]
parameters = node.parameters()
parameters.set_string(
'operation', label='Operation', value='fourier',
description='Transform to apply.',
editor=node.editors.combo_editor(options=_transform_options))
parameters.set_boolean(
'center', label='Center', value=True,
description=(
'If checked all frequencies in the output will be in increasing '
'order. If unchecked positive frequencies will come before '
'negative frequencies.'))
parameters.set_boolean(
'2d', label='Use 2D transform', value=False,
description=(
'If checked the whole input table is treated as a single two-'
'dimensional array. If unchecked each column is treated as a '
'separate one-dimensional signal.'))
parameters.set_boolean(
'inverse', label='Inverse transformation', value=False,
description=(
'Compute the inverse of the selected transform'))
parameters.set_string(
'time_col', label='Time column', value='',
description=(
'Time column is not transformed but is instead used to '
'calculate frequencies'),
editor=node.editors.combo_editor(edit=True))
inputs = Ports([Port.Table('time domain', name='time domain')])
outputs = Ports([Port.Table('frequency domain', name='frequency domain')])
controllers = node.controller(
when=node.field('2d', 'checked'),
action=node.field('time_col', 'disabled'))
def adjust_parameters(self, node_context):
adjust(node_context.parameters['time_col'],
node_context.input['time domain'])
def execute(self, node_context):
as_2d = node_context.parameters['2d'].value
inverse = node_context.parameters['inverse'].value
center = node_context.parameters['center'].value
op = node_context.parameters['operation'].value
time_tbl = node_context.input['time domain']
out_tbl = node_context.output['frequency domain']
time_col = node_context.parameters['time_col'].value
if op == 'fourier':
if as_2d:
trans = fft.ifft2 if inverse else fft.fft2
else:
trans = fft.ifft if inverse else fft.fft
type_ = None
elif op in ['DCT-I', 'DCT-II', 'DCT-III']:
trans = fft.idct if inverse else fft.dct
type_ = {'DCT-I': 1, 'DCT-II': 2, 'DCT-III': 3}[op]
elif op in ['DST-I', 'DST-II', 'DST-III']:
trans = fft.idst if inverse else fft.dst
type_ = {'DST-I': 1, 'DST-II': 2, 'DST-III': 3}[op]
else:
raise SyConfigurationError(
'Invalid value for parameter operation.')
if time_col and time_col not in time_tbl.column_names():
raise SyConfigurationError(
'Selected time column not found in data.')
if as_2d:
time = table_to_array(time_tbl)
if center and inverse:
time = fft.ifftshift(time)
if type_ is None:
freq = trans(time)
else:
# dct/dst transforms need to be made 2-d manually
freq = trans(trans(time.T, type=type_).T, type=type_)
if center and not inverse:
freq = fft.fftshift(freq)
for i, col in enumerate(time_tbl.cols()):
out_tbl.set_column_from_array(col.name, freq[:, i])
else:
for col in time_tbl.cols():
attrs = {}
name = col.name
time = col.data
if center and inverse:
time = fft.ifftshift(time)
if col.name == time_col:
if inverse:
freq = _ifftfreq(time)
name = col.attrs.get('time_name', name)
else:
freq = fft.fftfreq(
len(time), d=np.mean(np.diff(time)))
name = 'frequency'
attrs = {'time_name': col.name}
else:
if type_ is None:
freq = trans(time)
else:
freq = trans(time, type=type_)
if center and not inverse:
freq = fft.fftshift(freq)
out_tbl.set_column_from_array(name, freq, attributes=attrs)
[docs]
class SpectralTransform(node.Node):
"""
Spectral transforms are important when we want to analyze the frequency
content of non-stationary signals over time. For more information, see
https://en.wikipedia.org/wiki/Short-time_Fourier_transform
Methods
=======
Short-time Fourier Transform (STFT) is a fourier releated transform used to
determine the frequency content of local sections of time. More
specifically, STFT splits the whole signal to shorter segment of equal
length and computes the fourier transform on each segment.
Spectogram is a visual representation of non stationary signal frequency
content over time. It is the absolute square of the STFT
To invert the transform select the istft method, also making sure to pass
the output of stft. If "Add time column" is checked, it drops the first
column which is considered as time column before applying the istft
function. For more information about perfectly reconstructing a signal,
see:
https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.check_COLA.html
Window functions
================
For more information on the different window functions, see
`Window Function Catalog <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.get_window.html#scipy.signal.get_window>`_
"""
name = 'Spectral (short time fourier) transform'
author = 'Mathias Broxvall'
icon = 'fourier.svg'
description = (
'Computes a discrete short-time fourier transform on successive '
'segments of the input data. Each analyzed frequency becomes one '
'column and each segment becomes one row.')
nodeid = 'com.sympathyfordata.timeseriesanalysis.spectral_transform'
tags = Tags(Tag.Analysis.SignalProcessing)
related = [
'com.sympathyfordata.timeseriesanalysis.frequency_transform',
'com.sympathyfordata.timeseriesanalysis.spectral_transform',
'com.sympathyfordata.imageanalysis.frequency_transform_image',
'com.sympathyfordata.timeseriesanalysis.continous_wavelet_transform',
]
parameters = node.parameters()
parameters.set_string(
'column',
label='Select column',
value='',
description='Input data column to transform',
editor=node.editors.combo_editor())
parameters.set_string(
'method',
label='Method',
value='spectrogram',
description='Select among fourier methods:\n'
'\n "spectrogram" : spectral analysis (real valued),\n'
'\n "stft" : short-time fourier transform (complex),\n'
'\n "istft" : inverse short-time fourier transform '
'(complex to real)',
editor=node.editors.combo_editor(
options=['spectrogram', 'stft', 'istft']))
parameters.set_string(
'window',
label='Window',
value='boxcar',
description=(
'Window function applied to each segment, for details please '
'refer to the node documentation.\n'),
editor=node.editors.combo_editor(
options=['boxcar', 'triang', 'blackman', 'hamming', 'hann',
'bartlett', 'flattop', 'parzen', 'bohman',
'blackmanharris', 'nuttall', 'barthann']))
parameters.set_string(
'detrend',
label='Detrend',
value='none',
description='Performs detrending on each subset of the data before '
'FFT. \n'
'\n "constant" : the mean of the subset will be '
'removed. \n \n "linear" : a least square linear '
'expression is subtracted',
editor=node.editors.combo_editor(
options=['none', 'constant', 'linear']))
parameters.set_string(
'boundary',
label='Boundary',
value='none',
description='Specifies how to extend borders of the data so that the '
'first segment is centered on the first data point',
editor=node.editors.combo_editor(
options=['none', 'zeros', 'even', 'odd', 'constant']))
parameters.set_integer(
'nperseg',
label='Segment length',
value=256,
description='Length of each subset')
parameters.set_integer(
'overlap',
label='Overlap between segments',
value=0,
description='Number of points to overlap between segments. Use '
'segment length / 2 if transform should be invertible')
parameters.set_float(
'fs', label='Sample rate', value=1.0,
description='Number of samples corresponding to one second')
parameters.set_boolean(
'descname',
label='Descriptive names',
value=True,
description='Generates output column names containing the frequency. '
'Otherwise simple indices is used for each column')
parameters.set_boolean(
'segment_times',
label='Add time column',
value=True,
description='If checked, add a column with segment times. The time '
'column will be the first column of the output table.')
inputs = Ports([
Port.Table('time domain', name='time domain')
])
outputs = Ports([
Port.Table('frequency domain', name='frequency domain')
])
def adjust_parameters(self, node_context):
adjust(node_context.parameters['column'],
node_context.input['time domain'])
def execute(self, node_context):
column = node_context.parameters['column'].value
boundary = node_context.parameters['boundary'].value
descname = node_context.parameters['descname'].value
detrend = node_context.parameters['detrend'].value
fs = node_context.parameters['fs'].value
method = node_context.parameters['method'].value
nperseg = node_context.parameters['nperseg'].value
overlap = node_context.parameters['overlap'].value
window = node_context.parameters['window'].value
time_tbl = node_context.input['time domain']
out_tbl = node_context.output['frequency domain']
time_seg_col = node_context.parameters['segment_times'].value
if not column:
raise SyDataError("No column selected")
col = time_tbl.col(column)
if time_tbl.number_of_rows() == 0:
raise SyDataError("Empty table")
if method != 'istft':
time = col.data
if detrend == 'none':
detrend = False
if boundary == 'none':
boundary = None
kwargs = {}
kwargs['nperseg'] = nperseg
kwargs['fs'] = fs
kwargs['window'] = window
kwargs['noverlap'] = overlap
if method == 'istft':
if time_seg_col:
time = table_to_array(time_tbl[:, 1:]).T
else:
time = table_to_array(time_tbl).T
kwargs['boundary'] = boundary
t, res = scipy.signal.istft(time, **kwargs)
if '@' in col.name:
name = '@'.join(col.name.split('@')[:-1])
else:
name = col.name
if time_seg_col:
out_tbl.set_column_from_array('time', t)
out_tbl.set_column_from_array(name, res)
elif method == 'spectrogram':
kwargs['detrend'] = detrend
f, t, res = scipy.signal.spectrogram(time, **kwargs)
elif method == 'stft':
kwargs['detrend'] = detrend
kwargs['boundary'] = boundary
f, t, res = scipy.signal.stft(time, **kwargs)
else:
raise SyConfigurationError('Invalid method in node configuration')
if method != 'istft':
if time_seg_col:
out_tbl.set_column_from_array('time', t)
for freq in range(res.shape[0]):
if descname:
name = "{}@{} hz".format(col.name, f[freq])
else:
name = "{}@{}".format(col.name, freq)
out_tbl.set_column_from_array(name, res[freq, :])