Source code for node_apca

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.

import numpy as np

from sympathy.api import node
from sympathy.api.exceptions import SyNodeError, SyConfigurationError
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust

from sylib_timeseries.utils import (
    pad_pow2, haar, inverse_haar_flat, binary_search
)


class APCATransform(node.Node):
    """
    The algorithm uses Haar transforms and heuristics for generating the
    segments, meaning that a global optimum is not guaranteed.
    """

    name = 'Adaptive Piecewise Constant Approximation (APCA)'
    author = 'Mathias Broxvall'
    icon = 'apca.svg'
    description = (
        'Applies the APCA algorithm to split the input time series '
        'into a number of constant-valued pieces of varying length '
        'while minimizing the mean-square error. It outputs a table '
        'containing indices for slices with meta values '
        '(e.g. errors) as table attributes. The second output contains '
        'approximated values and column attributes with per-column errors.'
    )
    nodeid = 'com.sympathyfordata.timeseriesanalysis.apca_transform'
    tags = Tags(Tag.Analysis.Features)

    split_column_editor = node.editors.combo_editor()
    time_column_editor = node.editors.multilist_editor(mode=False)

    parameters = node.parameters()
    parameters.set_list(
        'split column', label='Select master column',
        description='The column on which the APCA algorithm is run; all '
                    'other columns will be split using the same segments '
                    'as those generated for the master column',
        value=[], editor=split_column_editor)
    parameters.set_list(
        'time column', label='Select time column',
        description='The time column is passed through without modification',
        value=[], editor=time_column_editor)
    parameters.set_integer(
        'n_segments', label='Number of segments', value=3,
        description='Number of segments to generate')
    parameters.set_float(
        'max_error', label='Max error', value=0.0,
        description='If non-zero, increase the number of segments until the '
                    'error is less than this. Due to the heuristics used, '
                    'the error may overshoot slightly')

    inputs = Ports([
        Port.Table('input', name='input')
    ])
    outputs = Ports([
        Port.Table('output indices', name='output indices'),
        Port.Table('output values', name='output values')
    ])

    def adjust_parameters(self, node_context):
        adjust(node_context.parameters['split column'],
               node_context.input['input'])
        adjust(node_context.parameters['time column'],
               node_context.input['input'])

    def execute(self, node_context):
        in_tbl = node_context.input['input']
        out_tbl = node_context.output['output indices']
        out_val_tbl = node_context.output['output values']
        n_segments = node_context.parameters['n_segments'].value
        max_error = node_context.parameters['max_error'].value

        if not node_context.parameters['split column'].value_names:
            raise SyConfigurationError('You must configure a master column')
        split_col = node_context.parameters['split column'].selected
        time_cols = node_context.parameters['time column'].value_names

        n = in_tbl.number_of_rows()
        col = in_tbl.col(split_col)
        data = col.data

        # Haar-transform the (power-of-two padded) master column and rank
        # the detail coefficients, weighting coarser levels higher.
        padded_data = pad_pow2(data)
        remainder, coefficients = haar(padded_data)
        importance = [np.abs(coef)/np.power(2, level)
                      for level, coef in enumerate(coefficients[::-1])]
        importance = np.concatenate(importance)
        order = np.argsort(importance)
        coefficients = np.concatenate(coefficients[::-1])

        def calculate_segments(n_segments):
            # Keep only the most important coefficients and reconstruct a
            # piecewise constant signal; each level change becomes a split.
            pruned_coefficients = np.array(coefficients)
            pruned_coefficients[order[:-min(n_segments, n)]] = 0
            new_data = inverse_haar_flat(
                remainder, pruned_coefficients)[:len(data)]
            splits = np.r_[0,
                           (np.abs(new_data[1:] - new_data[:-1]) > 1e-5)
                           .nonzero()[0]+1,
                           n]
            segments = [(start, np.mean(data[start:end]))
                        for start, end in zip(splits[:-1], splits[1:])]

            # Merge down the segments until we have sufficiently few segments
            while len(segments) > n_segments:
                best, best_err = None, None
                for i in range(len(segments)-1):
                    start, mean0 = segments[i]
                    mid, mean1 = segments[i+1]
                    end, _ = segments[i+2] if i+2 < len(segments) else (
                        n, None)
                    mean = np.mean(data[start:end])
                    diff_err = (np.sum(np.square(data[start:end] - mean))
                                - np.sum(np.square(data[start:mid] - mean0))
                                - np.sum(np.square(data[mid:end] - mean1)))
                    if (best is None) or (diff_err < best_err):
                        best, best_err = i, diff_err
                start, _ = segments[best]
                end, _ = segments[best+1]
                segments[best] = (start, np.mean(data[start:end]))
                del segments[best+1]
            return segments

        def eval_segments_errors(segments):
            err = 0
            for ((s, mean), (e, _)) in zip(segments,
                                           segments[1:]+[(n, None)]):
                err += np.sum(np.square(data[s:e]-mean))
            return err / n

        def test_segments_errors(n_segments):
            segments = calculate_segments(n_segments)
            return eval_segments_errors(segments) < max_error

        if max_error == 0:
            segments = calculate_segments(n_segments)
        else:
            # Determine the number of segments needed by binary search
            try:
                n_segments, iters = binary_search(
                    n_segments, test_segments_errors)
            except IndexError as exc:
                raise SyNodeError(
                    'Failed to minimize error sufficiently') from exc
            segments = calculate_segments(n_segments+1)

        # List of segment start/stop indices
        segs = [seg for seg, _ in segments]+[n]

        # Output generated segments
        out_tbl.set_column_from_array('start', np.array(segs[:-1]))
        out_tbl.set_column_from_array('end', np.array(segs[1:]))
        tbl_attrs = {
            'master column': col.name,
            'master error': float(eval_segments_errors(segments)),
        }

        sum_error = 0
        for col in in_tbl.cols():
            if col.name in time_cols:
                out_val_tbl.set_column_from_array(
                    col.name, col.data, attributes=col.attrs)
                continue
            approx_values = np.concatenate([
                np.full(e-s, np.mean(col.data[s:e]))
                for s, e in zip(segs[:-1], segs[1:])
            ])
            this_error = np.sum(np.square(col.data - approx_values)) / n
            col_attrs = dict(col.attrs)
            col_attrs['error'] = this_error
            sum_error += this_error
            out_val_tbl.set_column_from_array(
                col.name, approx_values, attributes=col_attrs)
            out_tbl.set_column_from_array(
                col.name, np.array([np.mean(col.data[s:e])
                                    for s, e in zip(segs[:-1], segs[1:])]))
        tbl_attrs['sum error'] = sum_error
        out_tbl.set_table_attributes(tbl_attrs)
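

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the node above): the coefficient-pruning
# idea behind ``calculate_segments``, in minimal self-contained form. The
# real ``pad_pow2``/``haar``/``inverse_haar_flat`` helpers live in
# ``sylib_timeseries.utils`` and are not shown in this file, so ``toy_haar``
# and ``toy_inverse_haar`` below are hypothetical stand-ins with their own,
# simpler conventions (unnormalized transform, coarsest detail level first).

def toy_haar(x):
    """Decompose ``x`` (length a power of two) into Haar detail levels.

    Returns the overall mean and a list of detail arrays, coarsest first.
    """
    x = np.asarray(x, dtype=float)
    levels = []
    while len(x) > 1:
        levels.append((x[0::2] - x[1::2]) / 2)  # details at this scale
        x = (x[0::2] + x[1::2]) / 2             # running pairwise averages
    return x[0], levels[::-1]


def toy_inverse_haar(mean, levels):
    """Reconstruct a signal from the mean and coarsest-first details."""
    x = np.array([mean])
    for detail in levels:
        up = np.empty(2*len(x))
        up[0::2] = x + detail
        up[1::2] = x - detail
        x = up
    return x


if __name__ == '__main__':
    # Zero all but the largest detail coefficient of a noisy step signal;
    # the level changes that survive in the reconstruction mark candidate
    # segment boundaries, much as ``calculate_segments`` does above.
    rng = np.random.default_rng(0)
    signal = np.r_[np.zeros(8), np.ones(8)] + rng.normal(0, 0.01, 16)
    mean, levels = toy_haar(signal)
    flat = np.concatenate(levels)
    keep = 1
    flat[np.argsort(np.abs(flat))[:-keep]] = 0
    sizes = np.cumsum([len(lvl) for lvl in levels])[:-1]
    approx = toy_inverse_haar(mean, np.split(flat, sizes))
    print('splits:', (np.abs(np.diff(approx)) > 1e-5).nonzero()[0] + 1)
    # -> splits: [8]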
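

# ---------------------------------------------------------------------------
# Hedged sketch of the ``max_error`` search: the ``binary_search`` helper
# imported from ``sylib_timeseries.utils`` is not shown in this file, so
# ``find_min_passing`` below is a hypothetical equivalent. Assuming the
# predicate is monotone (more segments never increase the error), it doubles
# the segment count until the error test passes and then bisects for the
# smallest passing count.

def find_min_passing(start, passes, limit=2**20):
    """Return the smallest value >= ``start`` for which ``passes`` holds.

    ``start`` must be positive; raises IndexError if nothing below
    ``limit`` passes (mirroring the IndexError handled in ``execute``).
    """
    lo, hi = start, start
    while not passes(hi):         # exponential phase: bracket a passing value
        lo, hi = hi, hi*2
        if hi > limit:
            raise IndexError('no passing value below limit')
    while lo + 1 < hi:            # bisection phase: tighten the bracket
        mid = (lo + hi)//2
        if passes(mid):
            hi = mid
        else:
            lo = mid
    return hi

# Example: find_min_passing(3, lambda k: k >= 17) returns 17.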