# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
from sympathy.api import node
from sympathy.api.exceptions import SyDataError
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
[docs]
class DifferenceEquation(node.Node):
    """
    Performs a forward projection of the following linear difference equation
    system:

    yᵢ(t) = aᵢ₀ + ∑ aᵢⱼₖ yⱼ(t-k) + ∑ bᵢⱼₖ uⱼ(t-k)

    Where aᵢⱼₖ and bᵢⱼₖ are the coefficients giving the contribution
    to yᵢ based on the value of yⱼ or uⱼ at time point t-k, for
    k = 1 ... K.

    The system with v variables y_1 ... y_v and control signals u_1 ... u_r
    is described by v x ((v+r)*K + 1) coefficients, where
    K is the number of previous time samples considered by
    this system.

    This system takes as input v columns of (v+r)*K+1 rows as coefficients as
    follows (i and j are zero-based column indices, k = 1 ... K):

    aᵢ₀ = column[i][0],\n
    aᵢⱼₖ = column[i][j*K + k],\n
    bᵢⱼₖ = column[i][v*K + j*K + k]\n

    Given at least K rows of initial condition this node steps the
    linear system forwards in time and outputs all variables at their
    corresponding time points. The initial conditions table must have one
    column per system variable, in the same order as the coefficient columns.

    By default the system has no coefficients b and
    the node configuration parameter T, defines the length of the system's
    response.\n

    To use the node for a system with a control signal, right click the node
    and select
    Ports>Input>Create: Control Signal.
    In this case, the length of the control sequence defines also the length of
    the output.
    """

    name = 'Linear Difference Equation'
    author = 'Mathias Broxvall'
    icon = 'linear_difference_equations.svg'
    description = (
        'Performs a forward projection of a linear difference equation system')
    nodeid = ('com.sympathyfordata.timeseriesanalysis'
              '.project_linear_difference_equation')
    tags = Tags(Tag.Analysis.SignalProcessing)

    inputs = Ports([
        Port.Table('Initial conditions', name='init cond'),
        Port.Table('Coefficients', name='coefficients'),
        Port.Custom('table', 'Control signal', name='control signal',
                    n=(0, 1, 0)),
    ])
    outputs = Ports([
        Port.Table('Output', name='out'),
    ])

    parameters = node.parameters()
    parameters.set_integer(
        'T',
        label='T',
        value=10,
        description='Number of points generated in outputs including the '
                    'initial condition points')

    def execute(self, node_context):
        """
        Simulate the system and write every system variable to the output.

        The output has T rows: the initial conditions followed by the
        projected values. T is the node parameter, or the number of rows of
        the control signal when that optional port is connected. Column names
        and column attributes are taken from the initial conditions table.

        Raises SyDataError when an input table is empty, when the number of
        coefficient rows does not match the number of variables, when the
        initial conditions do not have one column per system variable, when
        fewer than K initial condition rows are given, or when T is negative.
        """
        init_cond_tbl = node_context.input['init cond']
        coeff_tbl = node_context.input['coefficients']
        ctrl_sig_tbls = node_context.input.group('control signal')
        out_tbl = node_context.output['out']
        T = node_context.parameters['T'].value

        n_init_rows = init_cond_tbl.number_of_rows()
        n_coeff_rows = coeff_tbl.number_of_rows()
        if n_init_rows == 0 or n_coeff_rows == 0:
            raise SyDataError("Empty table")

        Vu = 0  # Number of control-signals
        ctrl_sig_tbl = None
        if ctrl_sig_tbls:
            ctrl_sig_tbl = ctrl_sig_tbls[0]
            Vu = ctrl_sig_tbl.number_of_columns()
            # The control sequence dictates the length of the simulation.
            T = ctrl_sig_tbl.number_of_rows()

        # using same names for variables as in docstring
        V = len(coeff_tbl.cols())
        K, remainder = divmod(n_coeff_rows - 1, V + Vu)
        if remainder != 0:
            raise SyDataError(
                'Invalid number of coefficients, number of rows-1 must be a '
                'multiple of number of columns in coefficients table plus '
                'number of columns in control signal'
            )

        n_init_cols = init_cond_tbl.number_of_columns()
        if n_init_cols != V:
            raise SyDataError(
                'The initial conditions table must have one column per '
                'system variable ({} expected, {} given)'.format(
                    V, n_init_cols))
        if K > n_init_rows:
            raise SyDataError(
                'The initial conditions table must have at least as many '
                'rows as the history length K ({} required, {} given)'.format(
                    K, n_init_rows))
        if T < 0:
            raise SyDataError('T must not be negative')

        # First row holds the constant terms, the rest the a/b coefficients.
        bias = np.array([col.data[0] for col in coeff_tbl.cols()])
        coeff = np.column_stack([col.data[1:] for col in coeff_tbl.cols()])

        # Pre-allocate array with one row per time point, and
        # system variables + control signals as columns (in that order)
        data = np.zeros((T, V + Vu))
        init_values = np.column_stack(
            [col.data for col in init_cond_tbl.cols()])
        # Never seed more rows than the output holds (T may be shorter than
        # the initial conditions).
        n_seed = min(n_init_rows, T)
        data[:n_seed, :V] = init_values[:n_seed]
        if Vu > 0:
            data[:, V:] = np.column_stack(
                [col.data for col in ctrl_sig_tbl.cols()])

        for t in range(n_init_rows, T):
            # History used for generating time point t contains last K values
            # of system variables and control variables, most recent first.
            # After the transpose the flat index is j*K + (k-1), matching the
            # row order of the coefficients. t >= K is guaranteed above.
            hist = data[t - K:t][::-1].T.ravel()
            data[t, :V] = np.dot(hist, coeff) + bias

        for idx, col in enumerate(init_cond_tbl.cols()):
            out_tbl.set_column_from_array(col.name, data[:, idx])
            out_tbl.set_column_attributes(col.name, col.attrs)

        # Work on a copy so the input attributes are left alone, and store
        # the augmented copy so the added metadata reaches the output.
        attrs = dict(init_cond_tbl.get_table_attributes())
        attrs['history length'] = K
        attrs['number of system variables'] = V
        attrs['number of control signals'] = Vu
        out_tbl.set_table_attributes(attrs)
