Source code for node_sync_time

# This file is part of Sympathy for Data.
# Copyright (c) 2024, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import scipy.signal as signal
import scipy.interpolate as si

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api import dtypes
from sympathy.platform.exceptions import SyDataError
from sympathy.platform.node import adjust


def calculate_time_lag(reference_time, reference_data, reference_mean_step,
                       target_data, target_mean_step) -> float:
    # Interpolate selected reference table data. Target table data and
    # its sampling rate remain the same.
    interpolated_reference_time = np.arange(reference_time[0],
                                            reference_time[0] +
                                            reference_mean_step *
                                            len(reference_time),
                                            target_mean_step)

    interpolation_func = si.interp1d(reference_time,
                                     reference_data,
                                     bounds_error=False,
                                     fill_value=np.nan)
    reference_data = interpolation_func(interpolated_reference_time)

    # Mask out-of-bounds values
    valid_mask = ~np.isnan(reference_data)
    reference_data_valid = reference_data[valid_mask]

    # Cross-correlate and pick the lag at the correlation maximum
    correlation = signal.correlate(
        target_data, reference_data_valid, mode="full")
    lags = signal.correlation_lags(
        target_data.size, reference_data_valid.size, mode="full")
    max_correlation_index = np.argmax(correlation)
    time_lag = lags[max_correlation_index]

    return time_lag * target_mean_step

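
# A minimal illustrative sketch (not part of the node): it shows how
# calculate_time_lag recovers a known shift between two samplings of the
# same feature. All names and values below are made up for the example.
def _example_calculate_time_lag() -> float:
    t_ref = np.arange(0.0, 10.0, 0.1)    # reference time axis, 0.1 s step
    t_tgt = np.arange(0.0, 10.0, 0.05)   # target time axis, 0.05 s step
    ref = np.exp(-(t_ref - 4.0) ** 2)    # shared feature: a bump at t = 4.0
    tgt = np.exp(-(t_tgt - 5.5) ** 2)    # the same bump, delayed by 1.5 s
    # Expected result: close to +1.5, quantized to the 0.05 s target step.
    return calculate_time_lag(t_ref, ref, 0.1, tgt, 0.05)
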

def _datetime_to_float(arr: np.ndarray) -> np.ndarray:
    """Returned array is in microseconds."""
    return (arr - arr[0]) / np.timedelta64(1, 'us')

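
# Illustrative only: _datetime_to_float rebases a datetime64 array at its
# first element and expresses it as float microseconds, e.g.
#
#     t = np.array(['2024-01-01T00:00:00', '2024-01-01T00:00:01'],
#                  dtype='datetime64[us]')
#     _datetime_to_float(t)  # -> array([0.0, 1000000.0])
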

class SynchronizeTimeseries(node.Node):
    """
    Synchronizes two tables in time by selecting a signal in each table and
    then finding the time offset at which the features of those two signals
    overlap the most.

    The node uses cross-correlation internally to find the offset, hence the
    signal features need to be similar in shape, but scale doesn't matter.

    Note that the resulting time offset will always be a multiple of the
    smallest time step of the two signals.
    """

    name = 'Time Synchronisation'
    author = 'Evert Häggman'
    icon = 'time_sync.svg'
    description = ('Synchronize two tables in time by matching '
                   'features of selected signals.')
    nodeid = (
        'com.sympathyfordata.timeseriesanalysis.timeseries_synchronisation')
    tags = Tags(Tag.Analysis.SignalProcessing)

    parameters = node.parameters()
    parameters.set_string(
        'target_signal',
        label='Target Signal',
        description='Choose a column to use as the target signal',
        editor=node.editors.combo_editor(edit=True, placeholder='Required'))
    parameters.set_string(
        'target_time',
        label='Target Time column',
        description='Choose a column to use as the target time',
        editor=node.editors.combo_editor(edit=True, placeholder='Required'))
    parameters.set_string(
        'reference_signal',
        label='Reference Signal',
        description='Choose a column to use as the reference signal',
        editor=node.editors.combo_editor(edit=True, placeholder='Required'))
    parameters.set_string(
        'reference_time',
        label='Reference Time',
        description='Choose a column to use as the reference time',
        editor=node.editors.combo_editor(edit=True, placeholder='Required'))

    inputs = Ports([
        Port.Table(description='Input table with target signal and time',
                   name='Target table'),
        Port.Table(description='Input table with reference signal and time',
                   name='Reference table')
    ])
    outputs = Ports([
        Port.Table(description=('Target table from input '
                                'with synchronized time column'),
                   name='Result table')
    ])

    def adjust_parameters(self, node_context):
        adjust(node_context.parameters['target_signal'],
               node_context.input['Target table'])
        adjust(node_context.parameters['target_time'],
               node_context.input['Target table'])
        adjust(node_context.parameters['reference_signal'],
               node_context.input['Reference table'])
        adjust(node_context.parameters['reference_time'],
               node_context.input['Reference table'])

    def execute(self, node_context):
        parameters = node_context.parameters
        target_table = node_context.input['Target table']
        reference_table = node_context.input['Reference table']
        result_table = node_context.output['Result table']

        if reference_table.number_of_rows() == 0:
            raise SyDataError("Empty reference table (no rows)")

        target_data = target_table._require_column(
            parameters['target_signal'])
        target_time = target_table._require_column(
            parameters['target_time'])
        reference_data = reference_table._require_column(
            parameters['reference_signal'])
        reference_time = reference_table._require_column(
            parameters['reference_time'])

        if not dtypes.compatible(target_time, reference_time):
            raise SyDataError(
                'Selected time columns are not of the same type. '
                'target_type: {}, reference_type: {}'.format(
                    dtypes.typename(target_time),
                    dtypes.typename(reference_time)))

        # To support NaNs/masked values we could remove all rows that contain
        # NaNs (see how Drop NaN Table implements this). Removing rows from
        # the beginning or end of the table would always be safe, but
        # removing rows in the middle of the table might lead to jumps in the
        # time arrays.
        if np.any(np.isnan(reference_data)) or np.any(np.isnan(target_data)):
            raise SyDataError(
                "Data contains NaNs. This node currently doesn't support "
                "NaNs.\n\nHint: you can use the node Drop NaN Table to "
                "remove any rows with NaNs.")

        orig_target_time = target_time
        if target_time.dtype.kind == reference_time.dtype.kind == 'M':
            target_time = _datetime_to_float(target_time)
            reference_time = _datetime_to_float(reference_time)

        target_mean_step = np.diff(target_time).mean()
        reference_mean_step = np.diff(reference_time).mean()

        # Calculations are made with the mean step sizes, not the actual
        # sampling rates. The coarser signal is always passed as the
        # "reference" so that it gets interpolated onto the finer grid; the
        # sign of the lag is flipped when the roles are swapped.
        if target_mean_step > reference_mean_step:
            time_lag = -calculate_time_lag(
                reference_time=target_time,
                reference_data=target_data,
                reference_mean_step=target_mean_step,
                target_data=reference_data,
                target_mean_step=reference_mean_step)
        else:
            time_lag = calculate_time_lag(
                reference_time=reference_time,
                reference_data=reference_data,
                reference_mean_step=reference_mean_step,
                target_data=target_data,
                target_mean_step=target_mean_step)

        if orig_target_time.dtype.kind == 'M':
            # Convert the lag from float microseconds back to a timedelta64
            # for datetime time columns.
            time_lag *= np.timedelta64(1, 'us')
        new_target_time = orig_target_time - time_lag

        # Synchronize the target time series based on the resulting time lag.
        for column_name in target_table.column_names():
            if column_name == parameters['target_time'].value:
                result_table[column_name] = new_target_time
            else:
                result_table.update_column(column_name, target_table)
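

# Worked example of the lag direction (illustrative numbers only): if the
# target's feature occurs 1.5 s later than the reference's, calculate_time_lag
# returns +1.5 and the node shifts the target time column back by that amount
# (new_target_time = orig_target_time - time_lag), aligning the features.
# With datetime columns the same lag is first obtained as 1.5e6 float
# microseconds and only then converted to a numpy timedelta64.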