# This file is part of Sympathy for Data.
# Copyright (c) 2015-2016 Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import json
import base64
import copy
import re
import sys
import os
import subprocess
import uuid as uuidgen
from sympathy.api.nodeconfig import Tag, Tags, Port, Ports
from sympathy.platform.node import Node
from sympathy.platform.node import parameters as sy_parameters
from sympathy.types import sylambda
from sympathy.app import builtin
from sympathy.platform.node import Util
from sympathy.api import ParameterView
from sympathy.api import exceptions
from sympathy.platform import types
from sympathy.platform import os_support
from sympathy.platform import version_support as vs
from sympathy.platform import qt_compat2
from sympathy.platform import widget_library as sywidgets
QtCore = qt_compat2.import_module('QtCore')
QtGui = qt_compat2.import_module('QtGui')
QtWidgets = qt_compat2.import_module('QtWidgets')
_error_strategy = 'Error'
_skip_strategy = 'Skip Item'
_empty_strategy = 'Create Empty Item'
list_failure_strategies = dict(
[(_error_strategy, 0), (_empty_strategy, 1), (_skip_strategy, 2)])
def _fuzzy_fail_strategy(strategy):
strategy = (strategy or '').lower()
if not strategy:
return _error_strategy
elif 'error' in strategy:
return _error_strategy
elif 'skip' in strategy:
return _skip_strategy
elif 'empty' in strategy:
return _empty_strategy
else:
return _error_strategy
_list_re = re.compile(r'\[(.*)\]')
[docs]
class Apply(Node):
"""
Apply Lambda function to Argument.
When the Lambda function has multiple input arguments Apply will do a
partial application producing a new Lambda function with with the argument
bound as the value for the first argument.
When the Lambda function has a single input argument Apply will evaluate
the function to produce a result, taking into account any arguments
previously bound.
"""
name = 'Apply'
description = 'Apply Lambda function to Argument'
nodeid = 'org.sysess.builtin.apply'
author = 'Erik der Hagopian <erik.hagopian@combine.se>'
icon = 'apply.svg'
inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda Function to Apply',
name='Function'),
Port.Custom('<a>', 'Argument', name='Argument')])
outputs = Ports([Port.Custom('<b>', 'Output', name='Output')])
tags = Tags(Tag.Generic.Lambda)
@staticmethod
def apply_eval(function, output, argument, portdict, objects,
socket_bundle=None, parent_identifier=None):
def port_assign(port0, port1):
port0['type'] = port1['type']
port0['file'] = port1['file']
flowdesc, port_descs = function.get()
nodes = flowdesc.nodes
node_deps = flowdesc.node_deps
input_ports = flowdesc.input_ports
output_ports = flowdesc.output_ports
bypass_ports = flowdesc.bypass_ports
input_assign = [port.port for port in port_descs]
input_assign.append(portdict['inputs'][1])
output_assign = [portdict['outputs'][0]]
objects = {} if objects is None else objects
org_objects = dict(objects)
objects[output_assign[0]['file']] = output
objects[input_assign[-1]['file']] = argument
for port_desc in port_descs:
if port_desc.data:
objects[port_desc.port['file']] = port_desc.data
input_assign_map = {}
# Assigned input ports need to have their port description replaced.
for ports, assign in zip(input_ports, input_assign or []):
for port in ports:
input_assign_map[port] = assign
output_assign_map = dict(zip(output_ports, output_assign or []))
if bypass_ports:
bypass_port = int(bypass_ports[0])
node_parameters = {
'ports': {'inputs': [input_assign[bypass_port]],
'outputs': [output_assign[0]]},
'parameters': {'data': {'type': 'group'}, 'type': 'json'},
'id': 'org.sysess.builtin.propagate'
}
node = [None, [None, 'Propagate', f'{{{uuidgen.uuid4()}}}',
node_parameters]]
nodes.append(node)
output_filename_map = {}
input_filename_map = {}
filenames_set = set()
for node_data in nodes:
node_parameters = node_data[1][3]
for portmap, filemap, group in [
(input_assign_map, input_filename_map, 'inputs'),
(output_assign_map, output_filename_map, 'outputs')
]:
for port0 in node_parameters['ports'][group]:
port1 = portmap.get(port0['uuid'])
if port1:
filemap[port0['file']] = port1['file']
port_assign(port0, port1)
filenames_set.add(port0['file'])
prefix = uuidgen.uuid4()
filenames = list(sorted(filenames_set))
filenames_index = {f: i for i, f in enumerate(filenames)}
external_files = set()
for portlist in [input_assign, output_assign]:
external_files.update(p['file'] for p in portlist)
for node_data in nodes:
# For each assigned output port, any connected inputs also need
# to be assigned.
# Do not replace empty filename as these indicate a removed
# optional port.
node_parameters = node_data[1][3]
for port0 in node_parameters['ports']['inputs']:
filename = port0['file']
if filename and filename in output_filename_map:
port0['file'] = output_filename_map[filename]
for group in ['inputs', 'outputs']:
for port0 in node_parameters['ports'][group]:
filename = port0['file']
# Replace internal filenames with generated non-existing
# filenames.
if filename and filename not in external_files:
port0['file'] = (
f'{{{prefix}}}.{{{filenames_index[filename]}}}')
builtin.flow_execute(
nodes, node_deps, {},
objects, socket_bundle,
parent_identifier=parent_identifier)
objects.clear()
objects.update(org_objects)
@staticmethod
def apply(function, output, port, data):
function.apply(sylambda.PortDesc(port, data))
output.source(function)
def execute(self, node_context):
function = node_context.input['Function']
output = node_context.output['Output']
argument = node_context.input['Argument']
ports = node_context.definition['ports']
nargs = len(function.arguments())
if nargs == 0:
assert False
if nargs == 1:
self.apply_eval(
function, output, argument, ports,
node_context._objects, self.socket_bundle,
self._parent_identifier or self._identifier)
elif nargs > 1:
port = ports['inputs'][1]
self.apply(function, output, port, argument)
[docs]
class Map(Node):
"""
Map Lambda function over each element in argument list.
Output list contains the result of element-wise application of the Lambda
function on each input element.
In contrast with Apply, partial application is not supported.
"""
name = 'Map'
description = 'Map Lambda function over each element in argument list'
nodeid = 'org.sysess.builtin.map'
author = 'Erik der Hagopian <erik.hagopian@combine.se>'
icon = 'map.svg'
parameters = sy_parameters()
parameters.set_list(
'fail_strategy', label='Action on exception',
list=list_failure_strategies.keys(), value=[0],
description='Decide how failure to process an item should be handled.',
editor=Util.combo_editor())
inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda Function to Map',
name='Function'),
Port.Custom('[<a>]', 'Argument List', name='List')])
outputs = Ports([Port.Custom('[<b>]', 'Output List', name='List')])
tags = Tags(Tag.Generic.Lambda)
@staticmethod
def apply_eval(function, output_list, argument_list, portdict,
objects, progress=None, fail_strategy=_error_strategy,
socket_bundle=None, parent_identifier=None):
objects = {} if objects is None else objects
iter_portdict = copy.deepcopy(portdict)
out_list_port = iter_portdict['inputs'][1]
arg_list_port = iter_portdict['outputs'][0]
# Peal of the list from the type for the invoked lambda.
for list_port in [out_list_port, arg_list_port]:
match = _list_re.fullmatch(list_port['type'])
if match is not None:
list_port['type'] = match.groups()[0]
nargs = len(argument_list)
for i, argument in enumerate(argument_list):
if progress is not None:
progress(100.0 * i / nargs)
output = output_list.create()
try:
Apply.apply_eval(function, output, argument, iter_portdict,
dict(objects),
builtin.sub_progress_socket_bundle(
socket_bundle, i, nargs),
parent_identifier=parent_identifier)
output_list.append(output)
except Exception:
if fail_strategy == _error_strategy:
raise exceptions.SyListIndexError(i, sys.exc_info())
elif fail_strategy == _empty_strategy:
output_list.append(output_list.create())
print('Encountered an error for item {}. '
'Creating empty item.'.format(i))
else:
print('Encountered an error for item {}. '
'Skipping item.'.format(i))
if progress is not None:
progress(100.0)
def update_parameters(self, parameter_root):
if ('fail_strategy' in parameter_root and
parameter_root['fail_strategy'].selected == 'Skip File'):
parameter_root['fail_strategy'].selected = _skip_strategy
# .list and .value will be updated automatically to match
# .value_names in the default update_parameters.
def execute(self, node_context):
parameters = node_context.parameters
function = node_context.input['Function']
argument = node_context.input['List']
if node_context._is_own_input(argument):
builtin.set_read_through(argument)
output = node_context.output['List']
ports = node_context.definition['ports']
nargs = len(function.arguments())
if nargs == 0:
assert False
if nargs >= 1:
self.apply_eval(
function, output, argument, ports,
node_context._objects, self.set_progress,
_fuzzy_fail_strategy(parameters['fail_strategy'].selected),
self.socket_bundle,
self._parent_identifier or self._identifier)
elif nargs > 1:
raise NotImplementedError(
'Partial map application is not supported.')
class TypeEditorParameterView(ParameterView):
def __init__(self, node_context, options, parent=None):
super().__init__(parent=parent)
self._parameters = node_context.parameters
self._types_combo = sywidgets.ValidatedTextComboBox()
value = self._parameters['datatype'].value
options = list(options)
if not any(
option for option, tooltip in options if option == value):
options.append((value, 'Saved data type'))
options = list(sorted(
options, key=lambda x: (x[0].lstrip()[:1] in '<[{(', x)))
for i, (name, tooltip) in enumerate(options):
self._types_combo.addItem(name)
self._types_combo.setItemData(
i, tooltip, role=QtCore.Qt.ItemDataRole.ToolTipRole)
self._types_combo.lineEdit().setBuilder(self._text_type_builder)
index = self._types_combo.findText(value)
if index > 0:
self._types_combo.setCurrentIndex(index)
else:
self._types_combo.setCurrentText(value)
self._types_combo.setToolTip(
self._parameters['datatype'].gui().toolTip())
layout = QtWidgets.QVBoxLayout()
layout.addWidget(self._types_combo)
self.setLayout(layout)
def _text_type_builder(self, value):
try:
types.from_type_expand(types.from_string(value))
except Exception:
raise sywidgets.ValidationError(f'Invalid data type: {value}')
return str(value)
def save_parameters(self):
self._parameters['datatype'].value = self._types_combo.value()
class ExtractParameterView(TypeEditorParameterView):
def __init__(self, node_context, parent=None):
options = [('() -> ()',
'Function with no argument and no result, data type')]
super().__init__(node_context, options, parent=parent)
ExtractLambdasHelper.add_fail_strategy(self._parameters)
fail_strategy = self._parameters['fail_strategy'].gui()
self.layout().addWidget(fail_strategy)
class ExtractLambdasHelper:
@staticmethod
def add_fail_strategy(parameters):
if 'fail_strategy' not in parameters:
error_handling = dict([
('error', 'Error'),
('skip', 'Skip item'),
])
parameters.set_string(
'fail_strategy',
label='Strategy for handling extraction issues',
description='How should the node handle the situation when '
'the extract fails?',
value='skip',
editor=Util.combo_editor(
options=error_handling))
[docs]
class Empty(Node):
"""
This node can only run successfully if it the output port has a concrete
type. This can be acheived either by connecting the output port to other
nodes so that the type becomes concrete or by specifying the type in the
configuration. For syntax and examples of how to specify port types in the
configuration, see :ref:`custom_ports`.
"""
author = ('Erik der Hagopian <erik.hagopian@combine.se>, '
'Benedikt Ziegler <benedikt.ziegler@combine.se>')
name = 'Empty'
description = 'Generate empty data of inferred or specified type'
nodeid = 'org.sysess.builtin.empty'
icon = 'empty.svg'
tags = Tags(Tag.Input.Generate)
inputs = Ports([])
outputs = Ports([Port.Custom(
'<a>',
'Output port containing empty data (must be connected or specified)')])
parameters = sy_parameters()
parameters.set_string(
'datatype', label='Datatype',
description='Define the datatype of the output port.', value='<a>')
def exec_parameter_view(self, node_context):
options = []
typealiases = list(sorted(types.typealias_names()))
for typealias in typealiases:
options.append((typealias, 'Alias data type'))
options.append(('<a>', 'Generic data type'))
options.append(('[<a>]', 'Generic list data type'))
options.append(('{<a>}', 'Generic dict data type'))
options.append(('(<a>,<b>)', 'Generic tuple 2 data type'))
return TypeEditorParameterView(node_context, options)
def execute(self, node_context):
str_type = node_context.definition['ports']['outputs'][0]['type']
arg_type = types.from_string(str_type)
if types.generics(arg_type):
raise exceptions.SyDataError(
'Output port must be connected and non-generic')
class Propagate(Node):
"""Propagate input to output."""
author = 'Erik der Hagopian <erik.hagopian@combine.se>'
name = 'Propagate'
description = 'Propagate input to output'
nodeid = 'org.sysess.builtin.propagate'
icon = 'empty.svg'
tags = Tags(Tag.Hidden.Internal)
inputs = Ports([Port.Custom('<a>', 'Input')])
outputs = Ports([Port.Custom('<a>', 'Output')])
parameters = sy_parameters()
def execute(self, node_context):
node_context.output[0].source(node_context.input[0])