Source code for node_interpolation

# coding: utf8
# Copyright (c) 2013, 2017, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Interpolate timeseries to a single timebasis. The new timebasis can
either be an existing timebasis in the adaf-file or a timebasis with a timestep
defined by the user. The timeseries that will be interpolated are selected in a
list. The output file will contain a single system and raster with all the
chosen timeseries.
"""
from __future__ import division, print_function

import collections

import six
import numpy as np
from scipy.interpolate import interp1d as scinterp

from sympathy.api import node_helper
from sympathy.api import node as synode
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyDataError, SyConfigurationError, sywarn
from sympathy.api import dtypes
from sympathy.api import qt2 as qt_compat
QtGui = qt_compat.import_module('QtGui')
QtWidgets = qt_compat.import_module('QtWidgets')
QtCore = qt_compat.import_module('QtCore')


_METHODS = ['zero', 'nearest', 'linear', 'quadratic', 'cubic']


def _method_wrapper(f):
    def inner(self, signal):
        dtype = signal.dtype
        if not len(self._source_basis):
            return np.ma.masked_all(self._target_basis.shape, dtype=dtype)
        if dtype.kind in ('m', 'M'):
            signal = signal.astype(float)
        result = f(self, signal)
        if dtype.kind in ('m', 'M'):
            return result.astype(dtype)
        return result
    return inner


class RasterResampler(object):
    def __init__(self):
        self._has_warned_about_missing_values = False

    def new_raster(self, source_basis, target_basis):
        """Start working on a new raster."""
        # np.interp can't handle some types. Notably it can't handle datetimes.
        self._time_reference = None
        self._source_basis = self._to_float(source_basis)
        self._target_basis = self._to_float(target_basis)

        # Base cache variables used for all interpolation methods:
        self._i = None
        self._left = None
        self._right = None
        self._mask = None
        # Cache variables used for specific interpolation methods:
        self._nearest_i = None
        self._zero_i = None
        self._linear_i = None
        self._linear_alpha = None
        self._higher = False

    def _to_float(self, basis):
        """Convert datetime bases to float."""
        if basis.dtype.kind == 'M':
            if self._time_reference is None:
                self._time_reference = basis[0]
            timeunit = np.timedelta64(1, 'us')
            return (basis - self._time_reference) / timeunit
        return basis

    def get_method(self, signal, selected_methods):
        """
        Return a function that can be used to interpolate signal to the current
        raster.
        """
        bool_method, int_method, float_method = selected_methods

        kind = signal.dtype.kind
        if kind in ('b', 'S', 'U'):
            selected_method = bool_method
        elif kind in ('i', 'u'):
            selected_method = int_method
        else:
            selected_method = float_method

        if self._source_basis.size == 1 and selected_method not in (
                'zero', 'nearest'):
            # Signals with one sample can only be resampled using 'nearest' or
            # 'zero'.
            # sywarn("Can't interpolate signal with one sample using method "
            #        "'{}', falling back to method 'nearest'".format(
            #            selected_method))
            selected_method = 'nearest'

        if selected_method == 'zero':
            return self.zero
        elif selected_method == 'nearest':
            return self.nearest
        elif selected_method == 'linear':
            return self.linear
        elif selected_method == 'quadratic':
            return self.quadratic
        elif selected_method == 'cubic':
            return self.cubic
        else:
            raise ValueError(
                "Unknown interpolation method '{}'".format(selected_method))

    def _interp_indices(self):
        """
        Calculate and cache the common interpolation steps for the current
        raster. Each interpolation method will refine and use these results.
        """
        if self._i is not None:
            return

        LEFT, RIGHT = -1, -2
        old_i = np.arange(len(self._source_basis))
        self._i = np.interp(self._target_basis, self._source_basis, old_i,
                            left=LEFT, right=RIGHT)
        self._left = self._i == LEFT
        self._right = self._i == RIGHT
        self._mask = np.logical_or(self._left, self._right)

        # Set extrapolated values to what is needed in methods zero and
        # nearest. Other methods mask these values anyways.
        self._i[self._left] = 0
        self._i[self._right] = -1

        # Make it easier to check if any values are masked.
        if not np.any(self._left):
            self._left = None
        if not np.any(self._mask):
            self._mask = None

    @_method_wrapper
    def zero(self, signal):
        """Nearest preceding samples."""
        if self._zero_i is None:
            self._interp_indices()
            self._zero_i = self._i.astype(int)
        new_signal = signal[self._zero_i]
        if self._left is None:
            return new_signal
        else:
            return np.ma.MaskedArray(new_signal, mask=self._left)

    @_method_wrapper
    def nearest(self, signal):
        """Nearest neighbor."""
        if self._nearest_i is None:
            self._interp_indices()
            self._nearest_i = self._i.astype(int)
            if self._mask is not None:
                self._nearest_i[~self._mask] = (
                    self._i[~self._mask] + 0.5).astype(int)
        return signal[self._nearest_i]

    @_method_wrapper
    def linear(self, signal):
        """Piecewise linear interpolation."""
        if self._linear_i is None:
            self._interp_indices()
            self._linear_i = self._i.astype(int)
            self._linear_alpha = self._i - self._linear_i

            # Special handling of end point to make it safe to use
            # self._linear_i + 1 in indexing.
            endpoints = self._linear_i + 1 == len(self._source_basis)
            self._linear_i[endpoints] -= 1
            self._linear_alpha[endpoints] = 1

        new_signal = (signal[self._linear_i] * (1 - self._linear_alpha) +
                      signal[self._linear_i + 1] * self._linear_alpha)

        # These should give exactly the same results as the calculations above,
        # but make sure that NaN/masked at self._linear_i or self._linear_i + 1
        # don't pollute the results.
        new_signal[self._linear_alpha == 0] = (
            signal[self._linear_i][self._linear_alpha == 0])
        new_signal[self._linear_alpha == 1] = (
            signal[self._linear_i + 1][self._linear_alpha == 1])

        if self._mask is None:
            return new_signal
        else:
            return np.ma.MaskedArray(new_signal, mask=self._mask)

    @_method_wrapper
    def quadratic(self, signal):
        """Quadratic interpolation."""
        return self._higher_order(signal, kind='quadratic')

    @_method_wrapper
    def cubic(self, signal):
        """Cubic interpolation."""
        return self._higher_order(signal, kind='cubic')

    def _higher_order(self, signal, kind):
        """
        Interpolations based on scipy.interpolation.interp1d.

        This lacks several features of the numpy-based interpolation methods
        such as optimization for interpolating mulitple signals between the
        same rasters, handling of NaNs/masked values etc.
        """
        if self._higher is False:
            self._mask = np.logical_or(
                self._target_basis > np.max(self._source_basis),
                self._target_basis < np.min(self._source_basis))
            self._higher = True
        self._warn_if_missing_values(signal)
        # Use scipy interpolation function
        new_signal = scinterp(self._source_basis, signal,
                              kind=kind, copy=False,
                              bounds_error=False,
                              fill_value=np.NaN)(self._target_basis)
        if self._mask is None:
            return new_signal
        else:
            return np.ma.MaskedArray(new_signal, mask=self._mask)

    def _warn_if_missing_values(self, signal):
        """Warn about using quadratic/cubic interpolation with NaN values."""
        if self._has_warned_about_missing_values:
            return

        if isinstance(signal, np.ma.MaskedArray):
            if np.any(np.ma.getmaskarray(signal) | np.isnan(signal)):
                sywarn("Masked values/NaNs are not supported when "
                       "interpolating with quadratic or cubic method.")
                self._has_warned_about_missing_values = True


def resample_file_with_spec(parameter_root, spec, in_adaffile, out_adaffile,
                            progress):
    signals_col = parameter_root['signals_colname'].selected
    dt_col = parameter_root['dt_colname'].selected
    tb_col = parameter_root['tb_colname'].selected
    signals = spec.get_column_to_array(signals_col)
    if dt_col is not None:
        dts = spec.get_column_to_array(dt_col)
    else:
        dts = np.zeros(signals.shape, dtype=float) * np.nan
    if tb_col is not None:
        to_tbs = spec.get_column_to_array(tb_col)
    else:
        to_tbs = np.zeros(signals.shape, dtype=six.text_type)

    dt_to_signals = collections.defaultdict(list)
    tbname_to_signals = collections.defaultdict(list)
    for i, (dt, to_tb, signal) in enumerate(zip(dts, to_tbs, signals)):
        if dt and not np.isnan(dt):
            dt_to_signals[dt].append(signal)
        elif to_tb:
            tbname_to_signals[to_tb].append(signal)
        else:
            raise SyDataError("Row {} in specification table specifies "
                              "neither dt nor a target time basis.".format(i))

    new_timebases = []
    for dt, dt_signals in dt_to_signals.items():
        # Create output system/raster
        new_timebasis, unit = get_new_timebasis(in_adaffile, dt, dt_signals)
        system_name = 'Resampled system'
        raster_name = 'Resampled raster {:.2}'.format(dt)
        new_timebases.append(
            (system_name, raster_name, new_timebasis, unit, dt_signals))
    for tbname, tb_signals in tbname_to_signals.items():
        raster_dict = get_raster_dict(in_adaffile)
        try:
            old_system_name, old_raster_name = raster_dict[tbname]
        except KeyError:
            sywarn('No such system/raster: {}'.format(tbname))
            continue
        old_tb_column = (
            in_adaffile.sys[old_system_name][old_raster_name].basis_column())

        system_name = 'Resampled system'
        raster_name = old_raster_name
        new_timebasis = old_tb_column.value()
        unit = old_tb_column.attr['unit'] or 's'
        new_timebases.append(
            (system_name, raster_name, new_timebasis, unit, tb_signals))

    selected_methods = (parameter_root['bool_interp_method'].selected,
                        parameter_root['int_interp_method'].selected,
                        parameter_root['interpolation_method'].selected)

    resampler = RasterResampler()
    progress_counter = 0
    for system_name, raster_name, new_timebasis, unit, signals in (
            new_timebases):
        if not unit:
            unit = 's'
        if system_name not in out_adaffile.sys.keys():
            new_system = out_adaffile.sys.create(system_name)
        if raster_name not in new_system.keys():
            new_raster = new_system.create(raster_name)
        new_raster.create_basis(new_timebasis, attributes={'unit': unit})

        # Loop over all rasters and resample selected signals in them
        for origin_system_name in in_adaffile.sys.keys():
            origin_system = in_adaffile.sys[origin_system_name]
            for origin_raster_name in origin_system.keys():
                origin_raster = origin_system[origin_raster_name]
                signals_in_this_raster = [
                    signal_name for signal_name in origin_raster.keys()
                    if signal_name in signals]
                if not signals_in_this_raster:
                    continue

                origin_basis = origin_raster.basis_column().value()
                new_basis = new_raster.basis_column().value()

                resampler.new_raster(origin_basis, new_basis)

                # Loop over all signals and resample them
                for signal_name in signals_in_this_raster:
                    progress(100. * progress_counter / len(signals))
                    signal = origin_raster[signal_name]
                    y = signal.y
                    interp = resampler.get_method(y, selected_methods)
                    new_y = interp(y)
                    attrs = dict(signal.signal().attr.items())
                    new_raster.create_signal(signal.name, new_y, attrs)
                    progress_counter += 1


def resample_file(parameter_root, in_adaffile, out_adaffile, progress):
    dt = parameter_root['dt'].value
    use_dt = parameter_root['use_dt'].value
    new_tb = parameter_root['new_tb'].selected
    resample_all = parameter_root['resample_all_rasters'].value
    chosen_signals = parameter_root['ts'].value_names

    if resample_all:
        signals = in_adaffile.ts.keys()
    else:
        signals = chosen_signals

    # Create output system/raster
    if use_dt:
        system_name = 'Resampled system'
        raster_name = 'Resampled raster'
        new_timebasis, unit = get_new_timebasis(in_adaffile, dt, signals)
    else:
        raster_dict = get_raster_dict(in_adaffile)
        system_name, raster_name = raster_dict[new_tb]
        old_tb_column = (in_adaffile.sys[system_name][raster_name]
                         .basis_column())
        new_timebasis = old_tb_column.value()
        unit = old_tb_column.attr['unit']
    new_system = out_adaffile.sys.create(system_name)
    new_raster = new_system.create(raster_name)

    attributes = {}
    if unit:
        attributes['unit'] = unit
    if use_dt:
        attributes['time step'] = dt

    new_raster.create_basis(new_timebasis, attributes=attributes)
    if parameter_root['only_timebasis'].value:
        return

    selected_methods = (parameter_root['bool_interp_method'].selected,
                        parameter_root['int_interp_method'].selected,
                        parameter_root['interpolation_method'].selected)

    resampler = RasterResampler()
    # Loop over all rasters and resample selected signals in them
    progress_counter = 0
    for origin_system_name in in_adaffile.sys.keys():
        origin_system = in_adaffile.sys[origin_system_name]
        for origin_raster_name in origin_system.keys():
            origin_raster = origin_system[origin_raster_name]
            signals_in_this_raster = [
                signal_name for signal_name in origin_raster.keys()
                if signal_name in signals]
            if not signals_in_this_raster:
                continue

            origin_basis = origin_raster.basis_column().value()
            new_basis = new_raster.basis_column().value()

            resampler.new_raster(origin_basis, new_basis)

            # Loop over all signals and resample them
            for signal_name in signals_in_this_raster:
                progress(100. * progress_counter / len(signals))
                signal = origin_raster[signal_name]
                y = signal.y
                interp = resampler.get_method(y, selected_methods)
                new_y = interp(y)
                attrs = dict(signal.signal().attr.items())
                new_raster.create_signal(signal.name, new_y, attrs)
                progress_counter += 1


def get_new_timebasis(in_adaffile, dt, signals):
    """
    Get new timebasis covering the same range as all the old timebases using
    step size dt.
    """
    if dt <= 0:
        raise SyConfigurationError('Time step must be positive.')

    t_range = None
    basis_kind = None
    basis_units = set()

    # The range of the new time basis should be the superset of all the
    # times in the resampled rasters.
    # TODO: What about reference times here?
    # TODO: I removed the warning when some signals are missing. Does that
    # matter?
    for system_name in in_adaffile.sys.keys():
        system = in_adaffile.sys[system_name]
        for raster_name in system.keys():
            raster = system[raster_name]
            if not set(raster.keys()) & set(signals):
                # No selected signals in this raster
                continue

            old_basis = raster.basis_column()
            basis = old_basis.value()
            unit = old_basis.attr.get('unit', '')

            if basis_kind:
                if basis_kind != basis.dtype.kind and not (
                        basis_kind in 'fiu' and basis.dtype.kind in 'fiu'):
                    raise SyDataError(
                        "All time bases must be of the same type. "
                        "Found both {} and {}.".format(
                            dtypes.typename_from_kind(basis_kind),
                            dtypes.typename_from_kind(basis.dtype.kind)))
            else:
                basis_kind = basis.dtype.kind

            # 'unknown' was the previous default unit so we can treat that as
            # empty.
            if unit and unit != 'unknown':
                basis_units.add(unit)

            if len(basis):
                if t_range is None:
                    t_range = basis[0], basis[-1]
                else:
                    t_range = (min(t_range[0], basis[0]),
                               max(t_range[1], basis[-1]))

    basis_unit = ''
    if len(basis_units) == 1:
        basis_unit = basis_units.pop()
    elif len(basis_units) > 1:
        sywarn("The time bases have different units: {}".format(
            ", ".join(basis_units)))
    if t_range is None:
        # None of the selected signals were present, return empty target basis
        return np.array([]), ''
    if basis_kind == 'M':
        t_range = t_range[0].astype(float), t_range[1].astype(float)
        dt *= 1000000

    # If number of samples fit perfectly, use linspace to always catch both end
    # points, at the expense of sometimes changing dt very slightly.
    sample_count = np.around((t_range[1] - t_range[0]) / dt, 8)
    if sample_count == int(sample_count):
        timebasis_new = np.linspace(t_range[0], t_range[1], sample_count + 1)
    else:
        timebasis_new = np.arange(t_range[0], t_range[1], dt)
    if basis_kind == 'M':
        timebasis_new = timebasis_new.astype('M8[us]')

    return timebasis_new, basis_unit


def get_raster_dict(adaffile):
    if adaffile.is_valid():
        return collections.OrderedDict(
            [('/'.join([system_name, raster_name]), (system_name, raster_name))
             for system_name, system in adaffile.sys.items()
             for raster_name in system.keys()])
    else:
        return {}


def timebasis_only_parameter(parameters):
    parameters.set_boolean(
        'only_timebasis', label='Export time basis only', value=False,
        description='Choose to only export the time basis')
    return parameters


class SuperNode(synode.Node):
    author = 'Helena Olen'
    version = '2.0'
    icon = 'interpolate.svg'
    tags = Tags(Tag.Analysis.SignalProcessing)

    parameters = synode.parameters()
    parameters.set_boolean(
        'resample_all_rasters', value=True, label="Resample all signals",
        description='Apply resampling to all signals')
    ts_editor = synode.Util.multilist_editor(edit=True)
    ts_editor.set_attribute('filter', True)
    parameters.set_list(
        'ts', label="Choose signals",
        description='Choose signals to interpolate', editor=ts_editor)
    parameters.set_float(
        'dt', label='Time step',
        description=('Time step in new timebasis. If old timebasis is of '
                     'type datetime this is considered to be in seconds.'))
    parameters.set_list(
        'new_tb', label='Timebasis to use for interpolation',
        description=('Timebasis to use as new timebasis '
                     'for selected timeseries'),
        editor=synode.Util.combo_editor(filter=True))
    parameters.set_boolean(
        'use_dt', label='Time step approach', value=True,
        description='Choose between a custom time step and using an existing.')
    parameters = timebasis_only_parameter(parameters)
    parameters.set_list(
        'bool_interp_method', plist=['zero', 'nearest'], value=[1],
        description=('Method used to interpolate boolean, text, and '
                     'byte string data'),
        editor=synode.Util.combo_editor())
    parameters.set_list(
        'int_interp_method', plist=_METHODS, value=[_METHODS.index('nearest')],
        description='Method used to interpolate integer data',
        editor=synode.Util.combo_editor())
    parameters.set_list(
        'interpolation_method', plist=_METHODS,
        value=[_METHODS.index('linear')],
        description='Method used to interpolate other data types',
        editor=synode.Util.combo_editor())


[docs]class InterpolateADAF(SuperNode): """ Interpolation of timeseries in an ADAF. :Ref. nodes: :ref:`Interpolate ADAFs` """ name = 'Interpolate ADAF' description = 'Interpolation of data' nodeid = 'org.sysess.sympathy.data.adaf.interpolateadaf' inputs = Ports([Port.ADAF('Input ADAF', name='port1')]) outputs = Ports([Port.ADAF('Interpolated ADAF', name='port1')]) def adjust_parameters(self, node_context): adjust(node_context.parameters['new_tb'], node_context.input['port1'], kind='rasters') adjust(node_context.parameters['ts'], node_context.input['port1'], kind='ts') def exec_parameter_view(self, node_context): parameter_root = node_context.parameters return InterpolationWidget(parameter_root) def execute(self, node_context): in_adaffile = node_context.input['port1'] out_adaffile = node_context.output['port1'] parameter_root = node_context.parameters if in_adaffile.is_empty(): return out_adaffile.meta.from_table(in_adaffile.meta.to_table()) out_adaffile.res.from_table(in_adaffile.res.to_table()) resample_file(parameter_root, in_adaffile, out_adaffile, progress=self.set_progress)
[docs]@node_helper.list_node_decorator(['port1'], ['port1']) class InterpolateADAFs(InterpolateADAF): name = 'Interpolate ADAFs' nodeid = 'org.sysess.sympathy.data.adaf.interpolateadafs'
[docs]class InterpolateADAFsFromTable(synode.Node): """ Interpolation of timeseries in ADAFs using a specification table. The specification table should have two to three columns. It must have a column with the names of the signals that should be interpolated. Furthermore it should have either a column with resampling rate for each signal or a column with the names of the signals to whose time basis it should interpolate each signal. It can also have both columns and if both of them have values for the same row it will use the resample rate. :Ref. nodes: :ref:`Interpolate ADAF` """ name = 'Interpolate ADAFs with Table' description = 'Interpolation of data' nodeid = 'org.sysess.sympathy.data.adaf.interpolateadafswithtable' version = '1.0' author = 'Magnus Sandén' icon = 'interpolate.svg' tags = Tags(Tag.Analysis.SignalProcessing) inputs = Ports([Port.Table('Specification Table', name='spec'), Port.ADAFs('Input ADAFs', name='port1')]) outputs = Ports([Port.ADAFs('Interpolated ADAFs', name='port1')]) parameters = synode.parameters() parameters.set_list( 'signals_colname', label='Column with signal names', description='Resample the timeseries in this column.', editor=synode.editors.combo_editor(edit=True, filter=True)) parameters.set_list( 'dt_colname', label='Column with sample rates', description=('The selected column should contain sample rates to ' 'which the selected signals will be resampled. ' 'At least one of this parameter and the time bases ' 'parameter must be specified.'), editor=synode.editors.combo_editor( include_empty=True, edit=True, filter=True)) parameters.set_list( 'tb_colname', label='Column with time bases', description=('The selected column should contain existsing time bases ' 'to which the selected signals will be resampled. ' 'At least one of this parameter and the time bases ' 'parameter must be specified.'), editor=synode.editors.combo_editor( include_empty=True, edit=True, filter=True)) parameters = timebasis_only_parameter(parameters) parameters.set_list( 'bool_interp_method', plist=['zero', 'nearest'], value=[1], description=('Method used to interpolate boolean, text, and ' 'byte string data'), editor=synode.Util.combo_editor()) parameters.set_list( 'int_interp_method', plist=_METHODS, value=[_METHODS.index('nearest')], description='Method used to interpolate integer data', editor=synode.Util.combo_editor()) parameters.set_list( 'interpolation_method', plist=_METHODS, value=[_METHODS.index('linear')], description='Method used to interpolate other data types', editor=synode.Util.combo_editor()) def update_parameters(self, parameters): # Older versions (before sympathy 1.4.4) specified the empty element # explicitly in adjust_parameters. In 1.4.4 the empty element went # missing. From 1.4.5 we add it with 'include_empty' instead. parameters['dt_colname'].editor['include_empty'] = True parameters['tb_colname'].editor['include_empty'] = True def adjust_parameters(self, node_context): pnames = ['signals_colname', 'dt_colname', 'tb_colname'] for pname in pnames: adjust(node_context.parameters[pname], node_context.input['spec']) def exec_parameter_view(self, node_context): parameter_root = node_context.parameters tab_widget = QtWidgets.QTabWidget() methods_tab = MethodsTab(parameter_root) choose_spec_cols_tab = ChooseSpecColsTab(parameter_root) tab_widget.addTab(choose_spec_cols_tab, 'Specification table') tab_widget.addTab(methods_tab, 'Interpolation method') return tab_widget def execute(self, node_context): input_adafs = node_context.input['port1'] spec = node_context.input['spec'] if spec.is_empty(): return output_adafs = node_context.output['port1'] parameter_root = node_context.parameters def interpolate_adafs_w_table(input_adaf, output_adaf, set_progress): output_adaf.meta.from_table(input_adaf.meta.to_table()) output_adaf.res.from_table(input_adaf.res.to_table()) resample_file_with_spec(parameter_root, spec, input_adaf, output_adaf, progress=set_progress) synode.map_list_node(interpolate_adafs_w_table, input_adafs, output_adafs, self.set_progress)
class ChooseSpecColsTab(QtWidgets.QWidget): def __init__(self, parameter_root): super(ChooseSpecColsTab, self).__init__() layout = QtWidgets.QVBoxLayout() layout.addWidget(parameter_root['signals_colname'].gui()) layout.addWidget(parameter_root['dt_colname'].gui()) layout.addWidget(parameter_root['tb_colname'].gui()) layout.addStretch(0) self.setLayout(layout) class MethodsTab(QtWidgets.QWidget): def __init__(self, parameter_root): super(MethodsTab, self).__init__() layout = QtWidgets.QVBoxLayout() layout.addWidget(QtWidgets.QLabel( "Choose interpolation methods for different data types.")) self._form_layout = QtWidgets.QFormLayout() categories = ['Boolean, text, and byte string', 'Integer', 'Float, datetime, and timedelta'] parameters = ['bool_interp_method', 'int_interp_method', 'interpolation_method'] for category, parameter in zip(categories, parameters): widget = parameter_root[parameter].gui() self._form_layout.addRow(category, widget) layout.addLayout(self._form_layout) self._only_timebasis = parameter_root['only_timebasis'].gui() layout.addWidget(self._only_timebasis) layout.addStretch(0) self.setLayout(layout) self._only_timebasis.stateChanged.connect( self._only_timebasis_changed) self._only_timebasis_changed() def _only_timebasis_changed(self): disabled = ( self._only_timebasis.editor().checkState() != QtCore.Qt.Checked) for i in range(self._form_layout.count()): self._form_layout.itemAt(i).widget().setEnabled(disabled) class InterpolationWidget(QtWidgets.QWidget): """A widget containing a TimeBasisWidget and a ListSelectorWidget.""" def __init__(self, parameter_root, parent=None): super(InterpolationWidget, self).__init__() self._parameter_root = parameter_root self._init_gui() def _init_gui(self): # Create widgets and add to layout tab_widget = QtWidgets.QTabWidget() tb_tab = self._init_tb_tab() tab_widget.addTab(tb_tab, 'New time basis') methods_tab = MethodsTab(self._parameter_root) tab_widget.addTab(methods_tab, 'Interpolation method') signals_tab = self._init_signals_tab() tab_widget.addTab(signals_tab, 'Signals') layout = QtWidgets.QVBoxLayout() layout.addWidget(tab_widget) self.setLayout(layout) self._init_gui_from_parameters() def _init_tb_tab(self): # Create radio button group self._dt_or_tb = QtWidgets.QButtonGroup() self._dt_or_tb.setExclusive(True) self._custom_dt_button = QtWidgets.QRadioButton('Use custom timestep') self._use_tb_button = QtWidgets.QRadioButton( 'Interpolate using existing timebasis') # Add buttons to group self._dt_or_tb.addButton(self._custom_dt_button) self._dt_or_tb.addButton(self._use_tb_button) self._new_tb = self._parameter_root['new_tb'].gui() self._dt = self._parameter_root['dt'].gui() tb_ts_vlayout = QtWidgets.QVBoxLayout() tb_ts_vlayout.addWidget(self._custom_dt_button) tb_ts_vlayout.addWidget(self._dt) tb_ts_vlayout.addWidget(self._use_tb_button) tb_ts_vlayout.addWidget(self._new_tb) tb_ts_vlayout.addStretch(0) tb_tab = QtWidgets.QWidget() tb_tab.setLayout(tb_ts_vlayout) self._dt_or_tb.buttonClicked.connect(self._button_changed) return tb_tab def _init_signals_tab(self): signals_tab = QtWidgets.QWidget() self._ts_selection = self._parameter_root['ts'].gui() self._resample_all_signals = ( self._parameter_root['resample_all_rasters'].gui()) signals_vlayout = QtWidgets.QVBoxLayout() signals_vlayout.addWidget(self._resample_all_signals) signals_vlayout.addWidget(self._ts_selection) self._resample_all_signals.editor().toggled.connect( self._ts_selection.set_disabled) self._ts_selection.set_disabled( self._parameter_root['resample_all_rasters'].value) signals_tab.setLayout(signals_vlayout) return signals_tab def _init_gui_from_parameters(self): dt_checked = self._parameter_root['use_dt'].value self._custom_dt_button.setChecked(dt_checked) self._use_tb_button.setChecked(not dt_checked) self._use_tb_button.setChecked(not dt_checked) self._dt.setEnabled(dt_checked) self._new_tb.setEnabled(not dt_checked) def _button_changed(self, button): """ Radiobuttton clicked. Enable/disable custom coefficient edits or predefined filter widgets depedning on which button that is pressed. """ if button == self._custom_dt_button: self._dt.setEnabled(True) self._new_tb.setEnabled(False) self._parameter_root['use_dt'].value = True else: self._dt.setEnabled(False) self._new_tb.setEnabled(True) self._parameter_root['use_dt'].value = False