# This file is part of Sympathy for Data.
# Copyright (c) 2015-2016 Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import os
import sys
import copy
import uuid
import contextlib
import argparse
from pathlib import Path
from typing import Type, Any
from PySide6 import QtCore
from sympathy.utils import filebase
from sympathy.utils.context import deprecated_method
from sympathy.platform.parameter_helper import ParameterGroup, ParameterRoot
from sympathy.platform import port as port_platform
from sympathy.utils.prim import fuzzy_filter, group_pairs
from sympathy.platform import types
from sympathy.platform import state
from sympathy.platform.basicnode import BasicNode
from sympathy.platform.library_cache import LibraryCache
from sympathy.platform.hooks import node_execution_started
from sympathy.platform import feature
from sympathy.app import library_creator
from sympathy.app.tasks.task_worker import get_worker_settings
from sympathy.app.state import LaunchState
from sympathy.worker import state as worker_state
from sympathy import launch
[docs]
class InteractiveNotNodeError(Exception):
    """
    Raised by :meth:`SyiLibrary.node` when the matched library entry has
    type ``'flow'`` and therefore cannot be wrapped as a node.
    """
@contextlib.contextmanager
def _instantiated_ports(ports, inputs=None):
if inputs is None:
yield
return
org_inp = ports.get('inputs')
org_out = ports.get('outputs')
inp = [dict(port) for port in org_inp or []]
out = [dict(port) for port in org_out or []]
mapping = {}
for input_, port in zip(inputs, inp):
dt = types.parse_base_line(port['type'])
port['type_base'] = dt
types.match(dt, input_.container_type, mapping)
for port in out:
dt = types.parse_base_line(port['type'])
port['type_base'] = dt
for port in inp + out:
port['type'] = str(types.instantiate(port['type_base'], mapping))
del port['type_base']
ports['inputs'] = inp
ports['outputs'] = out
yield
if org_inp is None:
ports.pop('inputs')
else:
ports['inputs'] = org_inp
if org_out is None:
ports.pop('outputs')
else:
ports['outputs'] = org_out
class _SyiContext:
def __init__(self, sys_path):
self.__sys_path = sys_path
self.__sys_path_before = []
def __enter__(self):
self.__sys_path_before = list(sys.path)
sys.path[:] = self.__sys_path
return self
def __exit__(self, *args):
sys.path[:] = self.__sys_path_before
[docs]
class SyiNode:
    """
    A Sympathy node which can be configured and executed from Python code.

    Should not be instantiated directly. Instead use :meth:`SyiLibrary.node`.
    """

    def __init__(self, context, node, parameters, filename):
        # Work on a private deep copy; the ports and parameter data are
        # modified below and by later calls.
        parameters = copy.deepcopy(parameters)
        self.__context = context
        self.__node = node
        self.__parameters = parameters
        self.__syiparameters = SyiParameters(
            parameters['parameters'].get('data', {}))
        self.__filename = filename

        inputs = parameters['ports'].get('inputs', [])
        outputs = parameters['ports'].get('outputs', [])
        inputs, outputs = port_platform.instantiate(
            inputs, outputs, {})

        # Each port gets a unique 'file' value, which is used as the key
        # when passing objects for the ports (see _build_ports).
        for port in inputs:
            port['file'] = str(uuid.uuid4())
        for port in outputs:
            port['file'] = str(uuid.uuid4())

        parameters['ports']['inputs'] = inputs
        parameters['ports']['outputs'] = outputs

    def _input_objects(self, inputs):
        """
        Return a dict mapping the ``'file'`` value of each input port to the
        corresponding element of `inputs`. Empty if `inputs` is falsy.
        """
        if not inputs:
            return {}
        input_ports = self.__parameters.get('ports', {}).get('inputs', [])
        return {
            port['file']: value for port, value in zip(input_ports, inputs)}

    def execute(self, inputs: list[Any] | None = None) -> list[Any]:
        """
        Execute node.

        If the node has input ports, those should be passed in a list. Any
        output ports are returned in a list. E.g.::

            >>> node.execute([table1, table2])
            [<sympathy.typeutils.table.Table at 0x000000000000>]

        The execution context is closed and the node state data is cleared
        before this method returns, also when the execution raises.
        """
        try:
            node_execution_started.value()
        except Exception as e:
            # Re-raise the exception with its traceback replaced by one that
            # starts in this function. Child frames should not be included
            # here!
            try:
                raise Exception
            except Exception as ei:
                raise e.with_traceback(ei.__traceback__) from None

        self._apply_parameters()
        ports = self.__parameters['ports']
        with _instantiated_ports(ports, inputs):
            node_inputs = _build_ports(inputs, ports.get('inputs', []))
            node_context = None
            try:
                node_context = self.__node.build_execute_context(
                    self.__parameters, None, objects=node_inputs,
                    exclude_output=True)
                self.__node._execute_with_context(node_context)
                return [output for output in node_context.output]
            finally:
                # Close the context on failure as well (it used to be closed
                # only after a successful execution), and clear the node
                # state data even if closing fails.
                try:
                    if node_context is not None:
                        node_context.close()
                finally:
                    state.node_state().cleardata()

    def __configure(self, inputs=None, return_widget=True):
        """
        Run the node's parameter view.

        With `return_widget` true the result of the parameter view call (the
        widget) is returned. Otherwise, if the call produced a result, the
        node parameters are replaced by the parameters from that result and
        ``None`` is returned.
        """
        ports = self.__parameters['ports']
        with _instantiated_ports(ports, inputs):
            node_inputs = _build_ports(inputs, ports.get('inputs', []))
            node_context = None
            exclude_input = 'inputs' not in self.__parameters['ports']
            try:
                node_context = self.__node.build_exec_parameter_view_context(
                    self.__parameters, None, objects=node_inputs,
                    exclude_input=exclude_input)
                res = self.__node._exec_parameter_view_with_context(
                    node_context, return_widget=return_widget)
                if return_widget:
                    # In this case res is the widget
                    return res
                elif res is not None:
                    self.parameters = ParameterRoot(res['parameters']['data'])
            finally:
                if node_context:
                    node_context.close()
                state.node_state().cleardata()

    def _apply_parameters(self):
        """Write the current parameter object back into the node dict."""
        self.__parameters['parameters']['data'] = (
            self.__syiparameters.to_dict())

    @property
    def parameters(self):
        """
        Node parameters

        Can be modified to programmatically configure the node.
        """
        return self.__syiparameters

    @parameters.setter
    def parameters(self, value: ParameterRoot):
        self.__parameters['parameters']['data'] = value.to_dict()
        self.__syiparameters = SyiParameters(value)

    @property
    def filename(self):
        """The filename that was passed when this object was created."""
        return self.__filename

    @property
    def node_cls(self):
        """The class of the wrapped node instance."""
        return type(self.__node)

    @classmethod
    def from_node_cls(cls, node_cls):
        """
        Create a :class:`SyiNode` directly from a node class.

        The node definition is obtained from ``library_creator`` using the
        file of the module in which `node_cls` is defined.
        """
        node = node_cls()
        module = sys.modules[node_cls.__module__]
        filename = module.__file__
        node_dict = library_creator.get_properties(
            filename, node_cls.__name__, node_cls)
        return cls(None, node, node_dict, filename)
