Source code for sympathy.app.interactive

# This file is part of Sympathy for Data.
# Copyright (c) 2015-2016 Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import os
import sys
import copy
import uuid
import contextlib
import argparse
from pathlib import Path
from typing import Type, Any

from PySide6 import QtCore

from sympathy.utils import filebase
from sympathy.utils.context import deprecated_method
from sympathy.platform.parameter_helper import ParameterGroup, ParameterRoot
from sympathy.platform import port as port_platform
from sympathy.utils.prim import fuzzy_filter, group_pairs
from sympathy.platform import types
from sympathy.platform import state
from sympathy.platform.basicnode import BasicNode
from sympathy.platform.library_cache import LibraryCache
from sympathy.platform.hooks import node_execution_started
from sympathy.platform import feature
from sympathy.app import library_creator
from sympathy.app.tasks.task_worker import get_worker_settings
from sympathy.app.state import LaunchState
from sympathy.worker import state as worker_state
from sympathy import launch


[docs] class InteractiveNotNodeError(Exception): pass
@contextlib.contextmanager def _instantiated_ports(ports, inputs=None): if inputs is None: yield return org_inp = ports.get('inputs') org_out = ports.get('outputs') inp = [dict(port) for port in org_inp or []] out = [dict(port) for port in org_out or []] mapping = {} for input_, port in zip(inputs, inp): dt = types.parse_base_line(port['type']) port['type_base'] = dt types.match(dt, input_.container_type, mapping) for port in out: dt = types.parse_base_line(port['type']) port['type_base'] = dt for port in inp + out: port['type'] = str(types.instantiate(port['type_base'], mapping)) del port['type_base'] ports['inputs'] = inp ports['outputs'] = out yield if org_inp is None: ports.pop('inputs') else: ports['inputs'] = org_inp if org_out is None: ports.pop('outputs') else: ports['outputs'] = org_out class _SyiContext: def __init__(self, sys_path): self.__sys_path = sys_path self.__sys_path_before = [] def __enter__(self): self.__sys_path_before = list(sys.path) sys.path[:] = self.__sys_path return self def __exit__(self, *args): sys.path[:] = self.__sys_path_before
[docs] class SyiNode: """ A Sympathy node which can be configured and executed from Python code. Should not be instantiated directly. Instead use :meth:`SyiLibrary.node`. """ def __init__(self, context, node, parameters, filename): parameters = copy.deepcopy(parameters) self.__context = context self.__node = node self.__parameters = parameters self.__syiparameters = SyiParameters( parameters['parameters'].get('data', {})) self.__filename = filename inputs = parameters['ports'].get('inputs', []) outputs = parameters['ports'].get('outputs', []) inputs, outputs = port_platform.instantiate( inputs, outputs, {}) for port in inputs: port['file'] = str(uuid.uuid4()) for port in outputs: port['file'] = str(uuid.uuid4()) parameters['ports']['inputs'] = inputs parameters['ports']['outputs'] = outputs def _input_objects(self, inputs): objects = {} if inputs: input_ports = self.__parameters.get('ports', {}).get('inputs', []) for port, input in zip(input_ports, inputs): objects[port['file']] = input return objects
[docs] def execute(self, inputs: list[Any] | None = None) -> list[Any]: """ Execute node. If the node has input ports, those should be passed in a list. Any output ports are returned in a list. E.g.:: >>> node.execute([table1, table2]) [<sympathy.typeutils.table.Table at 0x000000000000>] """ try: node_execution_started.value() except Exception as e: # Re-raise exception with traceback replaced function. # Child frames should not be included here! try: raise Exception except Exception as ei: raise e.with_traceback(ei.__traceback__) from None self._apply_parameters() ports = self.__parameters['ports'] with _instantiated_ports(ports, inputs): node_inputs = _build_ports(inputs, ports.get('inputs', [])) try: node_context = self.__node.build_execute_context( self.__parameters, None, objects=node_inputs, exclude_output=True) self.__node._execute_with_context(node_context) outputs = [output for output in node_context.output] node_context.close() return outputs finally: state.node_state().cleardata()
[docs] def configure(self, inputs=None): """ Configure node by launching parameter GUI. Requires that `gui=True` was passed to :func:`load_library`. """ self._apply_parameters() return self.__configure(inputs, False)
def __configure(self, inputs=None, return_widget=True): ports = self.__parameters['ports'] with _instantiated_ports(ports, inputs): node_inputs = _build_ports(inputs, ports.get('inputs', [])) node_context = None exclude_input = 'inputs' not in self.__parameters['ports'] try: node_context = self.__node.build_exec_parameter_view_context( self.__parameters, None, objects=node_inputs, exclude_input=exclude_input) res = self.__node._exec_parameter_view_with_context( node_context, return_widget=return_widget) if return_widget: # In this case res is the widget return res elif res is not None: self.parameters = ParameterRoot(res['parameters']['data']) finally: if node_context: node_context.close() state.node_state().cleardata() def _apply_parameters(self): self.__parameters['parameters']['data'] = ( self.__syiparameters.to_dict()) @property def parameters(self): """ Node parameters Can be modified to programmatically configure the node. """ return self.__syiparameters @parameters.setter def parameters(self, value: ParameterRoot): self.__parameters['parameters']['data'] = value.to_dict() self.__syiparameters = SyiParameters(value) @property def filename(self): return self.__filename @property def node_cls(self): return type(self.__node) @classmethod def from_node_cls(cls, node_cls): node = node_cls() module = sys.modules[node_cls.__module__] filename = module.__file__ node_dict = library_creator.get_properties( filename, node_cls.__name__, node_cls) return cls(None, node, node_dict, filename)
class SyiGetAttribute: def __init__(self, params, path): self.__params = params self.__path = path def __getattribute__(self, name): params = object.__getattribute__(self, '_SyiGetAttribute__params') path = object.__getattribute__(self, '_SyiGetAttribute__path') data = params for seg in path: data = data[seg] if name in data.keys(): value = data[name] if isinstance(value, ParameterGroup): return SyiGetAttribute( params, path + [name]) else: return data[name] return object.__getattribute__(self, name) def __dir__(self): params = object.__getattribute__(self, '_SyiGetAttribute__params') path = object.__getattribute__(self, '_SyiGetAttribute__path') data = params for seg in path: data = data[seg] return data.keys() class SyiParameters(ParameterRoot): @property @deprecated_method('7.0.0', 'node.parameters["myparam"]') def attributes(self): return SyiGetAttribute(self, []) @property @deprecated_method('7.0.0', 'node.parameters["myparam"]') def data(self): return self @data.setter def data(self, _): raise Exception( "Setting SyiParameters.data is not supported in Sympathy>=6.0.0. " "Please set the parameters directly on the node instead.")
[docs] class SyiLibrary: """ A library of nodes that can be configured and executed in Python code. Should not be instantiated directly. Instead call :func:`load_library`. """ def __init__(self, context, library, name_library, paths): self.__context = context self.__library = dict(group_pairs(library.items())) self.__name_library = dict(group_pairs(name_library)) self.__paths = paths def context(self): return self.__context
[docs] def node(self, nid, fuzzy_names=True) -> SyiNode: """ Attempt to find `nid` in the library. Argument `nid` can be either a node id or a node name. If no matching node can be found a KeyError is raised. If `fuzzy_names` is `True` (the default) and `nid` doesn't match any node exactly, it is used as a pattern that the node name must match. The characters of the pattern must appear in the node name in the same order as in the pattern, but must not be of the same case, and may have other characters in between them. If multiple nodes match the pattern a KeyError is raised. """ def get_node_dict(result): if len(result) == 1: return result[0] elif len(result) > 1: raise KeyError('Multiple names matching: {}'.format( [(v['id'], v.get('name')) for v in result])) node_dict = None for group in [self.__library, self.__name_library]: result = group.get(nid, None) if result is not None: node_dict = get_node_dict(result) if node_dict: break if node_dict is None and fuzzy_names: result = fuzzy_filter(nid, self.__name_library.items()) node_dicts = [node_dict_ for k, v in result for node_dict_ in v] if len(node_dicts) == 1: node_dict = node_dicts[0] elif len(node_dicts) > 1: raise KeyError('Multiple names matching: {}'.format( [(v['id'], v.get('name')) for v in node_dicts])) if node_dict is None: raise KeyError( 'Identifier does not match any nodeid or node name.') if node_dict.get('type') == 'flow': raise InteractiveNotNodeError flow_vars = { 'SY_FLOW_FILEPATH': '', 'SY_FLOW_DIR': '', 'SY_LIBRARY_DIR': '', } with self.__context: local_filename = node_dict['file'] sys.path.insert(0, os.path.dirname(local_filename)) modulename = os.path.splitext(os.path.basename(local_filename))[0] module = __import__(modulename) # TODO: (magnus) if/when interactive can load syx-files, it needs # to also load and run auto migrations, and tell basicnode to # run update_parameters on older nodes with no migrations. return SyiNode( self.__context, getattr(module, node_dict['class'])(), { 'parameters': node_dict['parameters'], 'label': node_dict['name'], 'ports': node_dict['ports'], 'flow_vars': flow_vars, }, local_filename)
[docs] def nodeids(self) -> list[str]: """Return a list of all nodeids in this library.""" return list(self.__library.keys())
def paths(self): return self.__paths
def _build_ports(values, ports): assert values is None or len(values) == len(ports) result = {} if values is None: # Construct dummy data to allow configuration without value. for port in ports: data = filebase.port_maker( port, None, external=None, no_datasource=True) result[port['file']] = data else: for port, value in zip(ports, values): result[port['file']] = value return result def _load_library_with_state(launch_state: LaunchState, library_id=None): import sympathy.platform.library from sympathy.app import util, library_creator, library_manager sys_path = list(sys.path) types.manager.normalize() storage_folder = util.storage_folder(launch_state.qt_config) library_paths = launch_state.properties.global_library_paths common_paths = [] for library in library_paths: common_paths.append(sympathy.platform.library.package_python_path( library)) sys.path.extend(common_paths) context = _SyiContext(list(sys.path)) state.node_state().set_attributes( library_dirs=library_paths, support_dirs=[], worker_settings=get_worker_settings(launch_state), ) installed_libraries = sympathy.platform.library.available_libraries( load=False) library_cache = LibraryCache( Path(storage_folder), library_paths, installed_libraries) try: with library_cache.lock(): result = library_creator.create( library_cache.installed_libraries(), library_paths, storage_folder, launch_state.session_folder) libraries = library_manager.libraries_from_creator(result) if library_id: node_library = { n['id']: n for x in libraries for n in x['nodes'] if n['library_identifier'] == library_id} name_library = [ (n['label'], n) for x in libraries for n in x['nodes'] if n['library_identifier'] == library_id] else: node_library = { n['id']: n for x in libraries for n in x['nodes']} name_library = [ (n['label'], n) for x in libraries for n in x['nodes']] library = SyiLibrary(context, node_library, name_library, library_paths) return library finally: sys.path[:] = sys_path def _has_interactive_worker_state() -> bool: try: wstate = worker_state.get_state() except Exception: return False else: if isinstance(wstate, _InteractiveWorkerState): return True raise Exception("Incorrect worker state.") class _InteractiveWorkerState(worker_state.WorkerState): @classmethod def create(cls, launch_state, app, *args, **kwargs): wstate = super().create(*args, **kwargs) wstate.features = wstate.features + tuple( plugin.create_app_features( launch_state.qt_config, parent=app) for plugin in feature.available_features()) return wstate def _setup(gui=True) -> LaunchState: from sympathy.app import util from sympathy.app.state import LaunchState from sympathy.qt_config import QtConfig launch.setup_environment(os.environ) config = QtConfig() launch_command = 'gui' if gui else 'cli' app_args = argparse.Namespace(inifile=None, launch_command=launch_command) launch_state = LaunchState.create(app_args, config) session_folder = util.create_session_folder(launch_state) app_args.session = session_folder app = QtCore.QCoreApplication.instance() if not app: if gui: app = config.create_gui_app([]) else: app = config.create_cli_app([]) if not _has_interactive_worker_state(): worker_args = argparse.Namespace() wstate = _InteractiveWorkerState.create( launch_state, app, worker_args, config) wstate.setup() worker_state.set_state(wstate) return launch_state
[docs] def load_library( gui: bool = True, library_id: str | None = None, ) -> SyiLibrary: """ Load the node library. If `library_id` is `None`, then all node libraries available in Sympathy will be loaded. Otherwise load only the library with the specified library id. In order to be able to launch node configuration GUIs using :meth:`SyiNode.configure`, `gui` must be `True` (the default). Setting `gui` to `False` might be required for scripts running without a graphical environment (e.g. headless servers). Runs :func:`start_session` if not already in a session. """ launch_state = _setup(gui) return _load_library_with_state(launch_state, library_id=library_id)
[docs] def node(node_cls: Type[BasicNode]) -> SyiNode: """ Get a :class:`SyiNode` from a node class. Runs :func:`start_session` if not already in a session. """ start_session() return SyiNode.from_node_cls(node_cls)
[docs] def start_session(): """ Start a session. Consider using :func:`session` instead to ensure that :func:`end_session` gets called at the end of the sesssion. Calling this if not in a session does nothing. """ if not _has_interactive_worker_state(): _setup()
[docs] def end_session(): """ End a session, tearing down the setup performed by :func:`start_session`. This function needs to be called in order to return a floating license. Consider using :func:`session` to ensure that this gets called at the end of the sesssion. Calling this if not in a session does nothing. """ if _has_interactive_worker_state(): worker_state.remove_state()
[docs] @contextlib.contextmanager def session(): """ Context manager for starting and ending a session. This context manager can be used to ensure that :func:`end_session` is always called at the end of the session, even if there is an exception. Example:: from sympathy.app import interactive with interactive.session(): library = interactive.load_library() node = library.node('Random Table') print(node.execute()[0]) """ if _has_interactive_worker_state(): yield else: _setup() try: yield finally: end_session()