Source code for node_image_calculator

# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import ast
import re
import numpy as np
import threading
import time
import traceback
import operator

from sympathy.api import ParameterView
from sympathy.api import node
from sympathy.api import qt2 as qt
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.platform import widget_library as sywidgets
from sympathy.platform.exceptions import NoDataError
from sympathy.platform import colors

from sympathy.types.sylist import sylist
from sympathy.types.sytuple import sytuple
from sympathy.api import dtypes
from sympathy.utils import search

from sylib.calculator.calculator_gui import CalcFieldWidget
from sylib.calculator.calculator_gui import TreeDragWidget
from sylib.calculator import plugins
from sylib.imageprocessing.image import ImagePort
from sylib.imageprocessing.image_viewer import ImageViewer
from sylib.util import base_eval


QtGui = qt.QtGui
QtCore = qt.QtCore
QtWidgets = qt.QtWidgets
Qt = QtCore.Qt


# TODO: inlined sylib.calculator.calculator_gui.init_tree from Sympathy 2.0.0
# to workaround changed interface. Update to use new calculator or refine
# local,
# inlined solution. Generic calculator code could be packaged as public
# functions if necessary. Aim to reduce duplication!
def _init_tree(function_tree, input_columns_and_types=None):
    """Initialises the function tree widget with common functions with
    tooltips.

    Parameters
    ---------
    function_tree : OldTreeDragWidget
        Function tree widget.
    """
    function_tree.setDragEnabled(True)
    function_tree.setDropIndicatorShown(True)
    function_tree.setDragDropMode(
        QtWidgets.QAbstractItemView.DragDropMode.DragOnly)
    function_tree.setColumnCount(1)
    function_tree.headerItem().setHidden(True)

    def trailing_whitespace(expr):
        res = False
        for item in ast.walk(
                compile(expr, '<string>', 'eval', ast.PyCF_ONLY_AST)):
            if isinstance(item, ast.Constant):
                s = item.value
                if re.search('\\s$', s):
                    res = True
                    break
        return res

    def _add_tree_item(parent, text, func_name, tool_tip,
                       warn_whitespace=False,
                       column=0):
        """
        Creates a new QtWidgets.QTreeWidgetItem and adds it as child to parent.

        Parameters:
        -----------
        parent : QtWidgets.QTreeWidgetItem
            The parent QtWidgets.QTreeWidgetItem node.
        column : int
            The column the text should be placed in.
        text : string
            The node text
        func_name : string
            The method syntax
        tool_tip : string
            The text at the tooltip

        Returns
        --------
        QtWidgets.QTreeWidgetItem
            The new QtWidgets.QTreeWidgetItem node.
        """
        item = QtWidgets.QTreeWidgetItem(parent)
        item.setText(column, text)
        item.setData(0, QtCore.Qt.ItemDataRole.UserRole, func_name)
        if warn_whitespace:
            item.setBackground(column, QtGui.QBrush(colors.WARNING_BG_COLOR))
            item.setToolTip(
                column,
                '{}\n'
                'Warning, this input contains whitespace at the end.'
                .format(tool_tip))
        else:
            item.setToolTip(column, tool_tip)

        parent.addChild(item)
        return item

    def build_tree(root, content):
        if isinstance(content, list):
            for item in content:
                _add_tree_item(root, *item)
        elif isinstance(content, dict):
            for tree_text, subcontent in content.items():
                subroot = QtWidgets.QTreeWidgetItem(root)
                subroot.setText(0, tree_text)
                build_tree(subroot, subcontent)
        else:
            raise TypeError("Plugin gui dict contained unsupported object "
                            "type: {}".format(type(gui_dict)))

    def purge_hidden_items(gui_dict, hidden_items):
        """
        Create a new gui_dict without the items in hidden_items.

        hidden_items should be a list of tuples with the "paths" in the
        gui_dict that should be hidden.
        """
        if not hidden_items:
            return gui_dict

        # The items that should be hidden on this level in the tree
        current_hidden = [item[0] for item in hidden_items if len(item) == 1]

        if isinstance(gui_dict, list):
            # Hide some items
            new_gui_dict = [item for item in gui_dict
                            if item[0] not in current_hidden]
        elif isinstance(gui_dict, dict):
            new_gui_dict = {}
            for subtree_label, subtree in gui_dict.items():
                if subtree_label in current_hidden:
                    # Hide this subtree
                    continue

                new_hidden = [item[1:] for item in hidden_items
                              if item[0] == subtree_label]
                new_gui_dict[subtree_label] = purge_hidden_items(
                    subtree, new_hidden)
        else:
            raise TypeError("Plugin gui dict contained unsupported object "
                            "type: {}".format(type(gui_dict)))
        return new_gui_dict

    if input_columns_and_types:
        available_signals = {
            'Signals': [[name, name, 'Signal: {}\nType: {}'.format(
                name, dtype), trailing_whitespace(name)]
                        for name, dtype in input_columns_and_types]}

        build_tree(function_tree, available_signals)

    available_plugins = sorted(plugins.available_plugins('python'),
                               key=operator.attrgetter("WEIGHT"))

    hidden_items = []
    for plugin in available_plugins:
        hidden_items.extend(plugin.hidden_items())
    for plugin in available_plugins:
        gui_dict = purge_hidden_items(plugin.gui_dict(), hidden_items)
        build_tree(function_tree, gui_dict)


