Source code for node_filter_adafs

# Copyright (c) 2013, 2017, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
"""
from __future__ import (print_function, division, unicode_literals,
                        absolute_import)

import collections
import ast
import sys
import warnings

import six

import numpy as np
import scipy.signal as signal

from sympathy.api import table, ParameterView
from sympathy.api import qt as qt_compat
from sympathy.api.exceptions import sywarn

from matplotlib.figure import Figure
from matplotlib.backends.backend_qt4agg \
    import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt4agg \
    import NavigationToolbar2QT as NavigationToolbar

from sympathy.api import node as synode
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, deprecated_node
from sympathy.utils import prim

QtCore = qt_compat.QtCore
QtGui = qt_compat.QtGui
qt_compat.backend.use_matplotlib_qt()


class CapturePrint(list):
    """Context manager for capturing print output."""

    def __enter__(self):
        self._stdout = sys.stdout
        sys.stdout = self._stringio = six.StringIO()
        return self

    def __exit__(self, *args):
        self.extend(self._stringio.getvalue().splitlines())
        sys.stdout = self._stdout


def write_group(in_group, out_group):
    """Write meta from input file to output ADAF file."""
    def attributes(table):
        return {column: table.get_column_attributes(column)
                for column in table.column_names()}

    attrs_meta = prim.flip(attributes(in_group.to_table()))
    data_meta = in_group.to_table()
    for name in in_group.keys():
        try:
            attrs = attrs_meta[name]
        except KeyError:
            attrs = {}
        out_group.create_column(
            name, data_meta.get_column_to_array(name), attrs)


def write_res(in_adaffile, out_adaffile):
    """Write results from input file to output ADAF file."""
    write_group(in_adaffile.res, out_adaffile.res)


def write_meta(in_adaffile, out_adaffile):
    """Write meta from input file to output ADAF file."""
    write_group(in_adaffile.meta, out_adaffile.meta)


def filter_signals(in_adaffile, out_adaffile, parameters):
    """
    Filter all timeseries in in_adaffile and write to output
    ADAF file with old timebasis, meta and result.
    """
    write_res(in_adaffile, out_adaffile)
    write_meta(in_adaffile, out_adaffile)
    # Generate global filter design
    b, a = generate_filter(parameters)
    for system_name, in_system in in_adaffile.sys.items():
        out_system = out_adaffile.sys.create(system_name)
        for raster_name, in_raster in in_system.items():
            out_raster = out_system.create(raster_name)
            # Making use of the table API to build the output raster.
            # While at the same time taking care to propagate attributes.
            in_raster_table = in_raster.to_table()
            out_raster_table = table.File()
            for column_name in in_raster.keys():
                column_data = in_raster_table.get_column_to_array(column_name)
                attributes = in_raster_table.get_column_attributes(column_name)
                try:
                    column = filter_signal(parameters, b, a, column_data)
                    attributes['Filtering'] = six.text_type(
                        create_filter_parameter_attributes(parameters)
                    )
                except ValueError as e:
                    sywarn('A ValueError occurred during signal filtering. '
                           'The column "{}" is returned unfiltered!\n'
                           'Error message: {}'.format(column_name, e))
                    column = column_data
                    attributes['Filtering'] = 'Unfiltered due to Error'
                out_raster_table.set_column_from_array(column_name, column)
                out_raster_table.set_column_attributes(column_name, attributes)
            in_basis = in_raster.basis_column()
            out_raster.from_table(out_raster_table)
            out_raster.create_basis(
                in_basis.value(), dict(in_basis.attr.items()))


def generate_filter(parameters):
    """Generate filter."""
    filter_type = parameters['filter_type'].selected
    if filter_type == 'IIR':
        b, a = iir_filter_design(parameters)
    else:
        b, a = fir_filter_design(parameters)
    return b, a


def create_filter_parameter_attributes(parameters):
    """Generate a filter parameter attribute representation."""
    filter_type = parameters['filter_type'].selected
    filter_dict = collections.OrderedDict()
    filter_dict['Filter Type'] = filter_type
    if filter_type == 'IIR':
        values = get_iir_filter_parameters(parameters)
        for var, value in zip(['iir_wp', 'iir_ws', 'iir_gpass', 'iir_gstop',
                               'iir_filters'], values):
            label = parameters[var].label
            filter_dict[label] = value
    else:
        fir_dict = get_fir_window_dict()
        fir_window = parameters['fir_windows'].selected
        window = fir_dict[fir_window]['name']
        params = fir_dict[fir_window]['param']
        filter_dict['Filter window'] = window
        filter_dict['Filter length'] = parameters['fir_len'].value
        filter_dict['Cutoff frequency'] = parameters['fir_cutoff'].value
        if len(params) >= 1:
            label = parameters['fir_w1'].label
            filter_dict[label] = parameters['fir_w1'].value
        if len(params) == 2:
            label = parameters['fir_w2'].label
            filter_dict[label] = parameters['fir_w2'].value
        filter_dict['Frequency pass type'] = parameters['freq_type'].selected
    filter_dict['Filtering Type'] = parameters['filtering'].selected
    return '; '.join(['{}: {}'.format(k, v) for k, v in six.iteritems(
        filter_dict)])


def filter_signal(parameters, b, a, ts):
    """Filter timeserie ts."""
    filtering_dict = get_filtering_dict()
    return filtering_dict[parameters['filtering'].selected](b, a, ts)


def get_iir_filter_parameters(parameter_root):
    iir_dict = get_iir_filter_dict()
    wp_str = parameter_root['iir_wp'].value
    wp = ast.literal_eval(wp_str)
    ws_str = parameter_root['iir_ws'].value
    ws = ast.literal_eval(ws_str)
    gpass_str = parameter_root['iir_gpass'].value
    gpass = float(gpass_str)
    gstop_str = parameter_root['iir_gstop'].value
    gstop = float(gstop_str)
    ftype = iir_dict[parameter_root['iir_filters'].selected]
    return wp, ws, gpass, gstop, ftype


def iir_filter_design(parameter_root):
    """Design and return parameters for iir filter."""
    wp, ws, gpass, gstop, ftype = get_iir_filter_parameters(parameter_root)
    b, a = signal.iirdesign(wp, ws, gpass, gstop, ftype=ftype)
    return b, a


def get_fir_filter_parameters(parameter_root):
    fir_dict = get_fir_window_dict()
    fir_window = parameter_root['fir_windows'].selected
    window = fir_dict[fir_window]['name']
    params = fir_dict[fir_window]['param']
    if len(params) == 0:
        window_tuple = (window, )
    elif len(params) == 1:
        arg1 = float(parameter_root['fir_w1'].value)
        window_tuple = (window, arg1)
    else:
        arg1 = float(parameter_root['fir_w1'].value)
        arg2 = float(parameter_root['fir_w2'].value)
        window_tuple = (window, arg1, arg2)
    m = int(parameter_root['fir_len'].value)
    cutoff = ast.literal_eval(parameter_root['fir_cutoff'].value)
    if parameter_root['freq_type'].selected in ['Highpass', 'Bandpass']:
        freq_type = False
    else:
        freq_type = True
    return m, cutoff, window_tuple, freq_type


def fir_filter_design(parameter_root):
    """Get FIR filter coefficients."""
    m, cutoff, window_tuple, freq_type = get_fir_filter_parameters(
        parameter_root)
    b = signal.firwin(m, cutoff, window=window_tuple, pass_zero=freq_type)
    return b, [1.0]


def get_filtering_dict():
    filtering = {'Forward': signal.lfilter,
                 'Forward-Backward': signal.filtfilt}
    return filtering


def get_fir_window_dict():
    fir = {'Bartlett-Hann': {'name': 'barthann',
                             'param': []},
           'Bartlett': {'name': 'bartlett', 'param': []},
           'Blackman': {'name': 'blackman', 'param': []},
           'Blackman-Harris': {'name': 'blackmanharris',
                               'param': []},
           'Bohman': {'name': 'bohman', 'param': []},
           'Boxcar': {'name': 'boxcar', 'param': []},
           'Dolph-Chebyshev': {'name': 'chebwin',
                               'param': ['Attenuation (dB)']},
           'Flat top': {'name': 'flattop', 'param': []},
           'Gaussian': {'name': 'gaussian',
                        'param': ['std']},
           'Generalized Gaussian': {'name': 'general_gaussian',
                                    'param': ['p', 'Sigma']},
           'Hamming': {'name': 'hamming', 'param': []},
           'Hann': {'name': 'hann', 'param': []},
           'Kaiser': {'name': 'kaiser', 'param': ['Beta']},
           'Nuttall': {'name': 'nuttall', 'param': []},
           'Parzen': {'name': 'parzen', 'param': []},
           'Slepian': {'name': 'slepian',
                       'param': ['width']},
           'Triangular': {'name': 'triang', 'param': []}}
    return fir


def get_iir_filter_dict():
    iir = {'Butterworth': 'butter', 'Chebyshev 1': 'cheby1',
           'Chebyshev 2': 'cheby2', 'Elliptic': 'ellip'}
    return iir


def map_adaf_to_signal_list(datafile):
    key_map = []
    if datafile.is_valid():
        for system_name, system in datafile.sys.items():
            for raster_name, raster in system.items():
                for signal_name in raster.keys():
                    key_map.append((system_name, raster_name, signal_name))
    return key_map


[docs]class FilterADAFsWithPlot(synode.Node): """ Filter ADAFs with a specified filter. Both IIR filters and FIR filters can be selected. The filter can be a forward or forward-backward filter. The resulting filter design and an example of filtered data can be inspected in real-time within the node's GUI. The FIR filter windows that can be used are: - Bartlett-Hann_ - Bartlett_ - Blackman_ - Blackman-Harris_ - Bohman_ - Boxcar_ - Dolph-Chebyshev_ - `Flat top`_ - Gaussian_ - `Generalized Gaussian`_ - Hamming_ - Hann_ - Kaiser_ - Nuttall_ - Parzen_ - Slepian_ - Triangular_ .. _Bartlett-Hann: http://en.wikipedia.org/wiki/Window_function#Bartlett.E2.80.93Hann_window .. _Bartlett: http://en.wikipedia.org/wiki/Window_function#Triangular_window .. _Blackman: http://en.wikipedia.org/wiki/Window_function#Blackman_windows .. _Blackman-Harris: http://en.wikipedia.org/wiki/Window_function#Blackman.E2.80.93Harris_window .. _Bohman: http://en.wikipedia.org/wiki/Window_function#Cosine_window .. _Boxcar: http://en.wikipedia.org/wiki/Window_function#Rectangular_window .. _Dolph-Chebyshev: http://en.wikipedia.org/wiki/Window_function#Dolph.E2.80.93Chebyshev_window .. _`Flat top`: http://en.wikipedia.org/wiki/Window_function#Flat_top_window .. _Gaussian: http://en.wikipedia.org/wiki/Window_function#Gaussian_window .. _`Generalized Gaussian`: http://en.wikipedia.org/wiki/Window_function#Gaussian_window .. _Hamming: http://en.wikipedia.org/wiki/Window_function#Hamming_window .. _Hann: http://en.wikipedia.org/wiki/Window_function#Hann_.28Hanning.29_window .. _Kaiser: http://en.wikipedia.org/wiki/Kaiser_window .. _Nuttall: http://en.wikipedia.org/wiki/Window_function#Nuttall_window.2C_continuous_first_derivative .. _Parzen: http://en.wikipedia.org/wiki/Window_function#Parzen_window .. _Slepian: http://en.wikipedia.org/wiki/Window_function#DPSS_or_Slepian_window .. _Triangular: http://en.wikipedia.org/wiki/Window_function#Triangular_window The IIR filter functions supported are: - Butterworth_ - `Chebyshev 1`_ - `Chebyshev 2`_ - Elliptic_ .. _Butterworth: http://en.wikipedia.org/wiki/Butterworth_filter .. _`Chebyshev 1`: http://en.wikipedia.org/wiki/Chebyshev_filter#Type_I_Chebyshev_filters .. _`Chebyshev 2`: http://en.wikipedia.org/wiki/Chebyshev_filter#Type_II_Chebyshev_filters .. _Elliptic: http://en.wikipedia.org/wiki/Elliptic_filter """ author = ('Helena Olen <helena.olen@combine.se>, ' 'Benedikt Ziegler <benedikt.ziegler@combine.se>') copyright = '(c) 2013, 2016 System Engineering Software Society' description = 'Filter ADAF data.' name = 'Filter ADAFs' nodeid = 'org.sysess.sympathy.data.adaf.filteradafswithplot' version = '1.1' icon = 'filter_adaf.svg' tags = Tags(Tag.Analysis.SignalProcessing) inputs = Ports([Port.ADAFs('Input ADAFs', name='port1')]) outputs = Ports([Port.ADAFs( 'Output ADAFs with filter applied', name='port1')]) parameters = synode.parameters() parameters.set_list( 'filter_type', plist=['IIR', 'FIR'], label='Filter type', value=[0], description='Combo of filter types', editor=synode.Util.combo_editor()) parameters.set_list( 'freq_type', plist=['Lowpass', 'Highpass', 'Bandpass', 'Bandstop'], value=[0], label='Frequency pass type', description='Frequency pass type required for the FIR filter.', editor=synode.Util.combo_editor()) # fir_page = parameters.create_page('fir_page', label='FIR') parameters.set_list( 'fir_windows', plist=sorted(get_fir_window_dict().keys()), value=[9], label='Filter windows', description='Filter windows for FIR filter', editor=synode.Util.combo_editor()) parameters.set_integer( 'fir_len', value=11, label='Filter length', description='Length of the filter', editor=synode.Util.lineedit_editor(placeholder='11')) parameters.set_string( 'fir_cutoff', label='Cutoff frequency', value='0.2', description="Cutoff frequency of filter (expressed in the same units " "as `nyq`) OR an array of cutoff frequencies (that is, " "band edges). In the latter case, the frequencies in " "`cutoff` should be positive and monotonically " "increasing between 0 and `nyq`. The values 0 and `nyq` " "must not be included in `cutoff`.", editor=synode.Util.lineedit_editor(placeholder='0.2, ..')) parameters.set_float( 'fir_w1', label='Beta', value=1.0, description='Filter specific parameter. Check the help.', editor=synode.Util.lineedit_editor(placeholder='1.0')) parameters.set_float( 'fir_w2', label='Sigma', value=1.0, description='Filter specific parameter. Check the help.', editor=synode.Util.lineedit_editor(placeholder='1.0')) # iir_page = parameters.create_page('iir_page', label='IIR') parameters.set_list( 'iir_filters', plist=sorted(get_iir_filter_dict().keys()), value=[0], label='Filter designs', description='IIR filters', editor=synode.Util.combo_editor()) parameters.set_string( 'iir_wp', label='Passband edge frequency', value='0.2', description='Passband edge frequency', editor=synode.Util.lineedit_editor( placeholder='0.2 or [0.2, 0.3]')) parameters.set_string( 'iir_ws', label='Stopband edge frequency', value='0.4', description='Stopband edge frequency', editor=synode.Util.lineedit_editor( placeholder='0.4 or [0.1, 0.4]')) parameters.set_float( 'iir_gpass', label='Max loss in passband (dB)', value=1.0, description='Max loss in the passband (dB)', editor=synode.Util.lineedit_editor(placeholder='2.0')) parameters.set_float( 'iir_gstop', label='Min attenuation in stopband (dB)', value=10.0, description='Min attenuation in the stopband (dB)', editor=synode.Util.lineedit_editor(placeholder='1.0')) parameters.set_list( 'filtering', plist=sorted(get_filtering_dict().keys()), value=[1], label='Filtering', description='Filtering types', editor=synode.Util.combo_editor()) parameters.set_list( 'signal_select', label='Select Signal', description='Select a signal', editor=synode.Util.combo_editor()) parameters.set_boolean( 'auto_plot', label='Auto refresh', description='Automatically refresh the plot') def adjust_parameters(self, node_context): datafile = node_context.input['port1'] if datafile.is_valid() and len(datafile) > 0: signal_map = map_adaf_to_signal_list(datafile[0]) else: signal_map = [] items = ['{} ({}/{})'.format( line[2], line[0], line[1]) for line in signal_map] node_context.parameters['signal_select'].adjust(items) def exec_parameter_view(self, node_context): return FilterADAFsPlotWidget(node_context) def execute(self, node_context): input_list = node_context.input['port1'] output_list = node_context.output['port1'] number_of_items = len(input_list) for idx, adaffile in enumerate(input_list): outadaffile = output_list.create() filter_signals(adaffile, outadaffile, node_context.parameters) output_list.append(outadaffile) self.set_progress(100.0 * (idx + 1) / number_of_items)
def form_layout_factory(parameter_widgets, fixed_width=None, add_stretch=False): """ A factory creating a 2 column (label, editor) form layout. Parameters ---------- parameter_widgets : [tuple] A list of tuples, where each tuple contains at least the widget. Optionally, a QLabel can be defined which would overwrite any existing label_widget of the main widget. If a label should be skipped but the editor should be aligned in the left column one can input an empty string. fixed_width : int, optional Define a fixed width for the editor column in pixels. add_stretch : bool, optional If a stretch should be added to the end of the layout. Returns ------- layout : QtGui.QVBoxLayout """ if fixed_width is not None and fixed_width < 0: fixed_width = None layout = QtGui.QGridLayout() for i, item in enumerate(parameter_widgets): # get label and editor widget widget = item[0] editor = getattr(widget, 'editor', None) label = getattr(widget, 'label_widget', None) # override label with a given label string or QLabel if len(item) > 1: given_label = item[1] if isinstance(given_label, six.string_types): label = QtGui.QLabel(six.text_type(label)) elif isinstance(given_label, QtGui.QLabel): label = given_label # assign editor to the right column if editor is None: editor = widget if label and editor: # add the label and editor to the layout label_widget = label() editor_widget = editor() layout.addWidget(label_widget, i, 0) layout.addWidget(editor_widget, i, 1) if fixed_width: editor_widget.setMaximumWidth(fixed_width) else: # add the given widget to the layout hlayout = QtGui.QHBoxLayout() hlayout.setContentsMargins(0, 0, 0, 0) hlayout.addWidget(widget) layout.addLayout(hlayout, i, 0, 1, 2) outer_layout = QtGui.QVBoxLayout() outer_layout.addLayout(layout) if add_stretch: outer_layout.addStretch() return outer_layout _scipy_filter_links = None def scipy_filter_links(): global _scipy_filter_links if _scipy_filter_links is None: from scipy import __version__ as scipy_version _scipy_filter_links = { 'FIR': 'http://docs.scipy.org/doc/scipy-{}' '/reference/generated/scipy.signal.firwin.html'.format( scipy_version), 'IIR': 'http://docs.scipy.org/doc/scipy-{}' '/reference/generated/scipy.signal.iirdesign.html'.format( scipy_version)} return _scipy_filter_links class FilterADAFsPlotWidget(ParameterView): def __init__(self, node_context, parent=None): super(FilterADAFsPlotWidget, self).__init__(parent=parent) self._node_context = node_context self._parameters = node_context.parameters self._datafile = node_context.input['port1'] self._status_message = '' self._is_valid = True self._init_gui() def resizeEvent(self, event): super(FilterADAFsPlotWidget, self).resizeEvent(event) self.figure.tight_layout() def _init_gui(self): self._fir_dict = get_fir_window_dict() # Init guis from parameter_root # global parameters self._filtering_combo = self._parameters['filtering'].gui() self._freq_combo = self._parameters['freq_type'].gui() # FIR specific parameters self._fir_windows_combo = self._parameters['fir_windows'].gui() # FIR filters self._fir_len = self._parameters['fir_len'].gui() self._fir_cutoff = self._parameters['fir_cutoff'].gui() self._fir_w1 = self._parameters['fir_w1'].gui() self._fir_w2 = self._parameters['fir_w2'].gui() # FIR layout fir_layout = form_layout_factory([(self._fir_windows_combo, ), (self._fir_len, ), (self._fir_cutoff, ), (self._fir_w1, ), (self._fir_w2, )], fixed_width=150, add_stretch=True) # IIR specific parameters self._iir_filters_combo = self._parameters['iir_filters'].gui() # IIR filters self._iir_wp = self._parameters['iir_wp'].gui() self._iir_ws = self._parameters['iir_ws'].gui() self._iir_gpass = self._parameters['iir_gpass'].gui() self._iir_gstop = self._parameters['iir_gstop'].gui() # IIR layout iir_layout = form_layout_factory([(self._iir_filters_combo, ), (self._iir_wp, ), (self._iir_ws, ), (self._iir_gpass, ), (self._iir_gstop, )], fixed_width=150, add_stretch=True) # Filter Tabs self._filter_tabs = QtGui.QTabWidget() iir_tab = QtGui.QWidget() iir_tab.setLayout(iir_layout) fir_tab = QtGui.QWidget() fir_tab.setLayout(fir_layout) value_names = self._parameters['filter_type'].list self._filter_tabs.addTab(iir_tab, value_names[0]) self._filter_tabs.addTab(fir_tab, value_names[1]) self._filter_tabs.setSizePolicy(QtGui.QSizePolicy.MinimumExpanding, QtGui.QSizePolicy.Minimum) # Filter type layout filter_type_groupbox = QtGui.QGroupBox('Filter Options') filter_type_layout = form_layout_factory([(self._filtering_combo, ), (self._freq_combo, )], fixed_width=150) filter_type_groupbox.setLayout(filter_type_layout) filter_type_groupbox.setSizePolicy(QtGui.QSizePolicy.MinimumExpanding, QtGui.QSizePolicy.Minimum) # Figure related parameters self._plot_update = self._parameters['auto_plot'].gui() self._plot_update_button = QtGui.QPushButton('Refresh') self.select_signal = self._parameters['signal_select'].gui() plot_button_layout = QtGui.QHBoxLayout() plot_button_layout.addWidget(self._plot_update) plot_button_layout.addWidget(self._plot_update_button) plot_group_layout = QtGui.QVBoxLayout() plot_group_layout.addLayout(plot_button_layout) plot_group_layout.addWidget(self.select_signal) plot_groupbox = QtGui.QGroupBox('Plot Options') plot_groupbox.setLayout(plot_group_layout) self.figure = Figure( facecolor=self.palette().color(QtGui.QPalette.Window).name()) self.canvas = FigureCanvas(self.figure) policy = QtGui.QSizePolicy() policy.setHorizontalStretch(1) policy.setVerticalStretch(1) policy.setHorizontalPolicy(QtGui.QSizePolicy.Expanding) policy.setVerticalPolicy(QtGui.QSizePolicy.Expanding) self.canvas.setSizePolicy(policy) self.canvas.setMinimumWidth(400) # Figure Layout plot_vlayout = QtGui.QVBoxLayout() plot_vlayout.addWidget(self.canvas) # Default navigation toolbar for matplotlib self.mpl_toolbar = NavigationToolbar(self.canvas, self) plot_vlayout.addWidget(self.mpl_toolbar) # Create parameter layout parameter_layout = QtGui.QVBoxLayout() parameter_layout.addWidget(self._filter_tabs) parameter_layout.addWidget(filter_type_groupbox) parameter_layout.addWidget(plot_groupbox) parameter_layout.addStretch() # Create global layout vline = QtGui.QFrame() vline.setFrameShape(QtGui.QFrame.VLine) vline.setFrameShadow(QtGui.QFrame.Sunken) layout = QtGui.QHBoxLayout() layout.addLayout(parameter_layout) layout.addWidget(vline) layout.addLayout(plot_vlayout) self.setLayout(layout) self._setup_plot() # Connect signals self._filter_tabs.currentChanged.connect(self._type_changed) self._fir_windows_combo.editor().currentIndexChanged.connect( self._fir_window_changed) self._fir_len.editor().valueChanged.connect(self._fir_len_changed) self._fir_cutoff.editor().valueChanged.connect( self._fir_cutoff_changed) self._fir_w1.editor().valueChanged.connect(self._fir_w1_changed) self._fir_w2.editor().valueChanged.connect(self._fir_w2_changed) self._iir_filters_combo.editor().currentIndexChanged.connect( self._irr_filter_changed) self._iir_wp.editor().valueChanged.connect(self._iir_wp_changed) self._iir_ws.editor().valueChanged.connect(self._iir_ws_changed) self._iir_gpass.editor().valueChanged.connect(self._irr_gpass_changed) self._iir_gstop.editor().valueChanged.connect(self._irr_gstop_changed) self._freq_combo.editor().valueChanged.connect( self._freq_type_changed) self._filtering_combo.editor().valueChanged.connect(self._plot) self._plot_update.valueChanged[bool].connect(self._enable_plot_button) self._plot_update_button.clicked[bool].connect(self.refresh_plot) self.select_signal.editor().currentIndexChanged[int].connect( self.refresh_plot) self._init_gui_from_parameters() def _init_gui_from_parameters(self): self._datafile = self._node_context.input['port1'] if self._datafile.is_valid() and len(self._datafile) > 0: self.signal_map = map_adaf_to_signal_list(self._datafile[0]) else: self.signal_map = [] self._plot_update_button.setEnabled( not self._parameters['auto_plot'].value) self._filter_tabs.setCurrentIndex( self._parameters['filter_type'].value[0]) self._freq_combo.editor().setEnabled( self._parameters['filter_type'].value[0]) self._fir_window_changed() self._plot(update_data=True) @property def status(self): return self._status_message @property def valid(self): return self._is_valid def clean_status(self): self._is_valid = True self._status_message = '' def validate_parameters(self): """Cross validate parameters""" parameters = self._parameters filter_type = parameters['filter_type'].selected if filter_type == 'IIR': wp, ws, gpass, gstop, ftype = get_iir_filter_parameters(parameters) wp_is_seq = isinstance(wp, collections.Sequence) ws_is_seq = isinstance(ws, collections.Sequence) both_are_seq = wp_is_seq and ws_is_seq if (wp_is_seq and not ws_is_seq) or (ws_is_seq and not wp_is_seq): message = ('Both, <b>Passband</b> and <b>Stopband</b> need ' 'to be either both floating point numbers or both ' 'a sequence of <b>two</b> floating point ' 'numbers.') self._is_valid = False elif both_are_seq and (len(wp) != 2 or len(ws) != 2): message = ('Both, <b>Passband</b> and <b>Stopband</b> need ' 'to be length 2, e.g.: "0.2, 0.3" and "0.1, 0.4"') self._is_valid = False elif (both_are_seq and (len(wp) == 2 and len(ws) == 2) and not ((min(ws) < min(wp) and max(wp) < max(ws)) or (min(ws) > min(wp) and max(wp) > max(ws)))): message = ('Either the <b>Passband</b> has to lie within the ' '<b>Stopband</b> or vice versa.') self._is_valid = False elif (both_are_seq and (len(wp) == 2 and len(ws) == 2) and not (ws[0] < ws[1])): message = ('The second <b>Stopband</b> value must be greater ' 'than the first one! E.g. "0.1, 0.4"') self._is_valid = False elif (both_are_seq and (len(wp) == 2 and len(ws) == 2) and not (wp[0] < wp[1])): message = ('The second <b>Passband</b> value must be greater ' 'than the first one! E.g. "0.2, 0.3"') self._is_valid = False elif gpass >= gstop: message = ('The <b>Max loss ...</b> must be larger ' 'than the <b>Min attenuation ..</b>!') self._is_valid = False else: message = None if not self._is_valid: if message is not None: self._status_message = self.build_error_message(message) self.status_changed.emit() # currently no cross validation of FIR filter parameters return self._is_valid def _type_changed(self, index): idx = self._filter_tabs.currentIndex() self._parameters['filter_type'].value = [idx] self._freq_combo.editor().setEnabled(idx == 1) self._plot() def _freq_type_changed(self): self.clean_status() self._plot() def _fir_window_changed(self): """FIR window function changed.""" def set_visibility_widgets(widgets, states): for widget, state in zip(widgets, states): widget.editor().setVisible(state) widget.label_widget().setVisible(state) # Change name on w1 and w2. selected_window = self._parameters['fir_windows'].selected params = self._fir_dict[selected_window]['param'] len_param = len(params) fir_w1_label = self._fir_w1.label_widget() fir_w2_label = self._fir_w2.label_widget() if len_param == 0: fir_w1_label.setText('') fir_w2_label.setText('') set_visibility_widgets([self._fir_w1, self._fir_w2], [False, False]) elif len_param == 1: fir_w1_label.setText(params[0]) fir_w2_label.setText('') set_visibility_widgets([self._fir_w1, self._fir_w2], [True, False]) elif len_param == 2: fir_w1_label.setText(params[0]) fir_w2_label.setText(params[1]) set_visibility_widgets([self._fir_w1, self._fir_w2], [True, True]) self.clean_status() self._plot() def _fir_len_changed(self): editor = self._fir_len.editor() self.validate_parameter('fir_len', editor, func=int) def _fir_cutoff_changed(self): text = self._parameters['fir_cutoff'].value editor = self._fir_cutoff.editor() validated = True message = '' try: value = ast.literal_eval(text) if isinstance(value, (int, float)) and value <= 0: validated = False message = '<i>{}</i>!'.format(text) elif (isinstance(value, collections.Sequence) and any(map(lambda i: i <= 0 or i >= 1, value))): validated = False message = 'Frequencies must be greater than 0 and less than 1!' elif (isinstance(value, collections.Sequence) and np.any(np.diff(value) <= 0)): validated = False message = 'The frequencies must be strictly increasing!' except (SyntaxError, ValueError): validated = False message = ('Mal-formatted input. Please enter only comma ' 'separated floats in the interval ]0, 1[!') except Exception as e: validated = False message = six.text_type(e) label = self._parameters['fir_cutoff'].label message = 'Invalid <b>{}</b>: {}'.format(label, message) self.handle_validation_state('fir_cutoff', validated, editor, message) if validated: self._plot() def _fir_w1_changed(self): editor = self._fir_w1.editor() self.validate_parameter('fir_w1', editor, func=float) def _fir_w2_changed(self): editor = self._fir_w2.editor() self.validate_parameter('fir_w2', editor, func=float) def _irr_filter_changed(self): self.clean_status() self._plot() def _iir_wp_changed(self): editor = self._iir_wp.editor() self.validate_irr_edge_frequencies('iir_wp', editor) def _iir_ws_changed(self): editor = self._iir_ws.editor() self.validate_irr_edge_frequencies('iir_ws', editor) def _irr_gpass_changed(self): editor = self._iir_gpass.editor() self.validate_parameter('iir_gpass', editor, func=float) def _irr_gstop_changed(self): editor = self._iir_gstop.editor() self.validate_parameter('iir_gstop', editor, func=float) def validate_irr_edge_frequencies(self, parameter, editor): text = self._parameters[parameter].value validated = True message = '' try: value = ast.literal_eval(text) if isinstance(value, (list, tuple)): out_of_limits = any([i <= 0 or i >= 1 for i in value]) if len(value) != 2 or out_of_limits: validated = False elif (isinstance(value, (int, float)) and (value <= 0 or value >= 1.)): validated = False except (SyntaxError, ValueError): validated = False message = ('Mal-formatted input! Only floating point numbers or ' 'comma separated floating point numbers are allowed!') except Exception as e: validated = False message = six.text_type(e) label = self._parameters['fir_cutoff'].label message = 'Invalid <b>{}</b>: {}'.format(label, message) self.handle_validation_state(parameter, validated, editor, message) def validate_parameter(self, parameter, editor, func=float): text = self._parameters[parameter].value validated = True message = '' try: value = func(text) if value <= 0.: validated = False except ValueError as e: validated = False message = six.text_type(e) self.handle_validation_state(parameter, validated, editor, message) def handle_validation_state(self, parameter, validated, editor, message=''): text = self._parameters[parameter].value if not validated and message == '': label = self._parameters[parameter].label message = ('Invalid <b>{}</b>: <i>{}</i>!' ''.format(label, text)) self._status_message = self.build_error_message(message) self._is_valid = validated self.set_widgets_state_color(editor, validated) self.status_changed.emit() if validated: self._plot() @staticmethod def set_widgets_state_color(widget, state): color = QtGui.QColor(0, 0, 0, 0) if not state: color = QtCore.Qt.red if widget is not None: palette = widget.palette() palette.setColor(widget.backgroundRole(), color) widget.setPalette(palette) def _enable_plot_button(self, state): # disable refresh button self._plot_update_button.setEnabled(not state) self._plot() def refresh_plot(self, i): self.mpl_toolbar.update() self._plot(update_data=True) def _plot(self, update_data=False): if not self.validate_parameters(): return b, a = None, None try: with warnings.catch_warnings(record=True) as w, CapturePrint() \ as cp: b, a = generate_filter(self._parameters) if len(w): self._is_valid = False message = six.text_type(w.pop(0).message) self._status_message = self.build_error_message(message) elif len(cp): self._is_valid = False message = self.build_error_message( '\n'.join([i for i in cp])) self._status_message = message else: self._is_valid = True self._status_message = '' except OverflowError: self._is_valid = False message = 'The value is too large.' self._status_message = self.build_error_message(message) except ValueError as e: self._is_valid = False message = self.build_error_message(six.text_type(e)) self._status_message = message except (SyntaxError, IndexError) as e: self._is_valid = False message = self.build_error_message(six.text_type(e)) self._status_message = message self.status_changed.emit() if b is not None and a is not None: if self._parameters['auto_plot'].value or update_data: self._update_data_plot(b, a) self._update_filter_plot(b, a) self.figure.tight_layout() self.canvas.draw_idle() def build_error_message(self, base_message): filter_type = self._parameters['filter_type'].selected filter_link = scipy_filter_links()[filter_type] message = ('<p>{}</p>' '<p>See the {} filter documentation for valid input ' 'parameter: <a href={}>{}</a></p>' ''.format(base_message, filter_type, filter_link, filter_link)) return message def _update_data_plot(self, b, a): self.filtered_signal_line.set_visible(self._is_valid) if not self._is_valid: return # get timeseries ts = self._get_current_signal() if ts is None: return filtering_dict = get_filtering_dict() selected_filter = self._parameters['filtering'].selected try: filtered_signal = filtering_dict[selected_filter](b, a, ts) except ValueError as e: self._is_valid = True message = self.build_error_message(six.text_type(e)) self._status_message = message self.status_changed.emit() return x = np.arange(len(ts)) self.original_signal_line.set_data(x, ts) self.filtered_signal_line.set_data(x, filtered_signal) self.data_axes.set_xlim(min(x), max(x)) self.data_axes.set_ylim(min([min(ts), min(filtered_signal)]), max([max(ts), max(filtered_signal)])) self.data_axes.autoscale_view(True, True, True) def _update_filter_plot(self, b, a): self.filter_magnitude_line.set_visible(self._is_valid) self.filter_phase_line.set_visible(self._is_valid) if self._is_valid: w, h = signal.freqz(b, a) # possibly add worN here w /= w.max() angles = np.unwrap(np.arctan2(h.imag, h.real)) with warnings.catch_warnings(record=True) as warn: self.filter_magnitude_line.set_data(w, 20 * np.log(np.abs(h))) if len(warn): self._status_message = six.text_type(warn[-1].message) self._is_valid = True self.status_changed.emit() self.filter_phase_line.set_data(w, angles) # following is need to update the data limits # and view after updating line data for ax in [self.filter_axes_magnitude, self.filter_axes_phase]: ax.relim() ax.autoscale_view(True, True, True) def _setup_plot(self): self.filter_axes_magnitude = self.figure.add_subplot(211) self.filter_axes_phase = self.filter_axes_magnitude.twinx() self.data_axes = self.figure.add_subplot(212) # setup filter subplot self.filter_axes_magnitude.set_ylabel('Amplitude [dB]', color='b') self.filter_axes_phase.set_ylabel('Phase', color='g') self.filter_axes_magnitude.set_xlabel('Normalized Frequency [' '$\\times \pi$ ' 'rad/sample]') # setup data subplot self.data_axes.set_ylabel('Data') self.original_signal_line, = self.data_axes.plot( [], 'ro', markersize=2, label='Data') self.filtered_signal_line, = self.data_axes.plot( [], 'b', label='Filtered Data') self.filter_magnitude_line, = self.filter_axes_magnitude.plot( [], [], 'b', label='Amplitude') self.filter_phase_line, = self.filter_axes_phase.plot( [], [], 'g', label='Phase') def _get_current_signal(self): # could possibly be simplified value = self._parameters['signal_select'].value if not value: return None current_selected_idx = value[0] if self.signal_map: system, raster, signal = self.signal_map[current_selected_idx] try: raster = self._datafile[0].sys[system][raster].to_table() ts = raster.get_column_to_array(signal) except KeyError: ts = None else: ts = None return ts