# This file is part of Sympathy for Data.
# Copyright (c) 2015-2016 Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import json
import base64
import copy
import re
import sys
import os
import subprocess
import uuid as uuidgen
from sympathy.api.nodeconfig import Tag, Tags, Port, Ports
from sympathy.platform.node import Node
from sympathy.platform.node import parameters as sy_parameters
from sympathy.types import sylambda
from sympathy.app import builtin
from sympathy.platform.node import Util
from sympathy.api import ParameterView
from sympathy.api import exceptions
from sympathy.platform import types
from sympathy.platform import os_support
from sympathy.platform import version_support as vs
from sympathy.platform import qt_compat2
from sympathy.platform import widget_library as sywidgets
QtCore = qt_compat2.import_module('QtCore')
QtGui = qt_compat2.import_module('QtGui')
QtWidgets = qt_compat2.import_module('QtWidgets')
# Display names of the strategies available when processing of a list item
# fails.
_error_strategy = 'Error'
_skip_strategy = 'Skip Item'
_empty_strategy = 'Create Empty Item'

# Strategy display name -> numeric position, in presentation order.
list_failure_strategies = {
    _error_strategy: 0,
    _empty_strategy: 1,
    _skip_strategy: 2,
}


def _fuzzy_fail_strategy(strategy):
    """
    Translate a loosely spelled strategy name into a canonical strategy name.

    The comparison is case-insensitive and substring based. Keywords are
    tried in the order 'error', 'skip', 'empty' and the first one found
    decides the result. A missing, empty or unrecognized name results in the
    error strategy.
    """
    lowered = (strategy or '').lower()
    keyword_table = (
        ('error', _error_strategy),
        ('skip', _skip_strategy),
        ('empty', _empty_strategy),
    )
    for keyword, canonical in keyword_table:
        if keyword in lowered:
            return canonical
    # Nothing recognized (this includes the empty name): fail loudly.
    return _error_strategy
