Source code for node_apca
# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
from sympathy.api import node
from sympathy.api.exceptions import SyNodeError, SyConfigurationError
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sylib_timeseries.utils import (
pad_pow2, haar, inverse_haar_flat, binary_search
)
class APCATransform(node.Node):
"""
The algorithm uses haar-transforms and heuristics for generating the
segments, meaning that a global optima is not guaranteed
"""
name = 'Adaptive Piecewise Constant Approximation (APCA)'
author = 'Mathias Broxvall'
icon = 'apca.svg'
description = (
'Applies the APCA algorithm to split the input time series '
'into a number of constant-valued pieces of varying length '
'while minimizing the mean-square error. It outputs a table '
        'containing indices for slices, with meta values '
        '(e.g. errors) as table attributes. The second output contains '
        'the approximated values, with per-column errors stored as '
        'column attributes.'
)
nodeid = 'com.sympathyfordata.timeseriesanalysis.apca_transform'
tags = Tags(Tag.Analysis.Features)
split_column_editor = node.editors.combo_editor()
time_column_editor = node.editors.multilist_editor(mode=False)
parameters = node.parameters()
parameters.set_list(
'split column',
label='Select master column',
        description='The column on which the APCA algorithm is run. All '
                    'other columns will be split using the same segments as '
                    'those generated for the master column.',
value=[],
editor=split_column_editor)
parameters.set_list(
'time column',
label='Select time column',
description='The time column is passed through without modification',
value=[],
editor=time_column_editor)
parameters.set_integer(
'n_segments',
label='Number of segments',
value=3,
description='Number of segments to generate')
parameters.set_float(
'max_error',
label='Max error',
value=0.0,
        description='If non-zero, increase the number of segments until the '
                    'error is below this value. Due to the heuristics used, '
                    'the error may overshoot this limit slightly.'
)
inputs = Ports([
Port.Table('input', name='input')
])
outputs = Ports([
Port.Table('output indices', name='output indices'),
Port.Table('output values', name='output values')
])
def adjust_parameters(self, node_context):
adjust(node_context.parameters['split column'],
node_context.input['input'])
adjust(node_context.parameters['time column'],
node_context.input['input'])
def execute(self, node_context):
in_tbl = node_context.input['input']
out_tbl = node_context.output['output indices']
out_val_tbl = node_context.output['output values']
n_segments = node_context.parameters['n_segments'].value
max_error = node_context.parameters['max_error'].value
if not node_context.parameters['split column'].value_names:
raise SyConfigurationError('You must configure a master column')
split_col = node_context.parameters['split column'].selected
time_cols = node_context.parameters['time column'].value_names
n = in_tbl.number_of_rows()
col = in_tbl.col(split_col)
data = col.data
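        # Pad the series to a power-of-two length and Haar-transform it.
        # From its use below, `haar` returns the coarsest remainder term plus
        # a list of detail-coefficient arrays, one per resolution level.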
padded_data = pad_pow2(data)
remainder, coefficients = haar(padded_data)
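        # Rank all detail coefficients by magnitude, down-weighting each
        # level by 2**level; argsort orders coefficient indices from least
        # to most important, ready for the pruning in calculate_segments.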
importance = [np.abs(coef)/np.power(2, level)
for level, coef in enumerate(coefficients[::-1])]
importance = np.concatenate(importance)
order = np.argsort(importance)
coefficients = np.concatenate(coefficients[::-1])
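        # Heuristic segmentation: keep only the most important coefficients,
        # reconstruct, take the value changes of that coarse reconstruction
        # as candidate boundaries, then greedily merge neighbouring segments
        # until at most `n_segments` remain.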
def calculate_segments(n_segments):
pruned_coefficients = np.array(coefficients)
pruned_coefficients[order[:-min(n_segments, n)]] = 0
new_data = inverse_haar_flat(
remainder, pruned_coefficients)[:len(data)]
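            # Candidate boundaries are wherever the pruned reconstruction
            # changes value; the kept Haar basis functions are constant on
            # power-of-two intervals, so boundaries need not coincide exactly
            # with the underlying value changes.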
splits = np.r_[0, (np.abs(new_data[1:] - new_data[:-1]) > 1e-5)
.nonzero()[0]+1, n]
segments = [(start, np.mean(data[start:end]))
for start, end in zip(splits[:-1], splits[1:])]
# Merge down the segments until we have sufficiently few segments
while len(segments) > n_segments:
best, best_err = None, None
for i in range(len(segments)-1):
start, mean0 = segments[i]
mid, mean1 = segments[i+1]
end, _ = segments[i+2] if i+2 < len(segments) else (
n, None)
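                    # Cost of merging segments i and i+1: the increase in
                    # summed squared error when both slices are replaced by
                    # the mean of the combined slice; the cheapest merge wins.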
mean = np.mean(data[start:end])
diff_err = (np.sum(np.square(data[start:end] - mean)) -
np.sum(np.square(data[start:mid] - mean0)) -
np.sum(np.square(data[mid:end] - mean1)))
if (best is None) or (diff_err < best_err):
best, best_err = i, diff_err
start, _ = segments[best]
end, _ = segments[best+1]
segments[best] = (start, np.mean(data[start:end]))
del segments[best+1]
return segments
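        # Mean squared error of the piecewise-constant approximation over
        # the full series, given a list of (start, mean) segments.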
def eval_segments_errors(segments):
err = 0
for ((s, mean), (e, _)) in zip(segments, segments[1:]+[(n, None)]):
err += np.sum(np.square(data[s:e]-mean))
return err / n
def test_segments_errors(n_segments):
segments = calculate_segments(n_segments)
return eval_segments_errors(segments) < max_error
if max_error == 0:
segments = calculate_segments(n_segments)
else: # Determine number of segments needed by binary search
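            # binary_search from sylib_timeseries.utils is used to bisect
            # over the segment count, seeded with the configured value, until
            # the predicate reports an error below max_error; the returned
            # count is then increased by one for the final segmentation.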
try:
n_segments, iters = binary_search(
n_segments, test_segments_errors)
except IndexError as exc:
raise SyNodeError(
"Failed to minimize error sufficiently"
) from exc
segments = calculate_segments(n_segments+1)
        # Segment start indices, with the total row count appended as the
        # final stop index.
segs = [seg for seg, _ in segments]+[n]
# Output generated segments
        out_tbl.set_column_from_array('start', np.array(segs[:-1]))
        out_tbl.set_column_from_array('end', np.array(segs[1:]))
tbl_attrs = {
'master column': col.name,
'master error': float(eval_segments_errors(segments)),
}
sum_error = 0
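        # Approximate every non-time column using the master column's
        # segments: time columns pass through unchanged, all other columns
        # are replaced segment-wise by their own per-segment means, with the
        # per-column MSE stored as a column attribute.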
for col in in_tbl.cols():
if col.name in time_cols:
out_val_tbl.set_column_from_array(
col.name, col.data, attributes=col.attrs)
continue
approx_values = np.concatenate([
np.full(e-s, np.mean(col.data[s:e]))
                for s, e in zip(segs[:-1], segs[1:])
])
this_error = np.sum(np.square(col.data - approx_values)) / n
col_attrs = dict(col.attrs)
col_attrs['error'] = this_error
sum_error += this_error
out_val_tbl.set_column_from_array(
col.name, approx_values, attributes=col_attrs)
out_tbl.set_column_from_array(
col.name,
np.array([np.mean(col.data[s:e])
for s, e in zip(segs[:-1], segs[1:])]))
tbl_attrs['sum error'] = sum_error
out_tbl.set_table_attributes(tbl_attrs)
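

# The block below is a minimal, hypothetical sketch (not part of the node)
# showing the core split-detection step that execute() performs, assuming
# the sylib_timeseries.utils helpers behave exactly as they are used above:
# haar() returning (remainder, per-level coefficient arrays) and
# inverse_haar_flat() reconstructing from the flattened coefficients.
if __name__ == '__main__':
    data = np.array([1.0, 1.0, 1.0, 5.0, 5.0, 5.0, 2.0, 2.0])
    remainder, coefficients = haar(pad_pow2(data))
    # Rank coefficients as in execute(), then zero all but the two most
    # important ones.
    importance = np.concatenate(
        [np.abs(coef) / np.power(2, level)
         for level, coef in enumerate(coefficients[::-1])])
    flat = np.concatenate(coefficients[::-1])
    flat[np.argsort(importance)[:-2]] = 0
    approx = inverse_haar_flat(remainder, flat)[:len(data)]
    # Candidate boundaries are where the pruned reconstruction changes value.
    splits = np.r_[0, (np.abs(approx[1:] - approx[:-1]) > 1e-5)
                   .nonzero()[0] + 1, len(data)]
    print(list(zip(splits[:-1], splits[1:])))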