class SyiGetAttribute:
    """
    Attribute-style access to the entries of a nested parameter structure.

    `params` is the root structure and `path` the list of keys leading from
    the root to the level that this object exposes.
    """

    def __init__(self, params, path):
        self.__params = params
        self.__path = path

    def __getattribute__(self, name):
        # Everything goes through object.__getattribute__ to avoid recursing
        # into this method.
        fetch = object.__getattribute__
        root = fetch(self, '_SyiGetAttribute__params')
        trail = fetch(self, '_SyiGetAttribute__path')

        level = root
        for key in trail:
            level = level[key]

        if name not in level.keys():
            # Not an entry at this level: ordinary attribute lookup.
            return fetch(self, name)

        found = level[name]
        if isinstance(found, ParameterGroup):
            # Groups are wrapped so that their entries are attributes too.
            return SyiGetAttribute(root, trail + [name])
        return level[name]

    def __dir__(self):
        fetch = object.__getattribute__
        level = fetch(self, '_SyiGetAttribute__params')
        for key in fetch(self, '_SyiGetAttribute__path'):
            level = level[key]
        return level.keys()
class SyiParameters(ParameterRoot):
    """
    Parameter root used by :class:`SyiNode`.

    Adds the ``attributes`` and ``data`` accessors, both of which are marked
    with ``deprecated_method``.
    """

    @property
    @deprecated_method('7.0.0', 'node.parameters["myparam"]')
    def attributes(self):
        """Attribute-style view of these parameters, starting at the root."""
        root_path = []
        return SyiGetAttribute(self, root_path)

    @property
    @deprecated_method('7.0.0', 'node.parameters["myparam"]')
    def data(self):
        """The parameters object itself."""
        return self

    @data.setter
    def data(self, _):
        message = (
            "Setting SyiParameters.data is not supported in Sympathy>=6.0.0. "
            "Please set the parameters directly on the node instead.")
        raise Exception(message)
