# This file is part of Sympathy for Data.
# Copyright (c) 2018, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import ast
import re
import numpy as np
import threading
import time
import traceback
import operator
from sympathy.api import ParameterView
from sympathy.api import node
from sympathy.api import qt2 as qt
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.platform import widget_library as sywidgets
from sympathy.platform.exceptions import NoDataError
from sympathy.platform import colors
from sympathy.types.sylist import sylist
from sympathy.types.sytuple import sytuple
from sympathy.api import dtypes
from sympathy.utils import search
from sylib.calculator.calculator_gui import CalcFieldWidget
from sylib.calculator.calculator_gui import TreeDragWidget
from sylib.calculator import plugins
from sylib.imageprocessing.image import ImagePort
from sylib.imageprocessing.image_viewer import ImageViewer
from sylib.util import base_eval
QtGui = qt.QtGui
QtCore = qt.QtCore
QtWidgets = qt.QtWidgets
Qt = QtCore.Qt
# TODO: inlined sylib.calculator.calculator_gui.init_tree from Sympathy 2.0.0
# to workaround changed interface. Update to use new calculator or refine
# local,
# inlined solution. Generic calculator code could be packaged as public
# functions if necessary. Aim to reduce duplication!
def _init_tree(function_tree, input_columns_and_types=None):
"""Initialises the function tree widget with common functions with
tooltips.
Parameters
---------
function_tree : OldTreeDragWidget
Function tree widget.
"""
function_tree.setDragEnabled(True)
function_tree.setDropIndicatorShown(True)
function_tree.setDragDropMode(
QtWidgets.QAbstractItemView.DragDropMode.DragOnly)
function_tree.setColumnCount(1)
function_tree.headerItem().setHidden(True)
def trailing_whitespace(expr):
res = False
for item in ast.walk(
compile(expr, '<string>', 'eval', ast.PyCF_ONLY_AST)):
if isinstance(item, ast.Constant):
s = item.value
if re.search('\\s$', s):
res = True
break
return res
def _add_tree_item(parent, text, func_name, tool_tip,
warn_whitespace=False,
column=0):
"""
Creates a new QtWidgets.QTreeWidgetItem and adds it as child to parent.
Parameters:
-----------
parent : QtWidgets.QTreeWidgetItem
The parent QtWidgets.QTreeWidgetItem node.
column : int
The column the text should be placed in.
text : string
The node text
func_name : string
The method syntax
tool_tip : string
The text at the tooltip
Returns
--------
QtWidgets.QTreeWidgetItem
The new QtWidgets.QTreeWidgetItem node.
"""
item = QtWidgets.QTreeWidgetItem(parent)
item.setText(column, text)
item.setData(0, QtCore.Qt.ItemDataRole.UserRole, func_name)
if warn_whitespace:
item.setBackground(column, QtGui.QBrush(colors.WARNING_BG_COLOR))
item.setToolTip(
column,
'{}\n'
'Warning, this input contains whitespace at the end.'
.format(tool_tip))
else:
item.setToolTip(column, tool_tip)
parent.addChild(item)
return item
def build_tree(root, content):
if isinstance(content, list):
for item in content:
_add_tree_item(root, *item)
elif isinstance(content, dict):
for tree_text, subcontent in content.items():
subroot = QtWidgets.QTreeWidgetItem(root)
subroot.setText(0, tree_text)
build_tree(subroot, subcontent)
else:
raise TypeError("Plugin gui dict contained unsupported object "
"type: {}".format(type(gui_dict)))
def purge_hidden_items(gui_dict, hidden_items):
"""
Create a new gui_dict without the items in hidden_items.
hidden_items should be a list of tuples with the "paths" in the
gui_dict that should be hidden.
"""
if not hidden_items:
return gui_dict
# The items that should be hidden on this level in the tree
current_hidden = [item[0] for item in hidden_items if len(item) == 1]
if isinstance(gui_dict, list):
# Hide some items
new_gui_dict = [item for item in gui_dict
if item[0] not in current_hidden]
elif isinstance(gui_dict, dict):
new_gui_dict = {}
for subtree_label, subtree in gui_dict.items():
if subtree_label in current_hidden:
# Hide this subtree
continue
new_hidden = [item[1:] for item in hidden_items
if item[0] == subtree_label]
new_gui_dict[subtree_label] = purge_hidden_items(
subtree, new_hidden)
else:
raise TypeError("Plugin gui dict contained unsupported object "
"type: {}".format(type(gui_dict)))
return new_gui_dict
if input_columns_and_types:
available_signals = {
'Signals': [[name, name, 'Signal: {}\nType: {}'.format(
name, dtype), trailing_whitespace(name)]
for name, dtype in input_columns_and_types]}
build_tree(function_tree, available_signals)
available_plugins = sorted(plugins.available_plugins('python'),
key=operator.attrgetter("WEIGHT"))
hidden_items = []
for plugin in available_plugins:
hidden_items.extend(plugin.hidden_items())
for plugin in available_plugins:
gui_dict = purge_hidden_items(plugin.gui_dict(), hidden_items)
build_tree(function_tree, gui_dict)
[docs]
class ImageCalculator(node.Node):
author = 'Mathias Broxvall'
icon = 'image_image_calculator.svg'
description = 'Creates an image from a Python expression.'
tags = Tags(Tag.ImageProcessing.IO)
name = 'Image calculator'
nodeid = 'com.sympathyfordata.imageanalysis.image_calculator'
__doc__ = description
inputs = Ports([
Port.Custom('<a>', 'Input', name='arg', n=(0, 1, 1)),
])
outputs = Ports([
ImagePort('Result', name='result'),
])
parameters = node.parameters()
parameters.set_string(
'expression', value="arg.get_image()",
description='Expression that evaluates to a 2D or 3D array',
label='Image'
)
def exec_parameter_view(self, node_context):
return ImageCalculatorWidget(node_context)
def execute(self, node_context):
params = node_context.parameters
expression = params['expression'].value
arg = node_context.input.group('arg')
dct = {}
if len(arg) > 0:
dct['arg'] = arg[0]
# for i in range(len(arg)):
# dct['arg{}'.format(i+1)] = arg[i]
im = np.array(base_eval(expression, dct))
node_context.output['result'].set_image(im)
class ImageObjectProxy:
def __init__(self, data):
self._data = data
def get_image(self):
return self._data
class ImageCalculatorWidget(ParameterView):
def __init__(self, node_context, parent=None):
super(ParameterView, self).__init__(parent=parent)
self._validator = None
self._node_context = node_context
self._parameters = node_context.parameters
self.main_layout = QtWidgets.QVBoxLayout()
self.setLayout(self.main_layout)
# Main view layout
#
# vsplitter
# - Image viewer
# - hsplitter
# - code_editor
# - signals widget
self.vsplitter = QtWidgets.QSplitter()
self.vsplitter.setOrientation(Qt.Orientation.Vertical)
self.main_layout.addWidget(self.vsplitter)
# Image viewer
self.image_viewer = ImageViewer()
self.vsplitter.addWidget(self.image_viewer)
#
# Calculator area
#
self.hsplitter = QtWidgets.QSplitter()
self.hsplitter.setOrientation(Qt.Orientation.Horizontal)
self.vsplitter.addWidget(self.hsplitter)
# Code editor
self.code_editor = CalcFieldWidget()
self.highlighter = sywidgets.pygments_highlighter_factory(
self.code_editor, 'python')
self.code_editor.textChanged.connect(self.highlighter.rehighlight)
self.code_editor.setPlainText(self._parameters['expression'].value)
self.hsplitter.addWidget(self.code_editor)
# function widget
self.function_tree = TreeDragWidget()
self.search_field = sywidgets.ClearButtonLineEdit(
placeholder='Search for signal or function')
self.tree_head = None
function_label = QtWidgets.QLabel("Signals and Common functions")
self.function_tree.setMaximumWidth(2000)
function_tree_layout = QtWidgets.QVBoxLayout()
function_tree_layout.setContentsMargins(0, 0, 0, 0)
function_tree_layout.setSpacing(5)
function_tree_layout.addWidget(function_label)
function_tree_layout.addWidget(self.function_tree)
function_tree_layout.addWidget(self.search_field)
functions_widget = QtWidgets.QWidget()
functions_widget.setLayout(function_tree_layout)
self.hsplitter.addWidget(functions_widget)
_init_tree(self.function_tree, self.calculate_signals_names())
self.tree_head = self.function_tree.invisibleRootItem()
# Connect slots for actions
self.code_editor.textChanged.connect(self.code_updated)
self.search_field.textChanged.connect(self.search)
self.function_tree.itemDoubleClicked.connect(self.insert_function)
self.code_editor.insert_function.connect(self.insert_text)
# Valid and status messages
self._valid = False
self._status_message = None
# Preview thread
self.execute_expr = None
self.thread_exit_event = threading.Event()
self.thread = threading.Thread(target=self.run)
self.thread.start()
def code_updated(self):
self._parameters['expression'].value = self.code_editor.toPlainText()
def cleanup(self):
self.thread_exit_event.set()
@property
def valid(self):
return self._valid
def has_status(self):
return True
@property
def status(self):
if self._status_message:
return self._status_message
return ''
def calculate_signals_names(self):
def flatten_input(in_data, names_and_types, prefix=''):
if any(isinstance(in_data, sytype) for sytype in
(sytuple, sylist)):
for i, item in enumerate(in_data):
name = prefix + '[{}]'.format(str(i))
flatten_input(item, names_and_types, name)
else:
try:
names = in_data.names('calc')
types = in_data.types('calc')
except (AttributeError, NoDataError):
names = []
types = []
for cname, dtype in zip(names, types):
if cname not in [nt[0] for nt in names_and_types]:
dtype = dtypes.typename_from_kind(dtype.kind)
name = prefix + cname
names_and_types.add((name, dtype))
names_and_types = set()
arg = self._node_context.input.group('arg')
if len(arg) > 0:
flatten_input(arg[0], names_and_types, prefix='arg')
return sorted(names_and_types, key=lambda key: key[0].lower())
else:
return []
def insert_function(self, item):
text = item.data(0, QtCore.Qt.ItemDataRole.UserRole)
self.insert_text(text)
def insert_text(self, text):
self.code_editor.insertPlainText(text)
def eval_expression(self, expr):
dct = {}
arg = self._node_context.input.group('arg')
if len(arg) > 0:
dct['arg'] = arg[0]
if not arg[0].is_valid():
self._valid = True
self._status_message = (
'Preview not available, connect input and run all previous'
' nodes first')
self.status_changed.emit()
return np.array([[1]])
try:
im = np.array(base_eval(expr, dct))
self._valid = True
self._status_message = ''
self.status_changed.emit()
except Exception as e:
self._valid = False
self._status_message = (
traceback.format_exception_only(type(e), e)[-1])
self.status_changed.emit()
im = np.array([[1]])
return im
def run(self):
prev_executed_expr = None
while not self.thread_exit_event.is_set():
tmp_expr = self._parameters['expression'].value
if tmp_expr == prev_executed_expr:
time.sleep(0.1)
continue
if prev_executed_expr is not None:
time.sleep(1)
expr = self._parameters['expression'].value
if expr == tmp_expr and expr != prev_executed_expr:
prev_executed_expr = expr
im = self.eval_expression(expr)
if len(im.shape) == 0:
im = im.reshape((1, 1))
elif len(im.shape) == 1:
im = im.reshape(im.shape+(1,))
if len(im.shape) <= 3:
try:
self.image_viewer.update_data(ImageObjectProxy(im))
except IndexError:
pass
def search(self):
def recursive_hide(node, pattern, exact_term):
"""
Recursively matches the given pattern against tooltips from the
given node and down, or the exact search term against the tree
items themselves. Returns true if the node is hidden, otherwise
false."""
if node.childCount() == 0:
text = node.text(0)
data = node.data(0, QtCore.Qt.ItemDataRole.UserRole)
if not (search.matches(pattern, text) or
exact_term in data):
do_hide = True
else:
do_hide = False
else:
num_hidden = 0
for index in range(0, node.childCount()):
if recursive_hide(node.child(index), pattern, exact_term):
num_hidden += 1
if num_hidden == node.childCount():
do_hide = True
else:
do_hide = False
node.setHidden(do_hide)
node.setExpanded(not do_hide)
if len(exact_term) == 0:
node.setExpanded(False)
return do_hide
term = self.search_field.text()
pattern = search.fuzzy_pattern(term)
recursive_hide(self.tree_head, pattern, term)