[docs]
class TimeSeriesToFeatures(node.Node):
    """
    Converts each time-series x_1 .. x_N of the input into a target signal
    and M lagged feature signals, all of length (N-M).

    For an input column called name, the target table gets a column name
    holding:\n
    x_{M+1}, ... x_N\n

    and the features table gets the columns name_1 .. name_M, where name_i
    is the signal delayed by i samples:\n
    name_1 = x_M, ... x_{N-1}\n
    name_2 = x_{M-1}, ... x_{N-2}\n
    ...\n
    name_M = x_1, ... x_{N-M}\n

    Each row of the features thus captures the last M values preceding the
    corresponding row of the target signal, most recent value first.
    """

    name = 'Time Series To Features'
    author = 'Mathias Broxvall'
    icon = 'timeseries_features.svg'
    description = (
        "Converts time-series of length N into a target signal 'f0' and M "
        "features signals 'f1 .. fM' of length (N-M)")
    nodeid = 'com.sympathyfordata.timeseriesanalysis.time_series_to_features'
    tags = Tags(Tag.Analysis.Features)

    inputs = Ports([
        Port.Table('Input signals', name='input'),
    ])
    outputs = Ports([
        Port.Table('Features', name='x'),
        Port.Table('Target signal', name='y'),
    ])

    parameters = node.parameters()
    parameters.set_integer(
        'M',
        label='History length',
        value=3,
        description='Number of points of history (M) captured at each time '
                    'point')

    def execute(self, node_context):
        """
        Write the target columns to output 'y' and the lagged feature
        columns to output 'x'.

        Raises SyDataError when the history length M is negative.
        """
        in_tbl = node_context.input['input']
        x_tbl = node_context.output['x']
        y_tbl = node_context.output['y']
        m = node_context.parameters['M'].value

        if m < 0:
            # A negative M would slice from the end of the series and emit a
            # target without any features.
            raise SyDataError('History length (M) must not be negative')

        for col in in_tbl.cols():
            series = col.data
            # Target: everything after the first m samples.
            y_tbl.set_column_from_array(col.name, series[m:])
            # Feature <name>_<lag> is the series shifted back by lag samples,
            # trimmed so that it lines up row by row with the target.
            for lag in range(1, m + 1):
                x_tbl.set_column_from_array(
                    '{}_{}'.format(col.name, lag), series[m - lag:-lag])