# This file is part of Sympathy for Data.
# Copyright (c) 2024, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import scipy.signal as signal
import scipy.interpolate as si
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api import dtypes
from sympathy.platform.exceptions import SyDataError
from sympathy.platform.node import adjust


def calculate_time_lag(reference_time, reference_data, reference_mean_step,
target_data, target_mean_step) -> float:
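    """Return the time offset between the target and reference signals.

    The reference signal is resampled onto the target's mean step, the two
    signals are cross-correlated, and the lag at the correlation maximum is
    converted from samples back to the time units of the inputs.

    Example with a unit impulse shifted by two samples (illustration only):

    >>> t = np.arange(10.0)
    >>> ref = np.zeros(10); ref[2] = 1.0
    >>> tgt = np.zeros(10); tgt[4] = 1.0
    >>> float(calculate_time_lag(t, ref, 1.0, tgt, 1.0))
    2.0
    """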
    # Resample the reference data onto the target's mean step; the target
    # data and its sampling remain unchanged
interpolated_reference_time = np.arange(reference_time[0],
reference_time[0] +
reference_mean_step *
len(reference_time),
target_mean_step)
interpolation_func = si.interp1d(reference_time,
reference_data,
bounds_error=False,
fill_value=np.nan)
reference_data = interpolation_func(interpolated_reference_time)
# Mask out-of-bounds values
valid_mask = ~np.isnan(reference_data)
reference_data_valid = reference_data[valid_mask]
    # Find the lag (in samples) at which the cross-correlation peaks
correlation = signal.correlate(
target_data, reference_data_valid, mode="full")
lags = signal.correlation_lags(
target_data.size, reference_data_valid.size, mode="full")
max_correlation_index = np.argmax(correlation)
time_lag = lags[max_correlation_index]
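    # Convert the lag from whole samples to the time units of the inputs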
return time_lag * target_mean_step


def _datetime_to_float(arr: np.ndarray) -> np.ndarray:
"""Returned array is in microseconds."""
return (arr - arr[0]) / np.timedelta64(1, 'us')


class SynchronizeTimeseries(node.Node):
"""
Synchronizes two tables in time by selecting a signal in each table and
then finding the time offset at which the features of those two signals
overlap the most.
The node uses cross-correlation internally to find the offset, hence the
signal features needs to be similar in shape, but scale doesn't matter.
Note that the resulting time offset will always be a multiple of the
smallest time step of the two signals.
"""
name = 'Time Synchronisation'
author = 'Evert Häggman'
icon = 'time_sync.svg'
description = ('Synchronize two tables in time by matching '
'features of selected signals.')
nodeid = (
'com.sympathyfordata.timeseriesanalysis.timeseries_synchronisation')
tags = Tags(Tag.Analysis.SignalProcessing)
parameters = node.parameters()
parameters.set_string(
'target_signal',
label='Target Signal',
description='Choose a column to use as the target signal',
editor=node.editors.combo_editor(edit=True, placeholder='Required'))
parameters.set_string(
'target_time',
label='Target Time column',
description='Choose a column to use as the target time',
editor=node.editors.combo_editor(edit=True, placeholder='Required'))
parameters.set_string(
'reference_signal',
label='Reference Signal',
description='Choose a column to use as the reference signal',
editor=node.editors.combo_editor(edit=True, placeholder='Required'))
parameters.set_string(
'reference_time',
label='Reference Time',
description='Choose a column to use as the reference time',
editor=node.editors.combo_editor(edit=True, placeholder='Required'))
inputs = Ports([
Port.Table(description=('Input table with target signal and time'),
name='Target table'),
Port.Table(description=('Input table with reference signal and time'),
name='Reference table')
])
outputs = Ports([
Port.Table(description=('Target table from input '
'with synchronized time column'),
name='Result table')
])

    def adjust_parameters(self, node_context):
adjust(node_context.parameters['target_signal'],
node_context.input['Target table'])
adjust(node_context.parameters['target_time'],
node_context.input['Target table'])
adjust(node_context.parameters['reference_signal'],
node_context.input['Reference table'])
adjust(node_context.parameters['reference_time'],
node_context.input['Reference table'])

    def execute(self, node_context):
parameters = node_context.parameters
target_table = node_context.input['Target table']
reference_table = node_context.input['Reference table']
result_table = node_context.output['Result table']
if reference_table.number_of_rows() == 0:
raise SyDataError("Empty reference table (no rows)")
target_data = target_table._require_column(
parameters['target_signal'])
target_time = target_table._require_column(
parameters['target_time'])
reference_data = reference_table._require_column(
parameters['reference_signal'])
reference_time = reference_table._require_column(
parameters['reference_time'])
if not dtypes.compatible(target_time, reference_time):
raise SyDataError(
'Selected time columns are not of same type. '
'target_type: {}, reference_type: {}'.format(
dtypes.typename(target_time),
dtypes.typename(reference_time)))
# To support NaNs/masked we could remove all rows that contain NaNs
# (see how Drop NaN Table implements this). Removing rows from the
# beginning or end of the table would always be safe, but removing rows
# in the middle of the table might lead to jumps in the time arrays.
if np.any(np.isnan(reference_data)) or np.any(np.isnan(target_data)):
raise SyDataError(
"Data contains NaNs. This node currently doesn't support NaNs."
"\n\nHint: you can use the node Drop NaN Table to remove any "
"rows with NaNs.")
orig_target_time = target_time
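        # Datetime columns are converted to float microseconds so that the
        # correlation can work on plain numeric arrays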
if target_time.dtype.kind == reference_time.dtype.kind == 'M':
target_time = _datetime_to_float(target_time)
reference_time = _datetime_to_float(reference_time)
target_mean_step = np.diff(target_time).mean()
reference_mean_step = np.diff(reference_time).mean()
        # Work with mean step sizes rather than nominal sampling rates.
        # The coarser signal is always passed as the reference so that it
        # is resampled onto the finer grid; when the roles are swapped,
        # the resulting lag is negated.
if target_mean_step > reference_mean_step:
time_lag = -calculate_time_lag(
reference_time=target_time,
reference_data=target_data,
reference_mean_step=target_mean_step,
target_data=reference_data,
target_mean_step=reference_mean_step)
else:
time_lag = calculate_time_lag(
reference_time=reference_time,
reference_data=reference_data,
reference_mean_step=reference_mean_step,
target_data=target_data,
target_mean_step=target_mean_step)
if orig_target_time.dtype.kind == 'M':
            # The lag is in float microseconds (see _datetime_to_float);
            # convert it back to a timedelta64
time_lag *= np.timedelta64(1, 'us')
new_target_time = orig_target_time - time_lag
        # Shift the target time column by the resulting time lag
for column_name in target_table.column_names():
if column_name == parameters['target_time'].value:
result_table[column_name] = new_target_time
else:
result_table.update_column(column_name, target_table)
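

# Minimal standalone sketch (not part of the node API) showing how
# calculate_time_lag recovers a known offset between two signals sampled at
# different rates. The Gaussian pulse and the sample rates below are
# illustrative assumptions, not values used by the node itself.
if __name__ == '__main__':
    ref_time = np.arange(0.0, 20.0, 0.5)        # reference sampled at 0.5 s
    ref_data = np.exp(-(ref_time - 5.0) ** 2)   # pulse centred at t = 5 s
    tgt_time = np.arange(0.0, 20.0, 0.25)       # target sampled at 0.25 s
    tgt_data = np.exp(-(tgt_time - 8.0) ** 2)   # same pulse, delayed by 3 s
    lag = calculate_time_lag(
        reference_time=ref_time,
        reference_data=ref_data,
        reference_mean_step=0.5,
        target_data=tgt_data,
        target_mean_step=0.25)
    print(lag)  # expected to be close to 3.0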