# Source code for node_interpolation_old

# Copyright (c) 2013, 2017, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
.. deprecated:: 1.2.5
    Use :ref:`Interpolate ADAF` or :ref:`Interpolate ADAFs` instead.

Interpolate timeseries by the chosen interpolation method and calculate the
new timeseries based on a new timebasis. The new timebasis can either be an
existing timebasis in the adaf-file or a timebasis with a timestep defined
by the user. The timeseries that will be interpolated are selected in a list.
The output file will contain the unmodified timeseries, and the modified ones.
The modified timeseries will be moved to a new timebasis if a timestep is
used and to the existing timebasis if that alternative is chosen.
"""
import six
from six.moves import zip as izip
import numpy as np
from scipy.interpolate import interp1d, UnivariateSpline

from sympathy.api import node as synode
from sympathy.api.nodeconfig import Port, Ports, deprecated_node
from sympathy.api.exceptions import SyConfigurationError
from sympathy.api import qt as qt_compat
from sympathy.api import table
QtCore = qt_compat.QtCore
QtGui = qt_compat.import_module('QtGui')


def resample_raster(parameter_root, raster_dict, adaffile):
    """Resample rasters of an ADAF in place onto a new timebasis.

    Either every raster found under ``adaffile.sys`` is processed (when the
    ``resample_all_rasters`` parameter is set) or only the raster selected by
    the ``tb`` parameter. Each processed raster is emptied and rebuilt with
    its signals resampled onto the target basis; signal and basis attributes
    are carried over.

    Parameters
    ----------
    parameter_root
        Node parameters. Reads ``use_dt``, ``interpolation_method``,
        ``resample_all_rasters`` (optional), ``tb``, ``dt`` and ``new_tb``.
    raster_dict : dict
        Mapping from raster name to raster, looked up with the selected
        ``tb`` and ``new_tb`` entries.
    adaffile
        ADAF whose ``sys`` container is walked when all rasters are
        resampled.

    Raises
    ------
    SyConfigurationError
        If a custom time step is requested but is unset or zero.
    """
    use_dt = parameter_root['use_dt'].value
    interpolation_method = parameter_root['interpolation_method'].selected

    try:
        # Avoid breaking older nodes without this option.
        resample_all = parameter_root['resample_all_rasters'].value
    except KeyError:
        resample_all = False

    if resample_all:
        rasters = (raster for system in adaffile.sys.values()
                   for raster in system.values())
    else:
        try:
            rasters = [raster_dict[parameter_root['tb'].selected]]
        except KeyError:
            # If the raster is unavailable don't resample.
            rasters = []

    for origin_raster in rasters:
        origin_basis = origin_raster.basis_column().value()

        if not len(origin_basis):
            # Do not attempt to resample an empty raster. Creating NaN values
            # would probably not be useful and cannot be used in any type.
            # Skip only this raster: the remaining ones must still be
            # processed when all rasters are being resampled.
            continue

        if use_dt:
            dt = parameter_root['dt'].value
            if not dt:
                raise SyConfigurationError(
                    'Time step must be set and non-zero.')
            if origin_basis.dtype.kind == 'M':
                # For datetime bases the step is given in seconds.
                dt *= np.timedelta64(1000000, 'us')

            target_basis = get_new_timebasis(dt, origin_basis)
        else:
            target_basis = raster_dict[
                parameter_root['new_tb'].selected].basis_column().value()

        # Snapshot every signal before the raster is cleared below.
        data = [(name, series.y, dict(series.signal().attr.items()))
                for name, series in origin_raster.items()]

        basis_attrs = dict(origin_raster.basis_column().attr.items())
        # Replace the raster content with an empty table, then rebuild it.
        origin_raster.from_table(table.File(), basis_name=None)

        for name, y, attrs in data:
            column = resample_signal(
                origin_basis,
                target_basis,
                y,
                interpolation_method)
            origin_raster.create_signal(name, column, attrs)

        origin_raster.create_basis(target_basis, basis_attrs)


def get_new_timebasis(dt, tb):
    """
    Build an evenly spaced timebasis from the first to the last sample of
    ``tb`` with step ``dt``.

    The result comes from ``np.arange`` and is therefore half-open: it starts
    at ``tb[0]`` and stops before ``tb[-1]``.
    """
    first, last = tb[0], tb[-1]
    return np.arange(first, last, dt)


def nearest_any(tb_old, ts_old):
    """
    Return a nearest-neighbour lookup function for ``tb_old`` / ``ts_old``.

    The returned function takes a new timebasis and returns an array with the
    dtype of ``ts_old`` holding, for each new time, the value of the closest
    old sample (ties go to the earlier sample). New times past the last old
    sample get the last old value.

    It works for any type of ``ts_old`` as long as the time values can be
    ordered with ``<`` / ``>`` and subtracted. Both timebases are walked once
    from start to end, so they are expected to be in ascending order.
    """
    def inner(tb_new):
        # iter() keeps this working where zip() returns a list.
        in_iter = iter(zip(ts_old, tb_old))
        curr_y, curr_t = next(in_iter)
        prev_y, prev_t = curr_y, curr_t
        result = []
        try:
            for t in tb_new:
                while curr_t is not None and t > curr_t:
                    # Move the position.
                    prev_y, prev_t = curr_y, curr_t
                    curr_y, curr_t = next(in_iter)

                # Exactly one value is emitted per new time. The None guards
                # must be exclusive with the distance comparison, otherwise a
                # value is appended twice and None is used in arithmetic.
                if curr_t is None:
                    result.append(prev_y)
                elif prev_t is None:
                    result.append(curr_y)
                elif curr_t - t < t - prev_t:
                    result.append(curr_y)
                else:
                    result.append(prev_y)
        except StopIteration:
            # Old samples are exhausted: pad the rest with the last value.
            result.extend((len(tb_new) - len(result)) * [curr_y])

        return np.array(result, dtype=ts_old.dtype)
    return inner


def get_interpolated_function(tb, ts, interpolation_method):
    """Return a callable that evaluates timeseries ``ts`` on a new timebasis.

    Parameters
    ----------
    tb : numpy.ndarray
        Original timebasis. A datetime64 basis is converted to float
        microsecond offsets from its first sample before fitting.
    ts : numpy.ndarray
        Timeseries values sampled on ``tb``.
    interpolation_method : str
        For float data: ``'cubic'`` and ``'quadratic'`` use
        ``UnivariateSpline`` of degree 3 and 2 respectively, anything else is
        passed as ``kind`` to ``interp1d``. Non-float data always uses
        ``interp1d`` with ``kind='nearest'``.

    Returns
    -------
    callable
        Function taking the new timebasis and returning resampled values. If
        building the interpolator raises ``ValueError`` the generic
        ``nearest_any`` lookup is used instead.
    """
    timeunit = np.timedelta64(1, 'us')
    is_datetime = tb.dtype.kind == 'M'
    # Keep the origin of the ORIGINAL basis: new timestamps must be expressed
    # relative to the same reference the interpolator was fitted on.
    origin = tb[0] if is_datetime else None

    def datetime_wrapper(interp):
        def inner(tb_new):
            return interp((tb_new - origin) / timeunit)
        return inner

    if is_datetime:
        tb = (tb - origin) / timeunit

    try:
        if ts.dtype.kind == 'f':
            if interpolation_method == 'cubic':
                f_i = UnivariateSpline(tb, ts, k=3)
            elif interpolation_method == 'quadratic':
                # A quadratic spline has degree 2.
                f_i = UnivariateSpline(tb, ts, k=2)
            else:
                f_i = interp1d(
                    tb, ts, kind=interpolation_method, bounds_error=False)
        else:
            f_i = interp1d(
                tb, ts, kind='nearest', bounds_error=False)
    except ValueError:
        # The fallback works on the same (possibly converted) basis, so it
        # needs the datetime wrapper below just like the other interpolators.
        f_i = nearest_any(tb, ts)

    if is_datetime:
        return datetime_wrapper(f_i)
    return f_i


def resample_signal(tb_old, tb_new, ts, interpolation_method):
    """Resample ``ts`` from ``tb_old`` onto ``tb_new``.

    A signal or timebasis with at most one point cannot be interpolated and
    is returned unchanged.
    """
    if len(ts) <= 1 or len(tb_old) <= 1:
        return ts
    interpolate = get_interpolated_function(tb_old, ts, interpolation_method)
    return interpolate(tb_new)


def check_consistence(parameter_root, raster_dict):
    """Tell whether the configured ``tb`` list matches the input's rasters.

    Returns True when the names stored in ``parameter_root['tb'].list`` are
    the same, ignoring order, as the keys of ``raster_dict``.
    """
    configured = sorted(parameter_root['tb'].list)
    available = sorted(raster_dict.keys())
    return configured == available


def reinit_interpolation(parameter_root):
    """Reset all interpolation parameters to their initial state.

    Used when the input file no longer matches the stored configuration.
    """
    # The three list parameters are cleared in the same way.
    for key in ('tb', 'ts', 'new_tb'):
        parameter_root[key].list = []
        parameter_root[key].value = [0]
        parameter_root[key].value_names = []

    parameter_root['interpolation_method'].value = [0]
    parameter_root['dt'].value = 0
    parameter_root['use_dt'].value = True


def check_and_reinit_node(parameter_root, raster_dict):
    """
    Reset the parameters when they no longer agree with the rasters of the
    input file; leave them untouched otherwise.
    """
    consistent = check_consistence(parameter_root, raster_dict)
    if consistent:
        return
    reinit_interpolation(parameter_root)


def get_interpolations():
    """Return the names of the selectable interpolation methods."""
    return [
        'linear',
        'nearest',
        'zero',
        'slinear',
        'quadratic',
        'cubic',
    ]


def get_raster_dict(adaffile):
    """Map ``'system/raster'`` names to the rasters of ``adaffile``.

    An invalid ADAF yields an empty dict.
    """
    rasters = {}
    if not adaffile.is_valid():
        return rasters

    for system_name, system in adaffile.sys.items():
        for raster_name, raster in system.items():
            rasters['/'.join([system_name, raster_name])] = raster
    return rasters


class SuperNode(object):
    """Metadata and parameter migration for nodes deriving from this class."""

    # The e-mail address is enclosed in a complete pair of angle brackets.
    author = 'Helena Olen <helena.olen@combine.se>'
    copyright = '(C) 2013 System Engineering Software Society'
    version = '1.0'
    icon = 'interpolate.svg'

    def update_parameters(self, old_params):
        """Bring stored parameters up to date with the current definition.

        Adds the ``resample_all_rasters`` parameter when it is missing, and
        fills in its label and description when they are empty.
        """
        label = 'Resample all rasters'
        description = 'Apply resampling to all rasters'

        # From the beginning there was no parameter called
        # resample_all_rasters.
        if 'resample_all_rasters' not in old_params:
            # At that point the node behaved as though the parameter was False.
            old_params.set_boolean(
                'resample_all_rasters', value=False,
                label=label,
                description=description)
            return

        # Then there was a parameter, but it didn't have label/description.
        if not old_params['resample_all_rasters'].description:
            old_params['resample_all_rasters'].description = description
        if not old_params['resample_all_rasters'].label:
            old_params['resample_all_rasters'].label = label


def base_parameters():
    """Build the default parameter structure for the interpolation nodes.

    Returns
    -------
    The parameter root created by ``synode.parameters()`` populated with the
    ``tb``, ``ts``, ``interpolation_method``, ``dt``, ``new_tb``, ``use_dt``
    and ``resample_all_rasters`` parameters.
    """
    parameter_root = synode.parameters()
    ts_editor = synode.Util.selectionlist_editor('multi')
    ts_editor.set_attribute('filter', True)
    parameter_root.set_list('tb',
                            label="Time basis column",
                            value=[0],
                            editor=synode.Util.list_editor().value())
    parameter_root.set_list('ts',
                            label="Time series columns in preview",
                            value=[0],
                            editor=ts_editor.value())
    parameter_root.set_list('interpolation_method', plist=get_interpolations(),
                            label='Interpolation method',
                            value=[0],
                            description='Method used to interpolate data',
                            editor=synode.Util.combo_editor().value())
    parameter_root.set_float('dt', label='Time step',
                             description='Time step in new timebasis. If old '
                                         'timebasis is of type datetime this '
                                         'is considered to be in seconds.')
    # Note the trailing space: adjacent string literals are concatenated
    # without any separator.
    parameter_root.set_list('new_tb', value=[0],
                            label='Timebasis to use for interpolation',
                            description=('Timebasis to use as new timebasis '
                                         'for selected timeseries'),
                            editor=synode.Util.combo_editor().value())
    parameter_root.set_boolean('use_dt', value=True)
    parameter_root.set_boolean(
        'resample_all_rasters', value=True,
        label="Resample all rasters",
        description='Apply resampling to all rasters')
    return parameter_root


@deprecated_node('1.5.0', 'Interpolate ADAF')
class InterpolationNodeOld(SuperNode, synode.Node):
    """
    .. deprecated:: 1.2.5
        Use :ref:`Interpolate ADAF` instead.

    Interpolation of timeseries in an ADAF.

    :Inputs:
        **port1** : ADAF
            ADAF with data.
    :Outputs:
        **port1** : ADAF
            ADAF with interpolated data.
    :Configuration:
        **Use custom timestep**
            Specify the custom step length for basis in a new raster.
        **Interpolate using existing timebasis**
            Select basis in another raster as new basis for selected columns.
        **Interpolation method**
            Select interpolation method.
        **Time basis column**
            Select raster to choose time series columns from.
        **Time series columns**
            Select one or many time series columns to interpolate to the new
            basis.
    :Ref. nodes: :ref:`Interpolate ADAF`
    """

    name = 'Interpolate ADAF (deprecated)'
    description = 'Interpolation of data'
    nodeid = 'org.sysess.sympathy.data.adaf.interpolationnode'
    inputs = Ports([Port.ADAF('ADAFInput', name='port1')])
    outputs = Ports([Port.ADAF('ADAFOutput', name='port1')])

    parameters = base_parameters()

    def exec_parameter_view(self, node_context):
        """Create the configuration widget for the ADAF on ``port1``."""
        adaffile = node_context.input['port1']
        parameter_root = node_context.parameters
        raster_dict = get_raster_dict(adaffile)
        # Stored selections are reset if they no longer match the input.
        check_and_reinit_node(parameter_root, raster_dict)
        return InterpolationWidget(parameter_root, raster_dict)

    def execute(self, node_context):
        """Resample the input ADAF in place and use it as the output source."""
        in_adaffile = node_context.input['port1']
        out_adaffile = node_context.output['port1']
        parameter_root = node_context.parameters
        raster_dict = get_raster_dict(in_adaffile)
        check_and_reinit_node(parameter_root, raster_dict)
        resample_raster(parameter_root, raster_dict, in_adaffile)
        out_adaffile.source(in_adaffile)
@deprecated_node('1.5.0', 'Interpolate ADAFs')
class InterpolationNodeADAFsOld(SuperNode, synode.Node):
    """
    .. deprecated:: 1.2.5
        Use :ref:`Interpolate ADAFs` instead.

    Interpolation of timeseries in ADAFs.

    :Inputs:
        **port1** : ADAF
            ADAF with data.
    :Outputs:
        **port1** : ADAF
            ADAF with interpolated data.
    :Configuration:
        **Use custom timestep**
            Specify the custom step length for basis in a new raster.
        **Interpolate using existing timebasis**
            Select basis in another raster as new basis for selected columns.
        **Interpolation method**
            Select interpolation method.
        **Time basis column**
            Select raster to choose time series columns from.
        **Time series columns**
            Select one or many time series columns to interpolate to the new
            basis.
    :Ref. nodes: :ref:`Interpolate ADAFs`
    """

    name = 'Interpolate ADAFs (deprecated)'
    description = 'Interpolation of data'
    nodeid = 'org.sysess.sympathy.data.adaf.interpolationnodeADAFs'
    inputs = Ports([Port.ADAFs('ADAFInput', name='port1')])
    outputs = Ports([Port.ADAFs('ADAFOutput', name='port1')])

    parameters = base_parameters()

    def exec_parameter_view(self, node_context):
        """Create the configuration widget from the first ADAF in the list."""
        input_list = node_context.input['port1']
        parameter_root = node_context.parameters
        if input_list.is_valid() and len(input_list):
            # Present GUI based on the first element.
            first = six.next(iter(input_list))
            raster_dict = get_raster_dict(first)
        else:
            raster_dict = {}
        check_and_reinit_node(parameter_root, raster_dict)
        return InterpolationWidget(parameter_root, raster_dict)

    def execute(self, node_context):
        """Resample every input ADAF in place and append it to the output."""
        input_list = node_context.input['port1']
        output_list = node_context.output['port1']
        parameter_root = node_context.parameters
        for in_adaffile in input_list:
            raster_dict = get_raster_dict(in_adaffile)
            resample_raster(parameter_root, raster_dict, in_adaffile)
            output_list.append(in_adaffile)
class InterpolationWidget(QtGui.QWidget):
    """Configuration widget for the interpolation parameters.

    Shows a radio-button choice between a custom time step and an existing
    timebasis, together with the editors for the time step, the new
    timebasis, the interpolation method, the raster selection and (when
    present) the "resample all rasters" option.
    """

    def __init__(self, parameter_root, raster_dict, parent=None):
        # Forward ``parent`` so the widget is owned by it when one is given.
        super(InterpolationWidget, self).__init__(parent)
        self._parameter_root = parameter_root
        self._raster_dict = raster_dict
        self._figure = None
        self._axes = None
        self._canvas = None
        self._toolbar = None
        self._init_gui()

    def _init_gui(self):
        """Create the child widgets, lay them out and connect signals."""
        self._pre_init_gui_from_parameters()

        # Create widgets and add to layout
        self._tb_selection = self._parameter_root['tb'].gui()
        self._interpolation_method = (
            self._parameter_root['interpolation_method'].gui())

        # Create radio button group
        self._dt_or_tb = QtGui.QButtonGroup()
        self._dt_or_tb.setExclusive(True)

        self._custom_dt_button = QtGui.QRadioButton(
            'Use custom timestep')
        self._use_tb_button = QtGui.QRadioButton(
            'Interpolate using existing timebasis')

        # Add buttons to group
        self._dt_or_tb.addButton(self._custom_dt_button)
        self._dt_or_tb.addButton(self._use_tb_button)

        self._new_tb = self._parameter_root['new_tb'].gui()
        self._dt = self._parameter_root['dt'].gui()

        tb_ts_vlayout = QtGui.QVBoxLayout()
        tb_ts_vlayout.addWidget(self._custom_dt_button)
        tb_ts_vlayout.addWidget(self._dt)
        tb_ts_vlayout.addWidget(self._use_tb_button)
        tb_ts_vlayout.addWidget(self._new_tb)
        tb_ts_vlayout.addWidget(self._interpolation_method)
        tb_ts_vlayout.addWidget(self._tb_selection)

        if 'resample_all_rasters' in self._parameter_root:
            # Avoid breaking older nodes without this option.
            resample_all_gui = (
                self._parameter_root['resample_all_rasters'].gui())
            tb_ts_vlayout.addWidget(resample_all_gui)
            # The raster selection is irrelevant when all rasters are used.
            resample_all_gui.editor().toggled.connect(
                self._tb_selection.set_disabled)
            if self._parameter_root['resample_all_rasters'].value:
                self._tb_selection.set_disabled(True)

        hlayout = QtGui.QHBoxLayout()
        hlayout.addLayout(tb_ts_vlayout)

        layout = QtGui.QVBoxLayout()
        layout.addLayout(hlayout)
        self.setLayout(layout)

        self._init_gui_from_parameters()

        self._dt_or_tb.buttonClicked.connect(self._button_changed)

    def _pre_init_gui_from_parameters(self):
        """Pre-initialize GUI from parameters."""
        # NOTE(review): the nesting of the statements below was reconstructed
        # from a source whose indentation was lost -- confirm against the
        # original file.
        if self._parameter_root['tb'].list == []:
            self._parameter_root['tb'].list = list(self._raster_dict.keys())
        if self._parameter_root['tb'].value == []:
            self._parameter_root['tb'].value = [0]
        if self._parameter_root['new_tb'].list == []:
            self._parameter_root['new_tb'].list = list(
                self._raster_dict.keys())
            self._parameter_root['new_tb'].value = [0]

    def _init_gui_from_parameters(self):
        """Set radio buttons and enabled state from the ``use_dt`` value."""
        dt_checked = self._parameter_root['use_dt'].value
        self._custom_dt_button.setChecked(dt_checked)
        self._use_tb_button.setChecked(not dt_checked)
        self._dt.setEnabled(dt_checked)
        self._new_tb.setEnabled(not dt_checked)

    def _button_changed(self, button):
        """
        Radio button clicked. Enable the time step editor or the timebasis
        selector depending on which button was pressed, and store the choice
        in the ``use_dt`` parameter.
        """
        if button == self._custom_dt_button:
            self._dt.setEnabled(True)
            self._new_tb.setEnabled(False)
            self._parameter_root['use_dt'].value = True
        else:
            self._dt.setEnabled(False)
            self._new_tb.setEnabled(True)
            self._parameter_root['use_dt'].value = False