[docs]
class SyiLibrary:
    """
    A library of nodes that can be configured and executed in Python code.

    Should not be instantiated directly. Instead call :func:`load_library`.
    """

    def __init__(self, context, library, name_library, paths):
        self.__context = context
        self.__library = dict(group_pairs(library.items()))
        self.__name_library = dict(group_pairs(name_library))
        self.__paths = paths

    def context(self):
        """Return the context object that this library was created with."""
        return self.__context

    def node(self, nid, fuzzy_names=True) -> SyiNode:
        """
        Attempt to find `nid` in the library.

        Argument `nid` can be either a node id or a node name. If no matching
        node can be found a KeyError is raised.

        If `fuzzy_names` is `True` (the default) and `nid` doesn't match any
        node exactly, it is used as a pattern that the node name must match.
        The characters of the pattern must appear in the node name in the same
        order as in the pattern, but need not be of the same case, and may
        have other characters in between them. If multiple nodes match the
        pattern a KeyError is raised.

        If the matching entry is a flow, :class:`InteractiveNotNodeError` is
        raised.
        """
        def single_match(candidates):
            # The only candidate, None if there are none, KeyError if there
            # are several.
            if len(candidates) > 1:
                raise KeyError('Multiple names matching: {}'.format(
                    [(v['id'], v.get('name')) for v in candidates]))
            if len(candidates) == 1:
                return candidates[0]
            return None

        # Exact lookup: first by node id, then by node name.
        node_dict = None
        for group in (self.__library, self.__name_library):
            result = group.get(nid, None)
            if result is not None:
                node_dict = single_match(result)
            if node_dict:
                break

        if node_dict is None and fuzzy_names:
            result = fuzzy_filter(nid, self.__name_library.items())
            node_dict = single_match(
                [node_dict_ for _, found in result for node_dict_ in found])

        if node_dict is None:
            raise KeyError(
                'Identifier does not match any nodeid or node name.')

        if node_dict.get('type') == 'flow':
            raise InteractiveNotNodeError

        flow_vars = dict.fromkeys(
            ('SY_FLOW_FILEPATH', 'SY_FLOW_DIR', 'SY_LIBRARY_DIR'), '')

        with self.__context:
            local_filename = node_dict['file']
            # Make the node's module importable by its bare module name.
            folder, basename = os.path.split(local_filename)
            sys.path.insert(0, folder)
            modulename = os.path.splitext(basename)[0]
            module = __import__(modulename)

            # TODO: (magnus) if/when interactive can load syx-files, it needs
            # to also load and run auto migrations, and tell basicnode to
            # run update_parameters on older nodes with no migrations.
            node_instance = getattr(module, node_dict['class'])()
            node_parameters = {
                'parameters': node_dict['parameters'],
                'label': node_dict['name'],
                'ports': node_dict['ports'],
                'flow_vars': flow_vars,
            }
            return SyiNode(
                self.__context, node_instance, node_parameters,
                local_filename)

    def nodeids(self) -> list[str]:
        """Return a list of all nodeids in this library."""
        return list(self.__library)

    def paths(self):
        """Return the paths that this library was created with."""
        return self.__paths
def _build_ports(values, ports):
assert values is None or len(values) == len(ports)
result = {}
if values is None:
# Construct dummy data to allow configuration without value.
for port in ports:
data = filebase.port_maker(
port, None, external=None, no_datasource=True)
result[port['file']] = data
else:
for port, value in zip(ports, values):
result[port['file']] = value
return result
def _load_library_with_state(launch_state: LaunchState, library_id=None):
    """
    Create a :class:`SyiLibrary` from the libraries of `launch_state`.

    If `library_id` is truthy, only nodes whose ``'library_identifier'``
    equals it are included; otherwise all nodes are.

    ``sys.path`` is extended while the library is created and is restored to
    its previous contents before returning, also on failure. The extended
    path is kept in the context object of the returned library.
    """
    import sympathy.platform.library
    from sympathy.app import util, library_creator, library_manager

    sys_path = list(sys.path)
    # Everything after the snapshot is covered by the try, so that a failure
    # during setup cannot leave the extended sys.path behind.
    try:
        types.manager.normalize()
        storage_folder = util.storage_folder(launch_state.qt_config)
        library_paths = launch_state.properties.global_library_paths
        common_paths = [
            sympathy.platform.library.package_python_path(library)
            for library in library_paths]
        sys.path.extend(common_paths)
        context = _SyiContext(list(sys.path))

        state.node_state().set_attributes(
            library_dirs=library_paths,
            support_dirs=[],
            worker_settings=get_worker_settings(launch_state),
        )
        installed_libraries = sympathy.platform.library.available_libraries(
            load=False)
        library_cache = LibraryCache(
            Path(storage_folder), library_paths, installed_libraries)

        # Hold the cache lock while the library description is created and
        # converted.
        with library_cache.lock():
            result = library_creator.create(
                library_cache.installed_libraries(), library_paths,
                storage_folder, launch_state.session_folder)
            libraries = library_manager.libraries_from_creator(result)

        nodes = [
            n for x in libraries for n in x['nodes']
            if not library_id or n['library_identifier'] == library_id]
        node_library = {n['id']: n for n in nodes}
        name_library = [(n['label'], n) for n in nodes]

        return SyiLibrary(context, node_library, name_library,
                          library_paths)
    finally:
        sys.path[:] = sys_path