# Pattern for a bracketed (list) type string such as '[table]'; the single
# group captures the text between the brackets.
_list_re = re.compile(r'\[(.*)\]')
# Documentation text explaining how input and output datatypes are written
# in a node configuration.
SYTYPES_DOC = """
Input and output datatypes needs to be specified in configuration. The default value:
'() -> ()' represents the generic (? symbol in the GUI) type that can be any type. Examples of
other datatypes are:
- datasource
- table
- adaf
- figure
- text
List datatype is represented by '[<datatype>]'. For example, if the lambda node takes a list of
tables and returns one figure, the datatype specification should be '[table] -> figure' in the
configuration.
The Lambda node will change its type to reflect the choosen datatype as long as it is a
non-generic function type and can be matched against connected nodes. Datatypes configured
that fail to satisfy the above are simply ignored.
"""
[docs]
class Apply(Node):
    """
    Applies a defined Lambda node to an argument.

    The argument input port will automatically be set to the input type of
    the Lambda node.

    When the Lambda has multiple input arguments Apply will do a partial
    application producing a new Lambda function with the argument bound as
    the value for the first argument.

    When the Lambda has a single input argument Apply will evaluate the node
    contents to produce a result, taking into account any arguments
    previously bound.
    """

    name = 'Apply'
    description = 'Apply Lambda node to Argument.'
    nodeid = 'org.sysess.builtin.apply'
    author = 'Erik der Hagopian <erik.hagopian@combine.se>'
    icon = 'apply.svg'
    inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda node to Apply',
                                name='Function'),
                    Port.Custom('<a>', 'Argument', name='Argument')])
    outputs = Ports([Port.Custom('<b>', 'Output', name='Output')])
    tags = Tags(Tag.Generic.Lambda)

    @staticmethod
    def apply_eval(function, output, argument, portdict, objects,
                   socket_bundle=None, parent_identifier=None):
        """
        Execute the flow stored in ``function`` with ``argument`` as its last
        input and ``output`` as its result.

        Parameters
        ----------
        function
            Lambda object; ``function.get()`` must return a pair of
            (flow description, previously bound port descriptions).
        output
            Port object registered under the filename of the output port.
        argument
            Port object registered under the filename of the argument port.
        portdict : dict
            Port definitions of the calling node. ``portdict['inputs'][1]``
            is used as the argument port and ``portdict['outputs'][0]`` as
            the output port.
        objects : dict or None
            Maps filenames to opened port objects. Entries are added for the
            duration of the call and the original content is restored before
            returning, also when the execution fails.
        socket_bundle, parent_identifier
            Forwarded unchanged to ``builtin.flow_execute``.

        Notes
        -----
        The node and port dictionaries of the flow description are modified
        in place (types and filenames are rewritten, and a Propagate node
        may be appended).
        """

        def port_assign(port0, port1):
            port0['type'] = port1['type']
            port0['file'] = port1['file']

        flowdesc, port_descs = function.get()
        nodes = flowdesc.nodes
        node_deps = flowdesc.node_deps
        input_ports = flowdesc.input_ports
        output_ports = flowdesc.output_ports
        bypass_ports = flowdesc.bypass_ports

        # Previously bound arguments come first, the new argument last.
        input_assign = [port.port for port in port_descs]
        input_assign.append(portdict['inputs'][1])
        output_assign = [portdict['outputs'][0]]

        # objects maps filenames to opened port objects
        objects = {} if objects is None else objects
        org_objects = dict(objects)

        try:
            objects[output_assign[0]['file']] = output
            objects[input_assign[-1]['file']] = argument

            # Add port objects from earlier Apply nodes
            for port_desc in port_descs:
                if port_desc.data:
                    objects[port_desc.port['file']] = port_desc.data

            input_assign_map = {}

            # Assigned input ports need to have their port description
            # replaced. This ensures that input ports inside Lambda get the
            # correct filename.
            for ports, assign in zip(input_ports, input_assign or []):
                for port in ports:
                    input_assign_map[port] = assign

            output_assign_map = dict(zip(output_ports, output_assign or []))

            # If input and output ports of lambda are connected directly to
            # each other, add a fake "Propagate" node.
            if bypass_ports:
                bypass_port = int(bypass_ports[0])
                node_parameters = {
                    'ports': {'inputs': [input_assign[bypass_port]],
                              'outputs': [output_assign[0]]},
                    'parameters': {'data': {'type': 'group'}, 'type': 'json'},
                    'id': 'org.sysess.builtin.propagate'
                }
                node = [None, [None, 'Propagate', f'{{{uuidgen.uuid4()}}}',
                               node_parameters]]
                nodes.append(node)

            output_filename_map = {}
            input_filename_map = {}
            filenames_set = set()

            # Copy type and file fields for input and output ports
            for node_data in nodes:
                node_parameters = node_data[1][3]
                for portmap, filemap, group in [
                        (input_assign_map, input_filename_map, 'inputs'),
                        (output_assign_map, output_filename_map, 'outputs')
                ]:
                    for port0 in node_parameters['ports'][group]:
                        port1 = portmap.get(port0['uuid'])
                        if port1:
                            filemap[port0['file']] = port1['file']
                            port_assign(port0, port1)
                        # Every filename in use is recorded, assigned or not,
                        # so that it can be given an index below.
                        filenames_set.add(port0['file'])

            prefix = uuidgen.uuid4()
            filenames = list(sorted(filenames_set))
            filenames_index = {f: i for i, f in enumerate(filenames)}

            external_files = set()
            for portlist in [input_assign, output_assign]:
                external_files.update(p['file'] for p in portlist)

            for node_data in nodes:
                # For each assigned output port, any connected inputs also
                # need to be assigned.
                # Do not replace empty filename as these indicate a removed
                # optional port.
                node_parameters = node_data[1][3]
                for port0 in node_parameters['ports']['inputs']:
                    filename = port0['file']
                    if filename and filename in output_filename_map:
                        port0['file'] = output_filename_map[filename]

                # Safeguard against logic errors by assigning nonsensical
                # filenames to all ports that should be in-memory only.
                for group in ['inputs', 'outputs']:
                    for port0 in node_parameters['ports'][group]:
                        filename = port0['file']
                        # Replace internal filenames with generated
                        # non-existing filenames.
                        if filename and filename not in external_files:
                            port0['file'] = (
                                f'{{{prefix}}}.{{{filenames_index[filename]}}}')

            builtin.flow_execute(
                nodes, node_deps, {},
                objects, socket_bundle,
                parent_identifier=parent_identifier)
        finally:
            # Always hand back the mapping as it was received, so that a
            # failed execution does not leave temporary entries behind.
            objects.clear()
            objects.update(org_objects)

    @staticmethod
    def apply(function: sylambda.sylambda, output, port, data):
        """
        Partially apply ``function``: bind ``data`` (described by ``port``)
        as its next argument and make ``output`` source the resulting
        function.
        """
        function.apply(sylambda.PortDesc(port, data))
        output.source(function)

    def execute(self, node_context):
        """
        Evaluate the Lambda when exactly one argument remains, otherwise
        bind the argument and output the partially applied Lambda.

        Raises
        ------
        AssertionError
            If the Lambda reports no arguments at all.
        """
        function = node_context.input['Function']
        output = node_context.output['Output']
        argument = node_context.input['Argument']
        ports = node_context.definition['ports']
        nargs = len(function.arguments())

        if nargs == 0:
            raise AssertionError(
                'Lambda function has no input arguments to apply.')

        if nargs == 1:
            self.apply_eval(
                function, output, argument, ports,
                node_context._objects, self.socket_bundle,
                self._parent_identifier or self._identifier)
        elif nargs > 1:
            port = ports['inputs'][1]
            self.apply(function, output, port, argument)
