# This file is part of Sympathy for Data.
# Copyright (c) 2015-2016 System Engineering Software Society
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
from __future__ import (print_function, division, unicode_literals,
absolute_import)
import json
import base64
import copy
import re
import sys
import os
import subprocess
from collections import OrderedDict
import six
from sympathy.api.nodeconfig import Tag, Tags, Port, Ports
from sympathy.platform.gennode import Node, BasicNode
from sympathy.types import exception as syexc
from sympathy.types import sylambda
from sympathy.utils.parameter_helper import ParameterRoot
from Gui import builtin
from sympathy.platform.gennode import Util
from sympathy.api.exceptions import SyDataError, sywarn
from sympathy.types import types
from sympathy.platform import os_support
from sympathy.platform import version_support as vs
# Strategies offered by the Map node for handling a failing list item, keyed
# by the label shown to the user and mapped to the integer that is compared
# against the 'fail_strategy' parameter value.
list_failure_strategies = OrderedDict(
    [('Error', 0), ('Create Empty Item', 1), ('Skip Item', 2)])

# Matches a type string wrapped in one pair of square brackets, e.g. '[table]',
# capturing what is inside the brackets. Written as a raw string because
# '\[' and '\]' are invalid escape sequences in a regular string literal.
list_re = re.compile(r'^\[(.*)\]$')
class Apply(BasicNode):
    """
    Apply Lambda function to Argument.

    When the Lambda function has multiple input arguments Apply will do a
    partial application producing a new Lambda function with the argument
    bound as the value for the first argument.

    When the Lambda function has a single input argument Apply will evaluate
    the function to produce a result, taking into account any arguments
    previously bound.
    """

    name = 'Apply'
    description = 'Apply Lambda function to Argument'
    nodeid = 'org.sysess.builtin.apply'
    author = 'Erik der Hagopian <erik.hagopian@sysess.org>'
    copyright = '(C) 2015-2016 System Engineering Software Society'
    version = '1.0'
    icon = 'apply.svg'
    inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda Function to Apply',
                                name='Function'),
                    Port.Custom('<a>', 'Argument', name='Argument')])
    outputs = Ports([Port.Custom('<b>', 'Output', name='Output')])
    tags = Tags(Tag.Generic.Lambda)

    @staticmethod
    def apply_eval(function, output, argument, portdict, writeback, objects,
                   socket_bundle=None):
        """
        Execute the flow held by function with argument as its last input.

        The flow description and the already bound ports are read from
        ``function.get()``. ``portdict['inputs'][1]`` is appended as the last
        input assignment and ``portdict['outputs'][0]`` is used as the only
        output assignment. Before ``builtin.flow_execute`` is called, output
        and argument are stored in objects under the 'file' entry of their
        respective assignment.

        objects is modified in place; a new dict is used when it is None.
        writeback and socket_bundle are forwarded unchanged to
        ``builtin.flow_execute``.
        """
        flowdesc, ports = function.get()

        # Nodes are stored as base64 encoded JSON documents.
        nodes = [json.loads(base64.b64decode(node).decode('ascii'))
                 for node in flowdesc.nodes]
        input_nodes = flowdesc.input_nodes
        output_nodes = flowdesc.output_nodes
        input_ports = flowdesc.input_ports
        output_ports = flowdesc.output_ports
        bypass_ports = flowdesc.bypass_ports
        node_settings = flowdesc.node_settings

        # Previously bound ports use the same base64 encoded JSON format.
        input_assign = [json.loads(base64.b64decode(port.data).decode('ascii'))
                        for port in ports]
        input_assign.append(portdict['inputs'][1])
        output_assign = [portdict['outputs'][0]]

        objects = {} if objects is None else objects
        objects[output_assign[0]['file']] = output
        objects[input_assign[-1]['file']] = argument

        builtin.flow_execute(nodes, input_nodes, output_nodes, input_ports,
                             output_ports, bypass_ports, {}, input_assign,
                             output_assign,
                             objects, writeback, node_settings, socket_bundle)

    @staticmethod
    def apply(function, output, portdict):
        """
        Bind the argument port to function and let output source the result.

        ``portdict['inputs'][1]`` is serialized as base64 encoded JSON,
        wrapped in a ``sylambda.PortDesc`` and passed to ``function.apply``.
        """
        port = portdict['inputs'][1]
        function.apply(
            sylambda.PortDesc(
                base64.b64encode(json.dumps(port).encode('ascii'))))
        output.source(function)

    def execute_basic(self, node_context):
        """
        Evaluate or partially apply the Lambda function on the 'Function' port.

        With exactly one argument left the function is evaluated using the
        'Argument' input, with more than one the argument is bound using
        ``apply``.

        Raises AssertionError when the function reports no arguments.
        """
        def writeback(output):
            def inner():
                try:
                    output.close()
                except (ValueError, syexc.WritebackReadOnlyError):
                    # Deliberate best-effort close.
                    # TODO (Erik): Fix proper handling with custom exception.
                    pass
            return inner

        own_objects = node_context._own_objects
        function = node_context.input['Function']
        output = node_context.output['Output']
        writeback_fn = None

        if output in own_objects:
            writeback_fn = writeback(output)

        ports = node_context.definition['ports']
        nargs = len(function.arguments())

        if nargs == 0:
            # Raised explicitly instead of using an assert statement, which
            # is removed when Python runs with -O and would make this node
            # silently do nothing.
            raise AssertionError(
                'Apply requires a Lambda function with at least one '
                'argument.')

        if nargs == 1:
            argument = node_context.input['Argument']
            if argument in own_objects:
                builtin.set_read_through(argument)
            self.apply_eval(
                function, output, argument, ports,
                writeback_fn, node_context._objects, self.socket_bundle)
        else:
            self.apply(function, output, ports)

        # NOTE(review): writeback_fn is also handed to flow_execute above;
        # presumably a repeated close is what the swallowed errors in
        # writeback cover -- confirm.
        if writeback_fn:
            writeback_fn()