def _has_interactive_worker_state() -> bool:
try:
wstate = worker_state.get_state()
except Exception:
return False
else:
if isinstance(wstate, _InteractiveWorkerState):
return True
raise Exception("Incorrect worker state.")
class _InteractiveWorkerState(worker_state.WorkerState):
    """Worker state used for sessions started from this module."""

    @classmethod
    def create(cls, launch_state, app, *args, **kwargs):
        """
        Create the worker state from `args` and `kwargs` and append the app
        features created by every available feature plugin.
        """
        wstate = super().create(*args, **kwargs)
        base_features = wstate.features
        app_features = tuple(
            plugin.create_app_features(launch_state.qt_config, parent=app)
            for plugin in feature.available_features())
        wstate.features = base_features + app_features
        return wstate
def _setup(gui=True) -> LaunchState:
    """
    Set up the environment, launch state, Qt application and worker state.

    A Qt application is only created if there is no instance already; `gui`
    selects between a GUI and a CLI application. The worker state is only
    created and set if no interactive worker state is active. Returns the
    launch state.
    """
    from sympathy.app import util
    from sympathy.app.state import LaunchState
    from sympathy.qt_config import QtConfig

    launch.setup_environment(os.environ)
    config = QtConfig()
    app_args = argparse.Namespace(
        inifile=None, launch_command='gui' if gui else 'cli')
    launch_state = LaunchState.create(app_args, config)
    # The session folder is derived from the launch state and stored on the
    # argument namespace afterwards.
    app_args.session = util.create_session_folder(launch_state)

    app = QtCore.QCoreApplication.instance()
    if not app:
        create_app = config.create_gui_app if gui else config.create_cli_app
        app = create_app([])

    if not _has_interactive_worker_state():
        wstate = _InteractiveWorkerState.create(
            launch_state, app, argparse.Namespace(), config)
        wstate.setup()
        worker_state.set_state(wstate)

    return launch_state
[docs]
def load_library(
    gui: bool = True,
    library_id: str | None = None,
) -> SyiLibrary:
    """
    Load the node library.

    With `library_id` left as `None`, the nodes of all node libraries
    available in Sympathy are loaded. Otherwise only the nodes of the library
    with that library id are loaded.

    `gui` must be `True` (the default) to be able to launch node
    configuration GUIs. Setting `gui` to `False` might be required for
    scripts running without a graphical environment (e.g. headless servers).

    Performs the session setup; the worker state is only created if no
    session is active.
    """
    return _load_library_with_state(_setup(gui), library_id=library_id)
[docs]
def node(node_cls: Type[BasicNode]) -> SyiNode:
    """
    Create a :class:`SyiNode` for the node class `node_cls`.

    Calls :func:`start_session` first, which starts a session unless one is
    already active.
    """
    start_session()
    syi_node = SyiNode.from_node_cls(node_cls)
    return syi_node
[docs]
def start_session():
    """
    Start a session.

    Consider using :func:`session` instead to ensure that :func:`end_session`
    gets called at the end of the session.

    Calling this when already in a session does nothing.
    """
    if _has_interactive_worker_state():
        return
    _setup()
[docs]
def end_session():
    """
    End a session, tearing down the setup performed by :func:`start_session`.

    This function needs to be called in order to return a floating license.
    Consider using :func:`session` to ensure that this gets called at the end
    of the session.

    Calling this if not in a session does nothing.
    """
    if not _has_interactive_worker_state():
        return
    worker_state.remove_state()
[docs]
@contextlib.contextmanager
def session():
    """
    Context manager for starting and ending a session.

    Use it to make sure that :func:`end_session` is called at the end of the
    session, even if there is an exception. If a session is already active
    when the context is entered, it is neither started nor ended here.

    Example::

        from sympathy.app import interactive

        with interactive.session():
            library = interactive.load_library()
            node = library.node('Random Table')
            print(node.execute()[0])
    """
    owns_session = not _has_interactive_worker_state()
    if owns_session:
        _setup()
    try:
        yield
    finally:
        # Only end the session if it was started by this context manager.
        if owns_session:
            end_session()