[docs]
class Map(Node):
    """
    Applies a defined Lambda node across a list of elements. The Lambda node
    is applied element-wise and the results are returned as a list.

    In contrast with Apply, partial application is not supported. This means
    that the Lambda node cannot have any extra input ports, if used with the
    Map node. If more than one input element is required, the elements could
    be zipped into a list of tuples with the node Zip Tuple. The elements
    will then arrive as tuples in the Lambda node and can be unzipped on the
    inside. Alternatively, it is also possible to first apply a single
    element to the first port(s) and then map over the last remaining port.
    """

    name = 'Map'
    description = 'Map Lambda node over each element in argument list.'
    nodeid = 'org.sysess.builtin.map'
    author = 'Erik der Hagopian <erik.hagopian@combine.se>'
    icon = 'map.svg'

    parameters = sy_parameters()
    parameters.set_list(
        'fail_strategy', label='Action on exception',
        list=list_failure_strategies.keys(), value=[0],
        description='Decide how failure to process an item should be handled.',
        editor=Util.combo_editor())

    inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda node to Map',
                                name='Function'),
                    Port.Custom('[<a>]', 'Argument List', name='List')])
    outputs = Ports([Port.Custom('[<b>]', 'Output List', name='List')])
    tags = Tags(Tag.Generic.Lambda)

    @staticmethod
    def apply_eval(function, output_list, argument_list, portdict,
                   objects, progress=None, fail_strategy=_error_strategy,
                   socket_bundle=None, parent_identifier=None):
        """
        Evaluate ``function`` once per element of ``argument_list`` and
        collect the results in ``output_list``.

        ``portdict`` is copied before its list port types are reduced to
        their element types. ``progress``, when given, is called with a
        percentage before each item and with 100.0 at the end.
        ``fail_strategy`` selects what happens when an item raises: re-raise
        as ``SyListIndexError``, append an empty item, or skip the item.
        """
        if objects is None:
            objects = {}

        item_portdict = copy.deepcopy(portdict)

        # Peel off the list from the type for the invoked lambda, on both
        # the argument list port and the output list port.
        for list_port in (item_portdict['inputs'][1],
                          item_portdict['outputs'][0]):
            matched = _list_re.fullmatch(list_port['type'])
            if matched is not None:
                list_port['type'] = matched.group(1)

        total = len(argument_list)
        report_progress = progress is not None

        for index, argument in enumerate(argument_list):
            if report_progress:
                progress(100.0 * index / total)

            output = output_list.create()
            try:
                # Each item gets its own copy of the objects mapping.
                Apply.apply_eval(
                    function, output, argument, item_portdict,
                    dict(objects),
                    builtin.sub_progress_socket_bundle(
                        socket_bundle, index, total),
                    parent_identifier=parent_identifier)
                output_list.append(output)
            except Exception as exc:
                if fail_strategy == _error_strategy:
                    raise exceptions.SyListIndexError(
                        index, sys.exc_info()) from exc

                if fail_strategy == _empty_strategy:
                    output_list.append(output_list.create())
                    print(f'Encountered an error for item {index}. '
                          'Creating empty item.')
                else:
                    print(f'Encountered an error for item {index}. '
                          'Skipping item.')

        if report_progress:
            progress(100.0)

    def update_parameters(self, parameter_root):
        """Rename the legacy 'Skip File' selection to the current name."""
        if 'fail_strategy' not in parameter_root:
            return
        if parameter_root['fail_strategy'].selected == 'Skip File':
            parameter_root['fail_strategy'].selected = _skip_strategy
            # .list and .value will be updated automatically to match
            # .value_names in the default update_parameters.

    def execute(self, node_context):
        """
        Map the Lambda over the input list.

        Raises ``AssertionError`` when the Lambda reports no arguments and
        ``NotImplementedError`` when it reports more than one.
        """
        parameters = node_context.parameters
        function = node_context.input['Function']
        argument = node_context.input['List']

        if node_context._is_own_input(argument):
            builtin.set_read_through(argument)

        output = node_context.output['List']
        ports = node_context.definition['ports']
        nargs = len(function.arguments())

        if nargs == 0:
            raise AssertionError()
        if nargs > 1:
            raise NotImplementedError(
                'Partial map application is not supported.')

        strategy = _fuzzy_fail_strategy(parameters['fail_strategy'].selected)
        self.apply_eval(
            function, output, argument, ports,
            node_context._objects, self.set_progress,
            strategy,
            self.socket_bundle,
            self._parent_identifier or self._identifier)