[docs] class ImageCalculator(node.Node): author = 'Mathias Broxvall' icon = 'image_image_calculator.svg' description = 'Creates an image from a Python expression.' tags = Tags(Tag.ImageProcessing.IO) name = 'Image calculator' nodeid = 'com.sympathyfordata.imageanalysis.image_calculator' __doc__ = description inputs = Ports([ Port.Custom('<a>', 'Input', name='arg', n=(0, 1, 1)), ]) outputs = Ports([ ImagePort('Result', name='result'), ]) parameters = node.parameters() parameters.set_string( 'expression', value="arg.get_image()", description='Expression that evaluates to a 2D or 3D array', label='Image' ) def exec_parameter_view(self, node_context): return ImageCalculatorWidget(node_context) def execute(self, node_context): params = node_context.parameters expression = params['expression'].value arg = node_context.input.group('arg') dct = {} if len(arg) > 0: dct['arg'] = arg[0] # for i in range(len(arg)): # dct['arg{}'.format(i+1)] = arg[i] im = np.array(base_eval(expression, dct)) node_context.output['result'].set_image(im)
class ImageObjectProxy: def __init__(self, data): self._data = data def get_image(self): return self._data class ImageCalculatorWidget(ParameterView): def __init__(self, node_context, parent=None): super(ParameterView, self).__init__(parent=parent) self._validator = None self._node_context = node_context self._parameters = node_context.parameters self.main_layout = QtWidgets.QVBoxLayout() self.setLayout(self.main_layout) # Main view layout # # vsplitter # - Image viewer # - hsplitter # - code_editor # - signals widget self.vsplitter = QtWidgets.QSplitter() self.vsplitter.setOrientation(Qt.Orientation.Vertical) self.main_layout.addWidget(self.vsplitter) # Image viewer self.image_viewer = ImageViewer() self.vsplitter.addWidget(self.image_viewer) # # Calculator area # self.hsplitter = QtWidgets.QSplitter() self.hsplitter.setOrientation(Qt.Orientation.Horizontal) self.vsplitter.addWidget(self.hsplitter) # Code editor self.code_editor = CalcFieldWidget() self.highlighter = sywidgets.pygments_highlighter_factory( self.code_editor, 'python') self.code_editor.textChanged.connect(self.highlighter.rehighlight) self.code_editor.setPlainText(self._parameters['expression'].value) self.hsplitter.addWidget(self.code_editor) # function widget self.function_tree = TreeDragWidget() self.search_field = sywidgets.ClearButtonLineEdit( placeholder='Search for signal or function') self.tree_head = None function_label = QtWidgets.QLabel("Signals and Common functions") self.function_tree.setMaximumWidth(2000) function_tree_layout = QtWidgets.QVBoxLayout() function_tree_layout.setContentsMargins(0, 0, 0, 0) function_tree_layout.setSpacing(5) function_tree_layout.addWidget(function_label) function_tree_layout.addWidget(self.function_tree) function_tree_layout.addWidget(self.search_field) functions_widget = QtWidgets.QWidget() functions_widget.setLayout(function_tree_layout) self.hsplitter.addWidget(functions_widget) _init_tree(self.function_tree, self.calculate_signals_names()) self.tree_head = self.function_tree.invisibleRootItem() # Connect slots for actions self.code_editor.textChanged.connect(self.code_updated) self.search_field.textChanged.connect(self.search) self.function_tree.itemDoubleClicked.connect(self.insert_function) self.code_editor.insert_function.connect(self.insert_text) # Valid and status messages self._valid = False self._status_message = None # Preview thread self.execute_expr = None self.thread_exit_event = threading.Event() self.thread = threading.Thread(target=self.run) self.thread.start() def code_updated(self): self._parameters['expression'].value = self.code_editor.toPlainText() def cleanup(self): self.thread_exit_event.set() @property def valid(self): return self._valid def has_status(self): return True @property def status(self): if self._status_message: return self._status_message return '' def calculate_signals_names(self): def flatten_input(in_data, names_and_types, prefix=''): if any(isinstance(in_data, sytype) for sytype in (sytuple, sylist)): for i, item in enumerate(in_data): name = prefix + '[{}]'.format(str(i)) flatten_input(item, names_and_types, name) else: try: names = in_data.names('calc') types = in_data.types('calc') except (AttributeError, NoDataError): names = [] types = [] for cname, dtype in zip(names, types): if cname not in [nt[0] for nt in names_and_types]: dtype = dtypes.typename_from_kind(dtype.kind) name = prefix + cname names_and_types.add((name, dtype)) names_and_types = set() arg = self._node_context.input.group('arg') if len(arg) > 0: flatten_input(arg[0], names_and_types, prefix='arg') return sorted(names_and_types, key=lambda key: key[0].lower()) else: return [] def insert_function(self, item): text = item.data(0, QtCore.Qt.ItemDataRole.UserRole) self.insert_text(text) def insert_text(self, text): self.code_editor.insertPlainText(text) def eval_expression(self, expr): dct = {} arg = self._node_context.input.group('arg') if len(arg) > 0: dct['arg'] = arg[0] if not arg[0].is_valid(): self._valid = True self._status_message = ( 'Preview not available, connect input and run all previous' ' nodes first') self.status_changed.emit() return np.array([[1]]) try: im = np.array(base_eval(expr, dct)) self._valid = True self._status_message = '' self.status_changed.emit() except Exception as e: self._valid = False self._status_message = ( traceback.format_exception_only(type(e), e)[-1]) self.status_changed.emit() im = np.array([[1]]) return im def run(self): prev_executed_expr = None while not self.thread_exit_event.is_set(): tmp_expr = self._parameters['expression'].value if tmp_expr == prev_executed_expr: time.sleep(0.1) continue if prev_executed_expr is not None: time.sleep(1) expr = self._parameters['expression'].value if expr == tmp_expr and expr != prev_executed_expr: prev_executed_expr = expr im = self.eval_expression(expr) if len(im.shape) == 0: im = im.reshape((1, 1)) elif len(im.shape) == 1: im = im.reshape(im.shape+(1,)) if len(im.shape) <= 3: try: self.image_viewer.update_data(ImageObjectProxy(im)) except IndexError: pass def search(self): def recursive_hide(node, pattern, exact_term): """ Recursively matches the given pattern against tooltips from the given node and down, or the exact search term against the tree items themselves. Returns true if the node is hidden, otherwise false.""" if node.childCount() == 0: text = node.text(0) data = node.data(0, QtCore.Qt.ItemDataRole.UserRole) if not (search.matches(pattern, text) or exact_term in data): do_hide = True else: do_hide = False else: num_hidden = 0 for index in range(0, node.childCount()): if recursive_hide(node.child(index), pattern, exact_term): num_hidden += 1 if num_hidden == node.childCount(): do_hide = True else: do_hide = False node.setHidden(do_hide) node.setExpanded(not do_hide) if len(exact_term) == 0: node.setExpanded(False) return do_hide term = self.search_field.text() pattern = search.fuzzy_pattern(term) recursive_hide(self.tree_head, pattern, term)