Source code for node_paa_sax

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.

# """
# Implements the PAA (Piecewise Aggregate Approximation) and SAX (Symbolic
# Aggregation Approximation) algorithms.
# """
import math

import numpy as np
import scipy
import scipy.fftpack
import scipy.signal
import scipy.stats

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyConfigurationError

from sylib_timeseries.utils import aggregate_fns


class PiecewiseAggregationAnalysis(node.Node):
    """Reduce every column of a table to one aggregated value per bin (PAA).

    Each input column is treated as one time series.  The rows are divided
    into bins according to the 'bins', 'fixed' and 'inclusion' parameters and
    the selected aggregate function is applied to the samples of each bin.

    Output depends on the boolean parameters:

    * 'bin_numbers' set: a single column 'bin' with one entry per input row
      holding the number of the bin that row belongs to (not available in
      'overlapping' mode).
    * otherwise: one aggregated column per input column, optionally preceded
      by 'start'/'stop' index columns when 'indices' is set.
    """

    name = 'Piecewise Aggregation Analysis (PAA)'
    author = 'Mathias Broxvall'
    icon = 'paa.svg'
    nodeid = ('com.sympathyfordata.timeseriesanalysis'
              '.piecewise_aggregation_analysis')
    tags = Tags(Tag.Analysis.Features)
    description = (
        'Applies the Piecewise Aggregation Analysis (PAA) algorithm on a '
        'table, treating each column as a single time-series. Splits the '
        'time-series into a number of bins and returns the aggregate within '
        'each bin. Typical choice of aggregate function is the average.'
    )

    parameters = node.parameters()
    parameters.set_string(
        'aggregate', label='Aggregate', value='average',
        description=(
            'Selects aggregate function to apply to each bin'),
        editor=node.editors.combo_editor(
            options=sorted(aggregate_fns.keys())))
    parameters.set_string(
        'inclusion', label='Binning method', value='uneven',
        description=(
            'Method for determining the value selection for each bin\n'
            '\n uneven : allows a different number of value in each bin if '
            'input length is not a multiple of bins\n'
            '\n pad-first : pads the input data with copies of the first '
            'value\n'
            '\n pad-last : pads the input data with copies of the last value\n'
            '\n overlapping : guarantees that all bins have the same\n'
            '\n number of samples but samples may fit multiple bins'
        ),
        editor=node.editors.combo_editor(
            options=['uneven', 'pad-first', 'pad-last', 'overlapping'])
    )
    parameters.set_integer(
        'bins', label='Bins', value=10,
        description='Number of output bins or number of samples per bin')
    parameters.set_boolean(
        'fixed', label='Fixed bin size', value=False,
        description='Uses bins of a fixed size instead of a fixed number of '
                    'bins.')
    parameters.set_boolean(
        'indices', label='Output indices', value=False,
        description='Outputs two columns with the start/stop index for each '
                    'bin. Indices start at zero.Stop index is the first row '
                    'that is NOT included in the given bin')
    parameters.set_boolean(
        'bin_numbers', label='Output bin number', value=False,
        description='Generates one column for row in the input data '
                    'containing the bin number that row was given (without '
                    'padding)')

    inputs = Ports([
        Port.Table('input', name='input')
    ])
    outputs = Ports([
        Port.Table('output', name='output')
    ])

    @staticmethod
    def _compute_bins(n, nbins, fixed, inclusion):
        """Return the bins as a list of half-open ``(start, stop)`` ranges.

        :param n: Number of samples, including any padding.
        :param nbins: Value of the 'bins' parameter (must be >= 1).  It is
            the number of bins, or the bin size when ``fixed`` is true.
        :param fixed: Interpret ``nbins`` as a bin size.
        :param inclusion: Binning method; only 'overlapping' changes the
            layout computed here.
        :return: List of ``(start, stop)`` tuples where ``start`` is included
            in the bin and ``stop`` is not.
        """
        overlapping = inclusion == 'overlapping'

        if fixed:
            bin_size = nbins
            # Number of bins required to cover all n samples.
            count = int(math.ceil(n / float(bin_size)))
            if overlapping:
                # Evenly spread start positions, every bin `bin_size` wide.
                # A stop may lie past n; slicing simply truncates there.
                return [((i * n) // count, (i * n) // count + bin_size)
                        for i in range(count)]
            # Consecutive bins; the last one is cut off at n.
            return [(i * bin_size, min(n, (i + 1) * bin_size))
                    for i in range(count)]

        if overlapping:
            # Same width for every bin, so neighbouring bins may share rows.
            bin_size = int(math.ceil(n / float(nbins)))
            return [((i * n) // nbins, (i * n) // nbins + bin_size)
                    for i in range(nbins)]
        # Consecutive bins whose sizes differ by at most one sample.
        return [((i * n) // nbins, ((i + 1) * n) // nbins)
                for i in range(nbins)]

    @staticmethod
    def _pad(data, pad_first, pad_last):
        """Return ``data`` extended with copies of its first/last value.

        :param data: One column of samples.
        :param pad_first: Number of copies of ``data[0]`` to prepend.
        :param pad_last: Number of copies of ``data[-1]`` to append.
        """
        # Only index into the data when padding is actually requested so an
        # empty column does not raise IndexError.
        head = [data[0]] * pad_first if pad_first else []
        # The last value must be repeated (a list), not multiplied.
        tail = [data[-1]] * pad_last if pad_last else []
        return np.r_[head, data, tail]

    def execute(self, node_context):
        """Bin the input table and write the aggregated result.

        :param node_context: Sympathy node context giving access to the
            'input' table, the 'output' table and the node parameters.
        :raises SyConfigurationError: If 'bins' is smaller than one, or if
            bin numbers are requested together with 'overlapping' binning.
        """
        in_tbl = node_context.input['input']
        out_tbl = node_context.output['output']
        params = node_context.parameters

        aggregate_fn = aggregate_fns[params['aggregate'].value]
        nbins = params['bins'].value
        fixed = params['fixed'].value
        inclusion = params['inclusion'].value
        indices = params['indices'].value
        bin_numbers = params['bin_numbers'].value

        if nbins < 1:
            # Zero would divide by zero below; negative values yield no bins.
            raise SyConfigurationError('Bins must be a positive integer')
        if bin_numbers and inclusion == 'overlapping':
            raise SyConfigurationError('Cannot output bin numbers with '
                                       'inclusion mode overlapping')

        rows = in_tbl.number_of_rows()

        # Padding makes the (padded) length a multiple of `nbins`.
        padding = (nbins - (rows % nbins)) % nbins
        pad_first = padding if inclusion == 'pad-first' else 0
        pad_last = padding if inclusion == 'pad-last' else 0
        n = rows + pad_first + pad_last

        bins = self._compute_bins(n, nbins, fixed, inclusion)

        def unpadded(index):
            # Translate an index in the padded series to a row of the input
            # table, clipped to the valid range [0, rows].
            return max(0, min(rows, index - pad_first))

        if bin_numbers:
            # One entry per input row: the number of the bin containing it.
            data = [i
                    for i, (start, stop) in enumerate(bins)
                    for _ in range(unpadded(stop) - unpadded(start))]
            out_tbl.set_column_from_array('bin', np.array(data))
            return

        if indices:
            out_tbl.set_column_from_array(
                'start',
                np.array([max(0, start - pad_first) for start, _ in bins]))
            out_tbl.set_column_from_array(
                'stop',
                np.array([unpadded(stop) for _, stop in bins]))

        for col in in_tbl.cols():
            data = self._pad(col.data, pad_first, pad_last)
            res = np.array([aggregate_fn(data[start:stop], params)
                            for start, stop in bins])
            out_tbl.set_column_from_array(col.name, res)


class SymbolicAggregationApproximation(node.Node):
    """Translate every column of a table into symbols (SAX).

    Each value is replaced by the symbol of the first bin whose bin value is
    greater than it; values not below any bin value get the symbol of the
    last bin.  Bin values are either read from the optional 'bin_values'
    input table (one column per input column, matched by position) or
    derived from normal-distribution quantiles scaled with the mean and
    standard deviation of the column.  The symbols of a column are finally
    combined into words according to the 'window' parameter.
    """

    name = 'Symbolic Aggregation Approximation (SAX)'
    author = 'Mathias Broxvall'
    description = (
        'Uses Symbolic Aggregation Approximation to reduce the output space '
        'into a (small) set of symbols. The input values are replaced by the '
        'value of the bin that they match. Each bin\'s value corresponds to '
        'one letter or digit. This generates a string from the input values. '
        'The final output is generated by sliding a window over this string.')
    nodeid = 'com.sympathyfordata.timeseriesanalysis.sax'
    tags = Tags(Tag.Analysis.Features)
    icon = 'sax.svg'

    # Symbol alphabet in bin order: a-z, A-Z, 0-9 (62 symbols).
    LETTERS = ([chr(code) for code in range(ord('a'), ord('z')+1)] +
               [chr(code) for code in range(ord('A'), ord('Z')+1)] +
               [chr(code) for code in range(ord('0'), ord('9')+1)])

    parameters = node.parameters()
    parameters.set_integer(
        'bins', label='Bins', value=3,
        description='Number of output bins')
    parameters.set_integer(
        'window', label='Sliding window', value=0,
        description='Generates words using a sliding window over the '
                    'generated symbols. If zero then a single string is given')

    inputs = Ports([
        Port.Table('input', name='input'),
        Port.Custom('table', 'bin_values', name='bin_values', n=(0, 1))
    ])
    outputs = Ports([
        Port.Table('output', name='output'),
        Port.Custom('table', 'bin_values', name='bin_values', n=(0, 1))
    ])

    def execute(self, node_context):
        """Convert each input column into SAX symbols or words.

        :param node_context: Sympathy node context giving access to the
            'input' table, the optional 'bin_values' input/output tables,
            the 'output' table and the node parameters.
        :raises SyConfigurationError: If the number of bins is smaller than
            one or larger than the number of available symbols, or if the
            sliding window is negative.
        """
        in_tbl = node_context.input['input']
        out_tbl = node_context.output['output']
        bins = node_context.parameters['bins'].value
        window = node_context.parameters['window'].value

        input_bin_values_grp = node_context.input.group('bin_values')
        output_bin_values_grp = node_context.output.group('bin_values')

        if input_bin_values_grp:
            # Externally supplied bin values decide the number of bins.
            bins = input_bin_values_grp[0].number_of_rows()

        alphabet = SymbolicAggregationApproximation.LETTERS
        if not 1 <= bins <= len(alphabet):
            # Outside this range a symbol lookup would either fail or wrap
            # around to the end of the alphabet.
            raise SyConfigurationError(
                'Number of bins must be between 1 and {}'.format(
                    len(alphabet)))
        if window < 0:
            raise SyConfigurationError('Sliding window must not be negative')

        for i, col in enumerate(in_tbl.cols()):
            data = col.data

            if input_bin_values_grp:
                bin_values = input_bin_values_grp[0].cols()[i].data
            else:
                # Quantiles 1/bins, 2/bins, ..., 1 of the standard normal
                # distribution, scaled to this column's spread and location.
                quantiles = np.linspace(0, 1, int(bins + 1))[1:]
                bin_values = (scipy.stats.norm.ppf(quantiles) * np.std(data)
                              + np.mean(data))

            if output_bin_values_grp:
                output_bin_values_grp[0].set_column_from_array(
                    col.name, bin_values)

            letters = []
            for x in data:
                # Positions of all bin values strictly greater than x.
                above = (bin_values > x).nonzero()[0]
                val = above[0] if len(above) else bins - 1
                letters.append(alphabet[val])

            if window == 0:
                # The whole column as one string.
                res = np.array([''.join(letters)])
            elif window == 1:
                # One symbol per input row.
                res = np.array(letters)
            else:
                # One word per window position.
                res = np.array([''.join(letters[k:k + window])
                                for k in range(len(letters) - window + 1)])
            out_tbl.set_column_from_array(col.name, res)