class TypeEditorParameterView(ParameterView):
    """
    Configuration view with a single validated combo box used to pick or
    type the value of the 'datatype' parameter.
    """

    def __init__(self, node_context, options, parent=None):
        """
        Build the view.

        Parameters
        ----------
        node_context
            Context whose ``parameters`` contain a 'datatype' parameter.
        options : iterable of (str, str)
            Pairs of (data type text, tooltip) offered in the combo box.
        parent
            Passed on to the base class.
        """
        super().__init__(parent=parent)
        self._parameters = node_context.parameters
        self._types_combo = sywidgets.ValidatedTextComboBox()
        value = self._parameters['datatype'].value

        options = list(options)
        # Compare against the saved value explicitly; testing the option
        # text for truthiness would miss an empty saved value.
        if not any(option == value for option, _tooltip in options):
            options.append((value, 'Saved data type'))

        # Entries that start with a bracket character sort after the rest.
        options = sorted(
            options, key=lambda x: (x[0].lstrip()[:1] in '<[{(', x))

        for i, (name, tooltip) in enumerate(options):
            self._types_combo.addItem(name)
            self._types_combo.setItemData(
                i, tooltip, role=QtCore.Qt.ItemDataRole.ToolTipRole)

        self._types_combo.lineEdit().setBuilder(self._text_type_builder)

        # findText reports a miss as -1, so index 0 is a valid match.
        index = self._types_combo.findText(value)
        if index >= 0:
            self._types_combo.setCurrentIndex(index)
        else:
            self._types_combo.setCurrentText(value)

        self._types_combo.setToolTip(
            self._parameters['datatype'].gui().toolTip())

        layout = QtWidgets.QVBoxLayout()
        layout.addWidget(self._types_combo)
        self.setLayout(layout)

    def _text_type_builder(self, value):
        """
        Validate ``value`` as a data type and return it as a string.

        Raises ``sywidgets.ValidationError`` when the text cannot be parsed
        and expanded as a type.
        """
        try:
            types.from_type_expand(types.from_string(value))
        except Exception as exc:
            raise sywidgets.ValidationError(
                f"Invalid data type: {value}"
            ) from exc
        return str(value)

    def save_parameters(self):
        """Store the combo box value in the 'datatype' parameter."""
        self._parameters['datatype'].value = self._types_combo.value()
