# Source code for node_dtw

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import fastdtw

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sylib.machinelearning.utility import table_to_array
from sylib_timeseries.utils import distance_metrics


class FastDTW(node.Node):
    """
    Compute a dynamic time warping between two signals using fastdtw.

    Each input table holds one signal. The node writes the mapping between
    the time points of the two signals to the ``path`` output and the final
    distance between them to the ``distance`` output.

    For additional details please refer to the
    `fastdtw <https://pypi.org/project/fastdtw>`_ library
    """

    name = 'Fast DTW (Dynamic Time Warping)'
    author = 'Mathias Broxvall'
    icon = 'dtw.svg'
    description = (
        'Fast Dynamic Time Warp provides '
        'a mapping between the time points of two signals '
        'and outputs the final distance between them.'
    )
    nodeid = 'com.sympathyfordata.timeseriesanalysis.fast_dtw'
    tags = Tags(Tag.Analysis.SignalProcessing)

    # Parameters are registered in the order they should be presented:
    # first the metric selector, then the Minkowski exponent.
    parameters = node.parameters()
    parameters.set_string(
        'distance',
        label='Distance function',
        value='num: euclidean',
        description=(
            'Function for calculating distances between two rows '
            '(one from each table), for more details refer to '
            '`Distance Metrics <https://scikit-learn.org/stable/modules/'
            'generated/sklearn.neighbors.DistanceMetric.html>`_'),
        editor=node.editors.combo_editor(
            options=sorted(distance_metrics.keys())),
    )
    parameters.set_float(
        'p',
        label='P',
        value=32.0,
        description=(
            'power used for Minkowski distance'),
    )

    inputs = Ports([
        Port.Table('Signal 1', name='signal 1'),
        Port.Table('Signal 2', name='signal 2'),
    ])
    outputs = Ports([
        Port.Table('Path', name='path'),
        Port.Table('Distance', name='distance'),
    ])

    def execute(self, node_context):
        """
        Run fastdtw on the two input signals and fill both output tables.

        The ``distance`` output receives a single-element ``distance``
        column. The ``path`` output receives the columns ``P0`` and ``P1``,
        holding the first and second element of every entry in the path
        returned by fastdtw.
        """
        first_signal = table_to_array(node_context.input['signal 1'])
        second_signal = table_to_array(node_context.input['signal 2'])

        path_out = node_context.output['path']
        distance_out = node_context.output['distance']

        metric = distance_metrics[node_context.parameters['distance'].value]

        # The metric is handed the full parameter set on every call, in
        # addition to the two rows being compared.
        total, warp_path = fastdtw.fastdtw(
            first_signal,
            second_signal,
            dist=lambda u, v: metric(u, v, node_context.parameters),
        )

        distance_out.set_column_from_array('distance', np.array([total]))

        # One output column per position in the path entries.
        for position, column in enumerate(('P0', 'P1')):
            path_out.set_column_from_array(
                column,
                np.array([step[position] for step in warp_path]))