# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
# """
# Implements the PAA (Piecewise Aggregate Approximation) and SAX (Symbolic
# Aggregation Approximation) algorithms.
# """
import math
import numpy as np
import scipy
import scipy.fftpack
import scipy.signal
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyConfigurationError
from sylib_timeseries.utils import aggregate_fns
[docs]
class PiecewiseAggregationAnalysis(node.Node):
    # Sympathy node implementing Piecewise Aggregation Analysis (PAA).
    #
    # Every column of the input table is treated as one time-series. The
    # rows are partitioned into bins and the selected aggregate function
    # (looked up in ``aggregate_fns``) is applied to the samples of each bin.
    #
    # NOTE: documented with comments rather than a class docstring so that
    # the class ``__doc__`` stays unchanged.

    # Node metadata.
    name = 'Piecewise Aggregation Analysis (PAA)'
    author = 'Mathias Broxvall'
    icon = 'paa.svg'
    nodeid = ('com.sympathyfordata.timeseriesanalysis'
              '.piecewise_aggregation_analysis')
    tags = Tags(Tag.Analysis.Features)
    description = (
        'Applies the Piecewise Aggregation Analysis (PAA) algorithm on a '
        'table, treating each column as a single time-series. Splits the '
        'time-series into a number of bins and returns the aggregate within '
        'each bin. Typical choice of aggregate function is the average.'
    )

    # Node configuration.
    parameters = node.parameters()
    # Key into ``aggregate_fns`` selecting the per-bin aggregate.
    parameters.set_string(
        'aggregate',
        label='Aggregate',
        value='average',
        description=(
            'Selects aggregate function to apply to each bin'),
        editor=node.editors.combo_editor(
            options=sorted(aggregate_fns.keys())))
    # How rows are distributed when they do not divide evenly into bins.
    parameters.set_string(
        'inclusion',
        label='Binning method',
        value='uneven',
        description=(
            'Method for determining the value selection for each bin\n'
            '\n uneven : allows a different number of value in each bin if '
            'input length is not a multiple of bins\n'
            '\n pad-first : pads the input data with copies of the first '
            'value\n'
            '\n pad-last : pads the input data with copies of the last value\n'
            '\n overlapping : guarantees that all bins have the same\n'
            '\n number of samples but samples may fit multiple bins'
        ),
        editor=node.editors.combo_editor(
            options=['uneven', 'pad-first', 'pad-last', 'overlapping'])
    )
    # Number of bins, or samples per bin when 'fixed' is enabled.
    parameters.set_integer(
        'bins',
        label='Bins',
        value=10,
        description='Number of output bins or number of samples per bin')
    parameters.set_boolean(
        'fixed',
        label='Fixed bin size',
        value=False,
        description='Uses bins of a fixed size instead of a fixed number of '
                    'bins.')
    parameters.set_boolean(
        'indices',
        label='Output indices',
        value=False,
        description='Outputs two columns with the start/stop index for each '
                    'bin. Indices start at zero.Stop index is the first row '
                    'that is NOT included in the given bin')
    parameters.set_boolean(
        'bin_numbers',
        label='Output bin number',
        value=False,
        description='Generates one column for row in the input data '
                    'containing the bin number that row was given (without '
                    'padding)')

    inputs = Ports([
        Port.Table('input', name='input')
    ])
    outputs = Ports([
        Port.Table('output', name='output')
    ])

    @staticmethod
    def _compute_bins(n, nbins, fixed, inclusion):
        """Return the half-open ``(start, stop)`` row range of every bin.

        Each tuple covers rows ``start`` up to, but not including, ``stop``
        of the (possibly padded) series.

        :param n: Length of the series including any padding.
        :param nbins: Value of the 'bins' parameter. It is the number of
            bins, or the number of samples per bin when ``fixed`` is true.
        :param fixed: True to use bins of a fixed size.
        :param inclusion: Binning method; only ``'overlapping'`` changes
            how the ranges are computed here.
        :returns: List of ``(start, stop)`` integer tuples, one per bin.
        """
        overlapping = inclusion == 'overlapping'

        if fixed:
            # In fixed mode the parameter holds the bin size; the number of
            # bins follows from the series length.
            bin_size = nbins
            nbins = int(np.ceil(n / float(bin_size)))
            if overlapping:
                # Evenly spaced start positions, every bin bin_size long.
                return [(int((i * n) // nbins),
                         int((i * n) // nbins + bin_size))
                        for i in range(nbins)]
            # Consecutive bins; the last one is truncated at the series end.
            return [(int(i * bin_size), int(min(n, (i + 1) * bin_size)))
                    for i in range(nbins)]

        if overlapping:
            # Every bin gets the same (rounded up) number of samples.
            bin_size = int(math.ceil(n / float(nbins)))
            return [(int((i * n) // nbins),
                     int((i * n) // nbins) + bin_size)
                    for i in range(nbins)]

        # Adjacent bins whose sizes differ by at most one sample.
        return [(int((i * n) // nbins), int(((i + 1) * n) // nbins))
                for i in range(nbins)]

    def execute(self, node_context):
        """Bin the rows of the input table and write the result.

        When 'bin_numbers' is enabled the output holds a single column
        ``bin`` with the bin number of every input row. Otherwise it holds
        one aggregated column per input column, preceded by ``start`` and
        ``stop`` columns when 'indices' is enabled.

        :param node_context: Node context giving access to the ``input``
            and ``output`` tables and to the node parameters.
        :raises SyConfigurationError: If bin numbers are requested together
            with the 'overlapping' binning method.
        """
        in_tbl = node_context.input['input']
        out_tbl = node_context.output['output']

        aggregate = node_context.parameters['aggregate'].value
        nbins = node_context.parameters['bins'].value
        fixed = node_context.parameters['fixed'].value
        inclusion = node_context.parameters['inclusion'].value
        indices = node_context.parameters['indices'].value
        bin_numbers = node_context.parameters['bin_numbers'].value

        aggregate_fn = aggregate_fns[aggregate]

        def fn(x):
            # Aggregate functions also receive the node parameters.
            return aggregate_fn(x, node_context.parameters)

        rows = in_tbl.number_of_rows()

        # Number of padding samples needed to make the series length a
        # multiple of the 'bins' parameter (only for the padding methods).
        pad_first = 0
        pad_last = 0
        s = abs(nbins)
        n = rows
        if inclusion == 'pad-first':
            pad_first = (s - (n % s)) % s
            n += pad_first
        elif inclusion == 'pad-last':
            pad_last = (s - (n % s)) % s
            n += pad_last

        # bins: list of tuples (A, B) including all values from A up to but
        # not including B, expressed in padded coordinates.
        bins = self._compute_bins(n, nbins, fixed, inclusion)

        if bin_numbers:
            if inclusion == 'overlapping':
                raise SyConfigurationError('Cannot output bin numbers with '
                                           'inclusion mode overlapping')
            # Repeat each bin number once per *unpadded* row in that bin:
            # shift by the leading padding and clamp to the real row range.
            data = [i
                    for i, (start, stop) in enumerate(bins)
                    for _ in range(max(0, min(rows, stop - pad_first)) -
                                   max(0, min(rows, start - pad_first)))]
            out_tbl.set_column_from_array('bin', np.array(data))
        else:
            if indices:
                # Indices refer to the unpadded input rows.
                out_tbl.set_column_from_array(
                    'start',
                    np.array([max(0, start - pad_first)
                              for start, stop in bins])
                )
                out_tbl.set_column_from_array(
                    'stop',
                    np.array([max(0, min(rows, stop - pad_first))
                              for start, stop in bins])
                )

            for col in in_tbl.cols():
                data = col.data
                # Pad with *copies* of the edge values. The edge values are
                # only looked up when padding is required, so columns
                # without rows are not indexed.
                head = [data[0]] * pad_first if pad_first else []
                tail = [data[-1]] * pad_last if pad_last else []
                data = np.r_[head, data, tail]
                res = np.array([fn(data[start:stop]) for start, stop in bins])
                out_tbl.set_column_from_array(col.name, res)
[docs]
class SymbolicAggregationApproximation(node.Node):
    # Sympathy node implementing Symbolic Aggregation Approximation (SAX).
    #
    # Every value of every input column is replaced by the symbol of the
    # first bin whose upper breakpoint exceeds the value. The resulting
    # symbol sequence is written out as one string, as single symbols or as
    # words taken with a sliding window.
    #
    # NOTE: documented with comments rather than a class docstring so that
    # the class ``__doc__`` stays unchanged.

    # Node metadata.
    name = 'Symbolic Aggregation Approximation (SAX)'
    author = 'Mathias Broxvall'
    description = (
        'Uses Symbolic Aggregation Approximation to reduce the output space '
        'into a (small) set of symbols. The input values are replaced by the '
        'value of the bin that they match. Each bin\'s value corresponds to '
        'one letter or digit. This generates a string from the input values. '
        'The final output is generated by sliding a window over this string.')
    nodeid = 'com.sympathyfordata.timeseriesanalysis.sax'
    tags = Tags(Tag.Analysis.Features)
    icon = 'sax.svg'

    # Symbol alphabet, indexed by bin number: a-z, A-Z, 0-9 (62 symbols).
    LETTERS = ([chr(code) for code in range(ord('a'), ord('z')+1)] +
               [chr(code) for code in range(ord('A'), ord('Z')+1)] +
               [chr(code) for code in range(ord('0'), ord('9')+1)])

    # Node configuration.
    parameters = node.parameters()
    parameters.set_integer(
        'bins',
        label='Bins',
        value=3,
        description='Number of output bins')
    parameters.set_integer(
        'window',
        label='Sliding window',
        value=0,
        description='Generates words using a sliding window over the '
                    'generated symbols. If zero then a single string is given')

    # The optional 'bin_values' input supplies breakpoints; the optional
    # 'bin_values' output receives the breakpoints that were used.
    inputs = Ports([
        Port.Table('input', name='input'),
        Port.Custom('table', 'bin_values', name='bin_values', n=(0, 1))
    ])
    outputs = Ports([
        Port.Table('output', name='output'),
        Port.Custom('table', 'bin_values', name='bin_values', n=(0, 1))
    ])

    def execute(self, node_context):
        """Convert every input column into a sequence of SAX symbols.

        Breakpoints are read from the optional ``bin_values`` input, matched
        to the input columns by position. Without that input they are the
        quantiles ``1/bins, 2/bins, ..., 1`` of a normal distribution with
        the mean and standard deviation of the column.

        :param node_context: Node context giving access to the input and
            output port groups and to the node parameters.
        :raises SyConfigurationError: If fewer than one bin is configured,
            or if a value falls in a bin for which no symbol exists.
        """
        # Function-scope import: the module header only imports ``scipy``,
        # ``scipy.fftpack`` and ``scipy.signal``, which does not guarantee
        # that the ``scipy.stats`` submodule is available as an attribute.
        from scipy.stats import norm

        in_tbl = node_context.input['input']
        out_tbl = node_context.output['output']
        bins = node_context.parameters['bins'].value
        window = node_context.parameters['window'].value
        input_bin_values_grp = node_context.input.group('bin_values')
        output_bin_values_grp = node_context.output.group('bin_values')
        symbols = SymbolicAggregationApproximation.LETTERS

        if input_bin_values_grp:
            # Supplied breakpoints override the configured number of bins.
            bins = input_bin_values_grp[0].number_of_rows()

        if bins < 1:
            # Otherwise ``bins - 1`` below would be negative and silently
            # index the alphabet from its end.
            raise SyConfigurationError(
                'At least one bin is required, got {}'.format(bins))

        for i, col in enumerate(in_tbl.cols()):
            data = col.data

            if input_bin_values_grp:
                bin_values = input_bin_values_grp[0].cols()[i].data
            else:
                mean = np.mean(data)
                std = np.std(data)
                # Upper breakpoint of every bin; the last quantile is 1, so
                # the last breakpoint is +inf whenever std is non-zero.
                bin_values = norm.ppf(
                    np.linspace(0, 1, int(bins + 1))[1:]) * std + mean

            if output_bin_values_grp:
                output_bin_values_grp[0].set_column_from_array(
                    col.name, bin_values)

            letters = []
            for x in data:
                # Index of the first breakpoint strictly above the value.
                nz = (bin_values > x).nonzero()[0]
                if len(nz) == 0:
                    # No breakpoint is above the value: use the last bin.
                    val = bins - 1
                else:
                    val = nz[0]
                if val >= len(symbols):
                    raise SyConfigurationError(
                        'Cannot represent bin {} of column {!r}: only {} '
                        'symbols are available'.format(
                            val, col.name, len(symbols)))
                letters.append(symbols[val])

            if window == 0:
                # One row holding the whole symbol string.
                res = np.array([''.join(letters)])
            elif window == 1:
                # One row per symbol.
                res = np.array(letters)
            else:
                # One row per window position.
                res = np.array([''.join(letters[start:start+window])
                                for start in range(len(letters)-window+1)])
            out_tbl.set_column_from_array(col.name, res)