class ExtractParameterView(TypeEditorParameterView):
    """
    Type editor view that offers the '() -> ()' function type and also shows
    the editor for the 'fail_strategy' parameter.
    """

    def __init__(self, node_context, parent=None):
        no_argument_function = (
            '() -> ()',
            'Function with no argument and no result, data type')
        super().__init__(node_context, [no_argument_function], parent=parent)

        # Make sure the parameter exists before asking for its widget.
        ExtractLambdasHelper.add_fail_strategy(self._parameters)
        strategy_widget = self._parameters['fail_strategy'].gui()
        self.layout().addWidget(strategy_widget)
class ExtractLambdasHelper:
    """Holder for the helper that defines the 'fail_strategy' parameter."""

    @staticmethod
    def add_fail_strategy(parameters):
        """
        Add a 'fail_strategy' string parameter to ``parameters`` unless one
        is already present. The new parameter defaults to 'skip'.
        """
        if 'fail_strategy' in parameters:
            return

        strategy_options = {
            'error': 'Error',
            'skip': 'Skip item',
        }
        combo = Util.combo_editor(options=strategy_options)
        parameters.set_string(
            'fail_strategy',
            label='Strategy for handling extraction issues',
            description='How should the node handle the situation when '
                        'the extract fails?',
            value='skip',
            editor=combo)
[docs]
class Empty(Node):
    """
    This node can only run successfully if the output port has a concrete
    type. This can be achieved either by connecting the output port to other
    nodes so that the type becomes concrete or by specifying the type in the
    configuration. For syntax and examples of how to specify port types in
    the configuration, see :ref:`custom_ports`.
    """

    author = ('Erik der Hagopian <erik.hagopian@combine.se>, '
              'Benedikt Ziegler <benedikt.ziegler@combine.se>')
    name = 'Empty'
    description = 'Generate empty data of inferred or specified type'
    nodeid = 'org.sysess.builtin.empty'
    icon = 'empty.svg'
    tags = Tags(Tag.Input.Generate)

    inputs = Ports([])
    outputs = Ports([Port.Custom(
        '<a>',
        'Output port containing empty data (must be connected or specified)')])

    parameters = sy_parameters()
    parameters.set_string(
        'datatype', label='Datatype',
        description='Define the datatype of the output port.', value='<a>')

    def exec_parameter_view(self, node_context):
        """
        Return a type editor offering every type alias, in sorted order,
        followed by the generic, list, dict and 2-tuple types.
        """
        alias_options = [
            (alias, 'Alias data type')
            for alias in sorted(types.typealias_names())]
        generic_options = [
            ('<a>', 'Generic data type'),
            ('[<a>]', 'Generic list data type'),
            ('{<a>}', 'Generic dict data type'),
            ('(<a>,<b>)', 'Generic tuple 2 data type'),
        ]
        return TypeEditorParameterView(
            node_context, alias_options + generic_options)

    def execute(self, node_context):
        """
        Check that the output port type contains no generics.

        Raises ``exceptions.SyDataError`` when it does.
        """
        output_port = node_context.definition['ports']['outputs'][0]
        output_type = types.from_string(output_port['type'])
        if types.generics(output_type):
            raise exceptions.SyDataError(
                'Output port must be connected and non-generic')
class Propagate(Node):
    """Pass the data on the input port on to the output port."""

    author = 'Erik der Hagopian <erik.hagopian@combine.se>'
    name = 'Propagate'
    description = 'Propagate input to output'
    nodeid = 'org.sysess.builtin.propagate'
    icon = 'empty.svg'
    tags = Tags(Tag.Hidden.Internal)

    inputs = Ports([Port.Custom('<a>', 'Input')])
    outputs = Ports([Port.Custom('<a>', 'Output')])

    parameters = sy_parameters()

    def execute(self, node_context):
        """Make the first output port source the first input port."""
        target_port = node_context.output[0]
        origin_port = node_context.input[0]
        target_port.source(origin_port)