class Map(BasicNode):
    """
    Map Lambda function over each element in argument list.

    Output list contains the result of element-wise application of the Lambda
    function on each input element.

    In contrast with Apply, partial application is not supported.
    """

    name = 'Map'
    description = 'Map Lambda function over each element in argument list'
    nodeid = 'org.sysess.builtin.map'
    author = 'Erik der Hagopian <erik.hagopian@sysess.org>'
    copyright = '(C) 2015-2016 System Engineering Software Society'
    version = '1.0'
    icon = 'map.svg'

    parameters = ParameterRoot()
    parameters.set_list(
        'fail_strategy', label='Action on exception',
        # keys() is a view object on Python 3, hand over a real list.
        list=list(list_failure_strategies.keys()), value=[0],
        description='Decide how failure to process an item should be handled.',
        editor=Util.combo_editor().value())

    inputs = Ports([Port.Custom('<a> -> <b>', 'Lambda Function to Map',
                                name='Function'),
                    Port.Custom('[<a>]', 'Argument List', name='List')])
    outputs = Ports([Port.Custom('[<b>]', 'Output List', name='List')])
    tags = Tags(Tag.Generic.Lambda)

    @staticmethod
    def apply_eval(function, output_list, argument_list, portdict,
                   objects, progress=None, fail_strategy=0, top_level=False,
                   socket_bundle=None):
        """
        Evaluate function once for every element of argument_list.

        For each element a new item is created with ``output_list.create()``
        and ``Apply.apply_eval`` is called with a writeback that appends the
        item to output_list.

        portdict is deep copied and, in the copy, the list brackets are
        removed from the types of ``inputs[1]`` and ``outputs[0]`` so that the
        ports describe a single element.

        progress, when given, is called with a percentage before each element
        and with 100.0 after the last one. fail_strategy is one of the values
        of ``list_failure_strategies`` and decides whether a failing element
        re-raises, appends a newly created item or is skipped. When top_level
        is true each element starts from a fresh copy of the objects passed
        in.
        """
        def writeback(output):
            def inner():
                output_list.append(output)
            return inner

        objects = {} if objects is None else objects
        org_objects = dict(objects)
        iter_portdict = copy.deepcopy(portdict)
        arg_list_port = iter_portdict['inputs'][1]
        out_list_port = iter_portdict['outputs'][0]

        # Peel off the list from the type for the invoked lambda.
        for list_port in [arg_list_port, out_list_port]:
            match = list_re.match(list_port['type'])
            if match is not None:
                list_port['type'] = match.groups()[0]

        nargs = len(argument_list)

        for i, argument in enumerate(argument_list):
            if progress is not None:
                progress(100.0 * i / nargs)
            output = output_list.create()
            try:
                if top_level:
                    objects = dict(org_objects)

                Apply.apply_eval(function, output, argument, iter_portdict,
                                 writeback(output), dict(objects),
                                 builtin.sub_progress_socket_bundle(
                                     socket_bundle, i, nargs))
            except Exception:
                # Only ordinary errors are subject to the failure strategy;
                # KeyboardInterrupt and SystemExit must never be skipped.
                if fail_strategy == list_failure_strategies['Error']:
                    print('Encountered an error for item {}.'.format(i))
                    raise
                elif fail_strategy == list_failure_strategies[
                        'Create Empty Item']:
                    output_list.append(output_list.create())
                    print('Encountered an error for item {}. '
                          'Creating empty item.'.format(i))
                else:
                    print('Encountered an error for item {}. '
                          'Skipping item.'.format(i))

        if progress is not None:
            progress(100.0)

    def execute_basic(self, node_context):
        """
        Map the Lambda function on the 'Function' port over the 'List' input.

        Raises AssertionError when the function reports no arguments and
        NotImplementedError when it reports more than one, since that would
        require partial application.
        """
        own_objects = node_context._own_objects
        parameters = ParameterRoot(node_context.parameters)
        function = node_context.input['Function']
        argument = node_context.input['List']

        if argument in own_objects:
            builtin.set_read_through(argument)

        output = node_context.output['List']
        top_level = output in own_objects

        if top_level:
            builtin.set_write_through(output)

        ports = node_context.definition['ports']
        nargs = len(function.arguments())

        if nargs == 0:
            # Raised explicitly instead of using an assert statement, which
            # is removed when Python runs with -O.
            raise AssertionError(
                'Map requires a Lambda function with at least one argument.')

        if nargs > 1:
            # Previously unreachable because the evaluating branch was
            # guarded by ``nargs >= 1``.
            raise NotImplementedError(
                'Partial map application is not supported.')

        self.apply_eval(
            function, output, argument, ports,
            node_context._objects, self.set_progress,
            parameters['fail_strategy'].value[0], top_level,
            self.socket_bundle)

        if top_level:
            output.close()
