# Source code for node_dtw
# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import fastdtw
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sylib.machinelearning.utility import table_to_array
from sylib_timeseries.utils import distance_metrics
# [docs] (link marker left over from the rendered documentation page)
class FastDTW(node.Node):
    """
    Align two signal tables with Fast Dynamic Time Warping.

    The node writes the warp path to the ``path`` output (columns ``P0``
    and ``P1``) and the accumulated distance to the ``distance`` output
    (column ``distance``).

    For additional details please refer to the
    `fastdtw <https://pypi.org/project/fastdtw>`_ library
    """

    # Node metadata.
    name = 'Fast DTW (Dynamic Time Warping)'
    author = 'Mathias Broxvall'
    icon = 'dtw.svg'
    description = (
        'Fast Dynamic Time Warp provides '
        'a mapping between the time points of two signals '
        'and outputs the final distance between them.'
    )
    nodeid = 'com.sympathyfordata.timeseriesanalysis.fast_dtw'
    tags = Tags(Tag.Analysis.SignalProcessing)

    # User-configurable parameters.
    parameters = node.parameters()
    parameters.set_string(
        'distance',
        label='Distance function',
        value='num: euclidean',
        description=(
            'Function for calculating distances between two rows '
            '(one from each table), for more details refer to '
            '`Distance Metrics <https://scikit-learn.org/stable/modules/'
            'generated/sklearn.neighbors.DistanceMetric.html>`_'),
        # The combo box offers every registered metric name, alphabetically.
        editor=node.editors.combo_editor(
            options=sorted(distance_metrics.keys())))
    parameters.set_float(
        'p',
        label='P',
        value=32.0,
        description=(
            'power used for Minkowski distance'))

    # Two input signals; two outputs (warp path and total distance).
    inputs = Ports([
        Port.Table('Signal 1', name='signal 1'),
        Port.Table('Signal 2', name='signal 2'),
    ])
    outputs = Ports([
        Port.Table('Path', name='path'),
        Port.Table('Distance', name='distance'),
    ])

    def execute(self, node_context):
        """
        Run fastdtw on the two input tables and fill both output tables.

        The metric selected by the ``distance`` parameter is looked up in
        ``distance_metrics`` and is called with the two rows being compared
        together with the node's parameters.
        """
        first_signal = table_to_array(node_context.input['signal 1'])
        second_signal = table_to_array(node_context.input['signal 2'])
        path_out = node_context.output['path']
        distance_out = node_context.output['distance']

        selected_metric = distance_metrics[
            node_context.parameters['distance'].value]

        def row_distance(row_a, row_b):
            # The metric receives the full parameter set
            # (presumably so it can read e.g. 'p' -- confirm in
            # sylib_timeseries.utils).
            return selected_metric(row_a, row_b, node_context.parameters)

        total_distance, warp_path = fastdtw.fastdtw(
            first_signal, second_signal, dist=row_distance)

        distance_out.set_column_from_array(
            'distance', np.array([total_distance]))
        # Each path entry is an index pair; split it into one column per
        # signal.
        path_out.set_column_from_array(
            'P0', np.array([idx_a for idx_a, _ in warp_path]))
        path_out.set_column_from_array(
            'P1', np.array([idx_b for _, idx_b in warp_path]))