Source code for node_difference

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np

from sympathy.api import node
from sympathy.api.exceptions import SyDataError
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags


[docs] class DifferenceEquation(node.Node): """ Performs a forward projection of the following linear difference equation system: yᵢ(t) = aᵢ₀ + ∑ aᵢⱼₖ yⱼ(t-k) + ∑ bᵢⱼₖ uⱼ(t-k) Where aᵢⱼₖ and bᵢⱼₖ are the coefficients giving the contribution to yᵢ based on the value of yⱼ or uⱼ at time point t-k. The system with v variables y_1 ... y_v and control signals u_1 ... u_r is described by (K+1) x (v+r) coefficients, where K is the number of previous times samples considered by this system. This system takes as input v columns of (v+r)*K+1 rows as coefficients as follows: aᵢ₀ = column[i][0],\n aᵢⱼₖ = column[i][j*K + k],\n bᵢⱼₖ = column[i][V*K + j*K + k]\n Given at least K rows of initial condition this node steps the linear system forwards in time and outputs all variables at their corresponding time points. By default the system has no coefficients b and the node configuration parameter T, defines the length of the system's response.\n To use the node for a system with a control signal, right click the node and select Ports>Input>Create: Control Signal. In this case, the length of the control sequence defines also the length of the output. """ name = 'Linear Difference Equation' author = 'Mathias Broxvall' icon = 'linear_difference_equations.svg' description = ( 'Performs a forward projection of a linear difference equation system') nodeid = ('com.sympathyfordata.timeseriesanalysis' '.project_linear_difference_equation') tags = Tags(Tag.Analysis.SignalProcessing) inputs = Ports([ Port.Table('Initial conditions', name='init cond'), Port.Table('Coefficients', name='coefficients'), Port.Custom('table', 'Control signal', name='control signal', n=(0, 1, 0)), ]) outputs = Ports([ Port.Table('Output', name='out'), ]) parameters = node.parameters() parameters.set_integer( 'T', label='T', value=10, description='Number of points generated in outputs including the ' 'initial condition points') def execute(self, node_context): init_cond_tbl = node_context.input['init cond'] coeff_tbl = node_context.input['coefficients'] ctrl_sig_tbls = node_context.input.group('control signal') out_tbl = node_context.output['out'] T = node_context.parameters['T'].value Vu = 0 # Number of control-signals if (init_cond_tbl.number_of_rows() == 0 or coeff_tbl.number_of_rows() == 0): raise SyDataError("Empty table") if ctrl_sig_tbls: ctrl_sig_tbl = ctrl_sig_tbls[0] Vu = ctrl_sig_tbl.number_of_columns() T = ctrl_sig_tbl.number_of_rows() # using same names for variables as in docstring V = len(coeff_tbl.cols()) if (coeff_tbl.number_of_rows()-1) % (V+Vu) != 0: raise SyDataError( 'Invalid number of coefficients, number of rows-1 must be a ' 'multiple of number of columns in coefficients table plus ' 'number of columns in control signal' ) K = int((coeff_tbl.number_of_rows()-1) / (V+Vu)) bias = np.array([col.data[0] for col in coeff_tbl.cols()]) coeff = np.column_stack([col.data[1:] for col in coeff_tbl.cols()]) if K > init_cond_tbl.number_of_rows(): raise SyDataError( 'Number of coefficients must be at least one more ' 'than the number of initial conditions') # Pre-allocate array with one row per time point, and # system variables + control signals as columns (in that order) data = np.zeros((T, V+Vu)) data[:init_cond_tbl.number_of_rows(), :init_cond_tbl.number_of_columns()] = ( np.column_stack([col.data for col in init_cond_tbl.cols()])) if Vu > 0: data[:, init_cond_tbl.number_of_columns():] = ( np.column_stack([col.data for col in ctrl_sig_tbl.cols()]) ) for t in range(init_cond_tbl.number_of_rows(), T): # History used for generating time point t contains last K values # of system variables and control variables if t-K-1 >= 0: hist = data[t-1:t-K-1:-1, :].T.ravel() else: hist = data[t-1::-1, :].T.ravel() val = np.array( [np.dot(hist, coeff[:, i]) for i in range(V)]) + bias data[t, :V] = val for idx, col in enumerate(init_cond_tbl.cols()): out_tbl.set_column_from_array(col.name, data[:, idx]) out_tbl.set_column_attributes(col.name, col.attrs) attrs = init_cond_tbl.get_table_attributes() attrs['history length'] = K attrs['number of system variables'] = V attrs['number of control signals'] = Vu out_tbl.set_table_attributes(init_cond_tbl.get_table_attributes())
[docs] class TimeSeriesToFeatures(node.Node): """ Converts time-series of length N into a target signal 'f0' and M features signals 'f1 .. fM' of length (N-M) as follows:\n f0 = x_{M+1} .. x_{n}\n f1 = x_1, ... x_{n-m}\n f2 = x_2, ... x_{n-m+1}\n ...\n fM = x_M, ... x_{n-1}\n The features 'f1 .. fM' thus capture the last M values of the signal f0. """ name = 'Time Series To Features' author = 'Mathias Broxvall' icon = 'timeseries_features.svg' description = ( "Converts time-series of length N into a target signal 'f0' and M " "features signals 'f1 .. fM' of length (N-M)") nodeid = 'com.sympathyfordata.timeseriesanalysis.time_series_to_features' tags = Tags(Tag.Analysis.Features) inputs = Ports([ Port.Table('Input signals', name='input'), ]) outputs = Ports([ Port.Table('Features', name='x'), Port.Table('Target signal', name='y'), ]) parameters = node.parameters() parameters.set_integer( 'M', label='History length', value=3, description='Number of points of history (M) captured at each time ' 'point') def execute(self, node_context): in_tbl = node_context.input['input'] x_tbl = node_context.output['x'] y_tbl = node_context.output['y'] m = node_context.parameters['M'].value # k = in_tbl.number_of_rows() - m for col in in_tbl.cols(): y_tbl.set_column_from_array(col.name, col.data[m:]) for col in in_tbl.cols(): for i in range(m): data = col.data[m - i - 1: -i - 1] x_tbl.set_column_from_array( '{}_{}'.format(col.name, i + 1), data)