class Empty(Node):
    """Generate empty data of inferred or defined datatype."""

    name = 'Empty'
    description = 'Generate empty data of inferred or defined datatype'
    nodeid = 'org.sysess.builtin.empty'
    author = ('Erik der Hagopian <erik.hagopian@sysess.org>, '
              'Benedikt Ziegler <benedikt.ziegler@combine.se>')
    copyright = '(C) 2016 System Engineering Software Society'
    version = '1.1'
    icon = 'empty.svg'
    tags = Tags(Tag.Input.Generate)

    inputs = Ports([])
    outputs = Ports([
        Port.Custom(
            '<a>',
            'Output port containing empty data '
            '(must be connected or defined)'),
    ])

    parameters = ParameterRoot()
    parameters.set_string(
        'datatype',
        label='Datatype',
        description='Define the datatype of the output port.',
        value='<a>')

    def verify_parameters(self, node_context):
        """Return the length of the configured 'datatype' string."""
        datatype = node_context.parameters['datatype']
        return len(datatype.value)

    def execute(self, node_context):
        """
        Check the type of the first output port.

        Raises SyDataError when ``types.generics`` reports anything for the
        parsed port type. Nothing is written to the output here.
        """
        output_ports = node_context.definition['ports']['outputs']
        type_string = output_ports[0]['type']
        parsed_type = types.from_string(type_string)

        if not types.generics(parsed_type):
            return

        raise SyDataError(
            'Output port must be connected and non-generic')
class Propagate(Node):
    """Propagate input to output."""

    name = 'Propagate'
    description = 'Propagate input to output'
    nodeid = 'org.sysess.builtin.propagate'
    author = 'Erik der Hagopian <erik.hagopian@sysess.org>'
    copyright = '(C) 2017 System Engineering Software Society'
    version = '1.0'
    icon = 'empty.svg'
    tags = Tags(Tag.Input.Generate)

    inputs = Ports([Port.Custom('<a>', 'Input')])
    outputs = Ports([Port.Custom('<a>', 'Output')])

    parameters = ParameterRoot()

    def execute(self, node_context):
        """Let the first output port source the first input port."""
        target = node_context.output[0]
        target.source(node_context.input[0])