Source code for node_graph

# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import time
import pathlib
import os
import enum
import http
from collections import abc
from datetime import datetime, timedelta, UTC
from sympathy.api import qt2 as qt_compat
from sympathy.api import node
from sympathy.api.nodeconfig import Tag, Tags, Ports, Port
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sympathy.utils import parameters as utils_parameters
from sympathy.utils.parameters import set_output_directory
from sympathy.utils import credentials
from sympathy.platform import item_models
from sympathy.platform import widget_library as sywidgets
from sylib_azure.clients.graph import graph
from sylib_azure import utils
QtWidgets = qt_compat.import_module('QtWidgets')
QtGui = qt_compat.import_module('QtGui')
QtCore = qt_compat.import_module('QtCore')


_graph_port_name = 'graph'

_name_parameter = 'name'
_site_parameter = 'site'
_drive_parameter = 'drive'
_drive_item_parameter = 'drive_item'
_confirm_delete_parameter = 'confirm_delete'


def _set_name_parameter(parameters, name=_name_parameter, label='Name',
                        description='Set name'):
    parameters.set_string(
        name,
        value='',
        label=label,
        editor=node.editors.lineedit_editor(
            placeholder='Required'),
        description=description)


def _set_option_parameter(parameters, name, label, description):
    parameters.set_string(
        name,
        value='',
        label=label,
        editor=node.editors.combo_editor(
            options=[],
            edit=True,
            placeholder='Required',
            include_empty=True),
        description=description)


def _set_confirm_delete_parameter(parameters):
    parameters.set_boolean(
        _confirm_delete_parameter, label='Confirm Delete',
        value=False,
        description=(
            'For safety, ensure all looks good before confirming delete'))


def set_drive_parameter(
        parameters, description='Select drive'):
    _set_option_parameter(parameters, _drive_parameter, 'Drive', description)


def set_drive_item_parameter(
        parameters, description='Select drive item'):
    _set_option_parameter(
        parameters, _drive_item_parameter, 'Drive Item', description)


def set_drive_item_output_parameters(parameters):
    parameters.set_boolean(
        'output_files',
        value=True,
        label='Output files',
        description='Include files in the output')

    parameters.set_boolean(
        'output_folders',
        value=True,
        label='Output folders',
        description='Include folders in the output')


def graph_port(opt=None):
    n = utils.opt_n(opt)
    return graph.GraphPort('Graph', name=_graph_port_name, n=n)


def propagate_graph(input, output):
    if output:
        output[0].set(input.get())


def _drive_item(drive_id, item_id):
    return dict(drive_id=drive_id, item_id=item_id)


def _drive_item_file(drive_id, item_id, filename):
    res = _drive_item(drive_id, item_id)
    res.update(dict(filename=filename))
    return res


def _has_all_keys(item, keys):
    return item is not None and all(v in item for v in keys)


def check_item(item):
    return _has_all_keys(item, ['id', 'name'])


def check_file(item):
    return _has_all_keys(item, ['id', 'name', 'parentReference', 'file'])


def check_folder(item):
    return _has_all_keys(item, ['id', 'name', 'parentReference', 'folder'])


def check_drive(item):
    return _has_all_keys(item, ['id', 'name', 'driveType'])


def check_response_list(item):
    return 'value' in item and isinstance(item['value'], list)


def ensure_file(item):
    if not check_file(item):
        raise SyDataError('File Item is not valid')


def ensure_folder(item):
    if not check_folder(item):
        raise SyDataError('Folder Item is not valid')


def ensure_drive(item):
    if not check_drive(item):
        raise SyDataError('Drive is not valid')


def ensure_response_list(item):
    if not check_response_list(item):
        raise SyDataError('Response List is not valid')


def _put_upload_range_data(graph, upload_url, body, **kwargs):
    return _put_fq_content_json(graph, upload_url, body, **kwargs)


def _get_response_headers(response_error):
    return response_error.response.headers


def _get_json(graph, resource, **kwargs):
    return graph.get_json(resource, **kwargs)


def _get_json_all(graph, resource, **kwargs):
    return graph.get_json_all(resource, **kwargs)


def _get_iter_content(graph, resource, **kwargs):
    return graph.get_iter_content(resource, **kwargs)


def _delete(graph, resource, **kwargs):
    return graph.delete(resource, **kwargs)


def _post_json(graph, resource, body=None, **kwargs):
    return graph.post_json(resource, body=body, **kwargs)


def _post_json_all(graph, resource, body=None, **kwargs):
    return graph.post_json_all(resource, body=body, **kwargs)


def _put_fq_content_json(graph, resource, body=None, **kwargs):
    return graph.put_fq_content_json(resource, body=body, **kwargs)


_get_me_drive = _get_json_all

_get_drive = _get_json_all


class Path:
    def __init__(self, path, name, paths=None, get=None, delete=None,
                 post=None,
                 put=None):
        self.path = path
        self.name = name
        self.paths = list(paths or [])
        self._get = get
        self._post = post
        self._put = put
        self._delete = delete

    def resource(self, **kwargs):
        return self.path.format(**kwargs)

    def _check_set(self, method):
        if method is None:
            raise AssertionError(f'Method is not set for path: {self.name}')

    def _call_method(self, method, graph, path_args,
                     *args,
                     **kwargs):
        self._check_set(method)
        resource = self.resource(**path_args)
        return method(graph, resource, *args, **kwargs)

    def get(self, *args, **kwargs):
        return self._call_method(self._get, *args, **kwargs)

    def post(self, *args, **kwargs):
        return self._call_method(self._post, *args, **kwargs)

    def put(self, *args, **kwargs):
        return self._call_method(self._put, *args, **kwargs)

    def delete(self, *args, **kwargs):
        return self._call_method(self._delete, *args, **kwargs)

    def __str__(self):
        return f'Path(path={repr(self.path)}, name={repr(self.name)})'

    def copy(self):
        return Path(
            self.path, name=self.name, paths=self.paths, get=self._get,
            delete=self._delete, post=self._post, put=self._put)


class API(abc.Mapping):
    def __init__(self, paths):
        self._paths = self._path_tree_to_dict(paths)

    def _flatten_path(self, path):
        paths = []

        def inner(prefix, path):
            prefix = list(prefix)
            prefix.append(path)
            paths.append(prefix)

            for sub in path.paths:
                inner(prefix, sub)

        inner([], path)

        res = {}
        for segs in paths:
            last = segs[-1]
            new = last.copy()
            new.path = '/'.join([''] + [s.path for s in segs])
            res[new.name] = new
        return res

    def _path_tree_to_dict(self, paths):
        return {k: p for path in paths
                for k, p in self._flatten_path(path).items()}

    def __getattr__(self, key):
        return self._paths[key]

    def __getitem__(self, key):
        return self._paths[key]

    def __iter__(self):
        return iter(self._paths)

    def __len__(self):
        return len(self._paths)


_drive_item_path = Path(
    '{item_id}', name='drive_item', delete=_delete, get=_get_json, paths=[
        Path('children', name='drive_item_children',
             get=_get_json_all, post=_post_json),
        Path('content', name='drive_item_content',
             get=_get_iter_content),
        Path('createLink', name='drive_item_create_link', post=_post_json),
        Path('checkout', name='drive_item_checkout', post=_post_json),
        Path('checkin', name='drive_item_checkin', post=_post_json),
    ])

api = API([
    Path('me/drive', name='me_drive', get=_get_json),
    Path('drives', name='drives', paths=[
        Path('{drive_id}', name='drive', get=_get_json, paths=[
            Path('root', name='drive_root', get=_get_json),
            Path('items', name='drive_items', paths=[
                _drive_item_path,
                Path('{item_id}:/{filename}', name='drive_item_relative_file',
                     get=_get_json),
                Path('{item_id}:/{filename}:/createUploadSession',
                     name='drive_item_upload_session',
                     post=_post_json_all),
                Path("{item_id}/search(q='{query}')", name='drive_item_search',
                     get=_get_json_all),
            ])
        ])
    ]),
    Path('sites', name='sites', paths=[
        Path('{site_id}', name='site', get=_get_json, paths=[
            Path('drives', name='site_drives', get=_get_json_all),
        ])
    ]),
    Path('sites?search=*', name='sites_search', get=_get_json_all, paths=[]),
    Path('search/query', name='search_query', post=_post_json_all),
])


DataRole = QtCore.Qt.ItemDataRole.UserRole + 1
PathRole = QtCore.Qt.ItemDataRole.UserRole + 2
FetchedRole = QtCore.Qt.ItemDataRole.UserRole + 3


class FileItem(item_models.TreeItem):
    def __init__(self, key, data, style, parent=None):
        self._key = key
        self._data = data
        self._parent = parent
        self._style = style

    def data(self, role=QtCore.Qt.ItemDataRole.DisplayRole):
        res = None
        if role == QtCore.Qt.ItemDataRole.DisplayRole:
            res = self.name()
        elif role == QtCore.Qt.ItemDataRole.DecorationRole:
            res = self.icon()
        elif role == QtCore.Qt.ItemDataRole.ToolTipRole:
            res = self.tool_tip()
        elif role == DataRole:
            res = self._data
        elif role == FetchedRole:
            res = self.fetched()
        elif role == PathRole:
            res = []
            curr = self
            while curr._parent:
                res.append(curr._key)
                curr = curr._parent
            res = list(reversed(res))
        return res

    def name(self):
        return self._data['name']

    def fetched(self):
        return True

    def status(self):
        return True

    def set_status(self, ok):
        return True

    def remove_children(self):
        pass

    def set_children(self, data):
        pass

    def icon(self):
        return sywidgets.file_icon(self._style)

    def tool_tip(self):
        return ''

    def parent(self):
        return self._parent

    def flags(self):
        return (QtCore.Qt.ItemFlag.ItemIsEnabled
                | QtCore.Qt.ItemFlag.ItemIsSelectable)

    def header_data(self, section, orientation, role):
        pass


class FolderItem(FileItem):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._fetched = False
        self._children_dict = {}
        self._keys = list(range(self.child_count()))
        self._values = [{}] * len(self._keys)
        self._ok = True

    def index(self, child):
        return self._values.index(child._data)

    def child_count(self):
        return self._data['folder']['childCount']

    def _set_child_count(self, count):
        self._data['folder']['childCount'] = count

    def _make_item(self, key, value):
        if check_file(value):
            cls = FileItem
        elif check_folder(value):
            cls = FolderItem
        elif check_item(value):
            cls = UnsupportedItem
        else:
            cls = LoadingItem
        return cls(key, value, self._style, parent=self)

    def child(self, row):
        res = self._children_dict.get(row)
        if res is None:
            key = self._keys[row]
            value = self._values[row]
            res = self._make_item(key, value)
            self._children_dict[row] = res
        return res

    def path_to_rows(self, path):
        rows = []
        item = self
        for seg in path or []:
            if isinstance(item, FolderItem):
                try:
                    row = item._keys.index(seg)
                    rows.append(row)
                    item = item.child(row)
                except Exception:
                    rows.clear()
                    break
            else:
                rows.clear()
                break
        return rows

    def fetched(self):
        return self._fetched

    def remove_children(self):
        self._children_dict.clear()
        self.set_values([])

    def set_children(self, data):
        self.remove_children()
        self.set_values(data)
        self._fetched = True

    def set_values(self, values):
        self._set_child_count(len(values))
        self._values[:] = values
        self._keys[:] = range(len(values))

    def set_status(self, ok):
        self._ok = ok

    def status(self):
        return self._ok

    def icon(self):
        if self._ok:
            return sywidgets.folder_icon(self._style)
        else:
            return sywidgets.browser_reload_icon(self._style)


class UnsupportedItem(FileItem):

    def data(self, role=QtCore.Qt.ItemDataRole.DisplayRole):
        res = None
        if role == QtCore.Qt.ItemDataRole.DisplayRole:
            res = self.name()
        elif role == QtCore.Qt.ItemDataRole.DecorationRole:
            res = self.icon()
        elif role == QtCore.Qt.ItemDataRole.ToolTipRole:
            res = self.tool_tip()
        elif role == PathRole:
            res = []
            curr = self
            while curr._parent:
                res.append(curr._key)
                curr = curr._parent
            res = list(reversed(res))
        return res

    def name(self):
        return f'{self._data["name"]} (unsupported)'

    def fetched(self):
        return False

    def icon(self):
        return sywidgets.messagebox_question_icon(self._style)

    def tool_tip(self):
        return ''

    def parent(self):
        return self._parent

    def flags(self):
        return QtCore.Qt.ItemFlag.NoItemFlags

    def header_data(self, section, orientation, role):
        pass


class LoadingItem(UnsupportedItem):
    def name(self):
        return 'Loading ...'

    def icon(self):
        return sywidgets.messagebox_warning_icon(self._style)


def _create_root(items, style):
    res = FolderItem(
        'root', {'name': '', 'folder': {'childCount': len(items)}}, style)
    res.set_values(items)
    return res


class DriveItemTreeView(sywidgets.HtmlTreeView):
    selection_changed = QtCore.Signal(QtCore.QModelIndex)
    fetch_requested = QtCore.Signal(QtCore.QModelIndex)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.setContextMenuPolicy(
            QtCore.Qt.ContextMenuPolicy.CustomContextMenu)
        self.customContextMenuRequested.connect(self._context_menu)
        self.expanded.connect(self.fetch_requested)
        self._retry_action = QtGui.QAction('Load', parent=self)

    def _context_menu(self, pos):
        index = self.indexAt(pos)
        if index.isValid():
            item = self.model().get_item(index)
            if item.child_count() > 0 and not item.fetched():
                context_menu = QtWidgets.QMenu(self)
                context_menu.addAction(self._retry_action)

                def retry_triggered(*args):
                    self.fetch_requested.emit(index)

                self._retry_action.triggered.connect(retry_triggered)

                context_menu.exec(self.viewport().mapToGlobal(pos))
                self._retry_action.triggered.disconnect(retry_triggered)

    def selectionChanged(self, selected, deselected):
        selected_indexes = selected.indexes()

        index = QtCore.QModelIndex()
        super().selectionChanged(selected, deselected)

        if selected_indexes:
            for selected_index in selected_indexes:
                index = selected_index

        self.selection_changed.emit(index)


class AsyncGraphNode(utils.AsyncAzureNode):

    azure_port_name = _graph_port_name
    azure_port_display = 'Graph'


class DriveItemTreeModel(item_models.TreeModel):
    def get_item(self, index):
        return self._get_item(index)

    def remove_items(self, items):
        item_to_index = {
            index: item for index, item in self._index_to_item.items()
            if item in items}
        self._clear_items()
        self._index_to_item.update(item_to_index)


class AsyncDriveItemView(utils.AsyncAzureParameterView):

    def __init__(self, node, ctx, azure_port, parent=None):
        super().__init__(node, ctx, azure_port, parent=parent)
        self.ctx = ctx
        self._token = None
        self._data = {}
        if azure_port.is_valid():
            self._data = azure_port.data
        self._drive_item = ctx.parameters[_drive_item_parameter]
        self._drive = ctx.parameters[_drive_parameter]

        label = ''
        current_option = utils.get_current_option(self._drive_item)
        if current_option:
            label = next(iter(current_option.values()))

        root_data = []
        if check_folder(self._data):
            root_data = [self._data]

        self._root = _create_root(root_data, self.style())

        self._model = DriveItemTreeModel(self._root)

        self._tree_view = DriveItemTreeView()
        self._tree_view.setHeaderHidden(True)
        self._tree_view.setModel(self._model)
        self._status_label = QtWidgets.QLabel()
        self._set_status(label)

        self._requests = {}
        self._timeout = 4

        layout = QtWidgets.QVBoxLayout()
        self._tree_view.fetch_requested.connect(self._handle_fetch_tree)
        self._tree_view.selection_changed.connect(
            self._handle_selection_changed)
        self.setLayout(layout)
        layout.addWidget(self._tree_view)
        layout.addWidget(self._status_label)

    def _handle_token(self, token):
        super()._handle_token(token)
        self._token = token

    def _set_item_status(self, index, item, status):
        item.set_status(status)
        self._model.dataChanged.emit(
            index, index, [
                QtCore.Qt.ItemDataRole.DisplayRole,
                QtCore.Qt.ItemDataRole.DecorationRole])

    def _handle_fetch_tree(self, index):
        item = self._model.get_item(index)
        data = item.data(DataRole)
        fetched = item.data(FetchedRole)

        if not self._token:
            self._set_item_status(index, item, False)
        elif not fetched:
            folder_id = data['id']
            drive_id = data['parentReference']['driveId']
            resource = api.drive_item_children.resource(
                drive_id=drive_id, item_id=folder_id)

            if resource not in self._requests:

                with self._azure.connected_client(self._token_manager):
                    delayed = self._azure.get_json_all(resource)
                    self._worker_manager.add_delayed_request(
                        resource, delayed, timeout=self._timeout)
                    self._requests[resource] = ('fetch_folder', item)

    def _handle_response(self, resource, ok, result):
        resource_data = self._requests.pop(resource)
        if resource_data:
            action = resource_data[0]
            if action == 'fetch_folder':
                item = resource_data[1]
                path = item.data(PathRole)
                index = QtCore.QModelIndex()
                for p in path:
                    index = self._model.index(p, 0, parent=index)
                self._set_item_status(index, item, ok)
                child_count = item.child_count()

                if ok and child_count:
                    child_items = [item.child(i) for i in range(child_count)]

                    self._model.beginRemoveRows(index, 0, child_count - 1)
                    item.remove_children()
                    self._model.endRemoveRows()
                    self._model.remove_items(child_items)
                    children_data = result['value']
                    self._model.beginInsertRows(
                        index, 0, len(children_data) - 1)
                    item.set_children(children_data)
                    self._model.endInsertRows()

    def _set_status(self, path):
        if path:
            self._status_label.setText(f"<b>Drive Item</b>: {path}")
        else:
            self._status_label.setText('No item selected')

    def _handle_selection_changed(self, index):
        if index.isValid():
            item = self._model.get_item(index)
            data = item.data(DataRole)
            drive_id = data['parentReference']['driveId']
            drive = {drive_id: self._data['name']}
            self._drive.value = drive_id
            self._drive.adjust(drive)

            drive_item_id = data['id']
            names = []

            curr = item
            while curr:
                names.append(curr.name())
                curr = curr.parent()

            name = '/'.join(reversed(names))
            drive_item = {drive_item_id: name}

            self._drive_item.value = drive_item_id
            self._drive_item.adjust(drive_item)
            self._set_status(name)

        else:
            self._drive.adjust([])
            self._drive.value = ''
            self._drive_item.adjust([])
            self._drive_item.value = ''
            self._set_status('')


class ChoiceParameterSetter:
    options = None
    name = None

    @classmethod
    def set_definition(cls, parameter, **kwargs):
        raise NotImplementedError()

    @classmethod
    def option_list(cls):
        return [v.value for v in cls.options]

    @classmethod
    def get_parameter(cls, parameters):
        return parameters[cls.name]

    @classmethod
    def get_option(cls, parameters):
        return cls.options(parameters[cls.name].value)


class IfFileExistsOptions(enum.StrEnum):
    skip = 'Skip'
    overwrite = 'Overwrite'
    raise_exc = 'Raise exception'


class IfFileExists(ChoiceParameterSetter):
    options = IfFileExistsOptions
    name = 'if_file_exists'

    @classmethod
    def set_definition(cls, parameters, **kwargs):
        parameters.set_string(
            cls.name, label='If file already exists',
            value=cls.options.skip.value,
            description=('What to do if the file already exists'),
            editor=node.editors.combo_editor(
                options=cls.option_list()))


class IfFolderExistsOptions(enum.StrEnum):
    skip = 'Skip'
    raise_exc = 'Raise exception'


class IfFolderExists(ChoiceParameterSetter):
    options = IfFolderExistsOptions
    name = 'if_folder_exists'

    @classmethod
    def set_definition(cls, parameters, **kwargs):
        parameters.set_string(
            cls.name, label='If folder already exists',
            value=cls.options.skip.value,
            description=('What to do if the folder already exists'),
            editor=node.editors.combo_editor(
                options=cls.option_list()))


class LinkTypeOptions(enum.StrEnum):
    view = 'view'
    edit = 'edit'
    embed = 'embed'


class LinkType(ChoiceParameterSetter):
    options = LinkTypeOptions
    name = 'link_type'

    @classmethod
    def set_definition(cls, parameters, **kwargs):
        parameters.set_string(
            cls.name, label='Type',
            value=cls.options.view.value,
            description=(
                'Type options, see '
                'https://docs.microsoft.com/en-us/graph/api/'
                'driveitem-createlink#link-types'),
            editor=node.editors.combo_editor(
                options=cls.option_list()))


class LinkScopeOptions(enum.StrEnum):
    anonymous = 'anonymous'
    organization = 'organization'


class LinkScope(ChoiceParameterSetter):
    options = LinkScopeOptions
    name = 'link_scope'

    @classmethod
    def set_definition(cls, parameters, **kwargs):
        parameters.set_string(
            cls.name, label='Scope',
            value=cls.options.organization.value,
            description=(
                'Scope options, see '
                'https://docs.microsoft.com/en-us/graph/api/'
                'driveitem-createlink#scope-types'),
            editor=node.editors.combo_editor(
                options=cls.option_list()))


def _isoformat_to_datetime(isoformat):
    utc = False
    modified = isoformat.rstrip('zZ')

    if isoformat != modified:
        utc = True
        isoformat = modified

    prefix, _, suffix = isoformat.partition('.')

    if suffix:
        suffix = (suffix + '0' * 6)[:6]
        isoformat = f'{prefix}.{suffix}'

    dt = datetime.fromisoformat(isoformat)

    if utc:
        dt = dt.replace(tzinfo=UTC)
    return dt


[docs] class SelectDriveItemAsyncGraph(node.Node): """ Select :ref:`graph_drive_item_365` in Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Allows interactive selection of a Drive Item contained under the Folder Item passed as :ref:`graph_data_365` input. For example the Root Folder of a Drive. Outputs :ref:`Graph port <graph_port_365>` with the selected Drive Item as :ref:`graph_data_365`. """ name = 'Select Drive Item in Folder' nodeid = 'com.sympathyfordata.azure.graph.driveitem.select' related = ['com.sympathyfordata.azure.graph.drive.root'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_select.svg' parameters = node.parameters() set_drive_parameter(parameters) set_drive_item_parameter(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def exec_parameter_view(self, ctx): return AsyncDriveItemView(self, ctx, ctx.input[_graph_port_name]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] drive_id = ctx.parameters[_drive_parameter].value item_id = ctx.parameters[_drive_item_parameter].value if not (drive_id or item_id): raise SyConfigurationError('A Drive Item should be selected') with in_graph.connected(self): data = api.drive_item.get( in_graph, _drive_item(drive_id, item_id)).run() propagate_graph(in_graph, [out_graph]) out_graph.data = data
[docs] class Graph(node.Node): """ Create a new Graph connection. This is the starting point for processing of data from SharePoint or OneDrive via :ref:`sympathy_graph_365`. Optional name can be set to support multiple, different, logins. One login for each name. See :ref:`graph_port_365` for details about the Graph output port. """ name = 'Graph 365' nodeid = 'com.sympathyfordata.azure.graph.connection' icon = 'node_graph.svg' tags = Tags(Tag.Azure.Graph) parameters = node.parameters() utils_parameters.set_azure_connection( parameters, 'Graph', description='Azure credentials for Sympathy Azure Toolkit') outputs = Ports([graph_port()]) def execute(self, ctx): connection = ctx.parameters[ utils_parameters.azure_connection_name].value out_graph = ctx.output[_graph_port_name] out_graph.set({ 'connection': connection, 'data': None, }) with out_graph.connected(self): out_graph.ensure_token()
[docs] class GetResourceGraph(node.Node): """ Get Custom Resource by specifying the path and query part of the URL. For example, to output the OneDrive Drive, enter `/me/drive`. Outputs :ref:`Graph port <graph_port_365>` with the output as :ref:`graph_data_365`. """ name = 'Get Custom Resource' nodeid = 'com.sympathyfordata.azure.graph.resource.get' related = ['com.sympathyfordata.azure.graph.connection'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph.svg' parameters = node.parameters() parameters.set_string( 'resource', value='', label='Resource', editor=node.editors.lineedit_editor( placeholder='Required')) inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] resource = ctx.parameters['resource'].value if not resource: raise SyConfigurationError('Resource should not be empty') with in_graph.connected(self): res = in_graph.get_json_all(resource).run() propagate_graph(in_graph, [out_graph]) out_graph.data = res
[docs] class DownloadFileItemGraph(node.Node): """ Download File specified by the :ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input to configured output directory. Output Datasource pointing to downloaded file. Optionally outputs :ref:`Graph port <graph_port_365>` with File Item as :ref:`graph_data_365`. """ name = 'Download File' nodeid = 'com.sympathyfordata.azure.graph.file.download' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_file.svg' parameters = node.parameters() set_output_directory( parameters, description='Select output directory for downloaded files.') IfFileExists.set_definition(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='in'), Port.Datasource('File', name='file')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_file = ctx.output['file'] out_graphs = ctx.output.group(_graph_port_name) if_exists = IfFileExists.get_option(ctx.parameters) item = in_graph.data ensure_file(item) directory = pathlib.Path(ctx.parameters['directory'].value) item_name = item['name'] item_path = directory / item_name if item_path.is_file(): if if_exists == IfFileExists.options.skip: print(f'File {item_path} already exists, skip download') elif if_exists == IfFileExists.options.raise_exc: raise SyDataError(f'File {item_path} already exists') item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): with item_path.open(mode='wb') as f: for seg in api.drive_item_content.get( in_graph, _drive_item(drive_id, item_id)).run(): f.write(seg) item_file_system_info = item.get('fileSystemInfo') if item_file_system_info: modified_iso = item_file_system_info.get( 'lastModifiedDateTime') if modified_iso: modified_dt = _isoformat_to_datetime(modified_iso) modified = modified_dt.timestamp() os.utime(str(item_path), times=(time.time(), modified)) out_file.encode_path(str(item_path)) propagate_graph(in_graph, out_graphs)
def expiration(data): return _isoformat_to_datetime(data['expirationDateTime']) def range_header(start, chunk_size, file_size): assert chunk_size > 0 end = start + chunk_size - 1 return { 'Content-Range': f'bytes {start}-{end}/{file_size}', 'Content-Length': str(chunk_size), } def is_complete(response): return _has_all_keys(response, ['id', 'name', 'size', 'file']) def has_ranges(response): return _has_all_keys( response, ["expirationDateTime", "nextExpectedRanges"])
[docs] class UploadFileGraph(node.Node): """ Upload File to the Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` with the Folder Item as :ref:`graph_data_365`. """ name = 'Upload File' nodeid = 'com.sympathyfordata.azure.graph.file.upload' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_file.svg' parameters = node.parameters() IfFileExists.set_definition(parameters) inputs = Ports([graph_port(), Port.Datasource('File', name='file')]) outputs = Ports([graph_port(opt='out')]) def _upload_chunks(self, in_graph, path, session): upload_url = session['uploadUrl'] expiration_dt = expiration(session) file_size = path.stat().st_size # https://docs.microsoft.com/en-us/graph/api/driveitem- # createuploadsession?view=graph-rest-1.0#upload-bytes-to-the- # upload-session base_chunk_size = 2 ** 10 * 320 # 3840 KiB max_chunk_size = base_chunk_size * 12 start = 0 chunk_size = min(file_size, max_chunk_size) end = file_size done = file_size == 0 res = None with path.open(mode='rb') as f: while not done: if datetime.now(UTC) > expiration_dt: break f.seek(start) res = _put_upload_range_data( in_graph, upload_url, f.read(chunk_size), headers=range_header(start, chunk_size, file_size)).run() if is_complete(res): break elif has_ranges(res): expiration_dt = expiration(res) next_expected_ranges = res["nextExpectedRanges"] for next_range in next_expected_ranges: start, end = next_range.split('-') start = int(start) if end: end = int(end) + 1 else: end = file_size chunk_size = min(end - start, max_chunk_size) break else: raise AssertionError() return res def execute(self, ctx): in_graph = ctx.input[_graph_port_name] in_file = ctx.input['file'] out_graphs = ctx.output.group(_graph_port_name) if_exists = IfFileExists.get_option(ctx.parameters) item = in_graph.data conflict_behavior = 'fail' if if_exists == IfFileExists.options.overwrite: conflict_behavior = 'replace' ensure_folder(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] path = pathlib.Path(in_file.decode_path()) filename = path.name res = None with in_graph.connected(self): body = { "item": { "@microsoft.graph.conflictBehavior": conflict_behavior, }, } try: session = api.drive_item_upload_session.post( in_graph, _drive_item_file(drive_id, item_id, filename), body).run() except utils.ResponseError as r: if (if_exists == IfFileExists.options.skip and utils.get_response_code(r) == 'nameAlreadyExists'): print(f'Item named {filename} already exists, skip upload') res = api.drive_item_relative_file.get( in_graph, _drive_item_file(drive_id, item_id, filename)).run() else: raise else: res = self._upload_chunks(in_graph, path, session) assert res and is_complete(res) propagate_graph(in_graph, out_graphs) if out_graphs: out_graphs[0].data = res
[docs] class CheckoutFileGraph(node.Node): """ Check out the File specified by the :ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` """ name = 'Check out File' nodeid = 'com.sympathyfordata.azure.graph.file.checkout' related = [ 'com.sympathyfordata.azure.graph.driveitem.select', 'com.sympathyfordata.azure.graph.file.checkin' ] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_check.svg' parameters = node.parameters() inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='out')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graphs = ctx.output.group(_graph_port_name) item = in_graph.data ensure_file(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): body = None api.drive_item_checkout.post( in_graph, _drive_item(drive_id, item_id), body, statuses=[http.HTTPStatus.NO_CONTENT], response_type=None).run() propagate_graph(in_graph, out_graphs)
[docs] class CheckInFileGraph(node.Node): """ Check in the File specified by the :ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` """ name = 'Check in File' nodeid = 'com.sympathyfordata.azure.graph.file.checkin' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_check.svg' parameters = node.parameters() parameters.set_string( 'comment', label='Comment', description='Comment for the checked in version.', value='') parameters.set_boolean( 'check_in_as_published', label='Check in as published', value=False, description='Published status after check in') inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='out')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graphs = ctx.output.group(_graph_port_name) item = in_graph.data ensure_file(item) comment = ctx.parameters['comment'].value item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): body = { "comment": comment, } if ctx.parameters['check_in_as_published'].value: body["checkInAs"] = 'published' api.drive_item_checkin.post( in_graph, _drive_item(drive_id, item_id), body, statuses=[http.HTTPStatus.NO_CONTENT], response_type=None).run() propagate_graph(in_graph, out_graphs)
[docs] class CreateFileLinkGraph(node.Node): """ Create File Link to the File specified by the :ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` with the Link Item as :ref:`graph_data_365`. """ name = 'Create File Link' nodeid = 'com.sympathyfordata.azure.graph.file.createlink' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_create.svg' parameters = node.parameters() LinkType.set_definition(parameters) LinkScope.set_definition(parameters) utils_parameters.set_secret_connection( parameters, name='password', label='Password', description='Optional password, empty means no password.') parameters.set_integer( 'expire_after', label='Expire after (days)', description=('Set expiration time of link (days) from time of ' 'creation (when the node is run).\n' '0 means no expiration or organization limit'), value=30) inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='in'), Port.Text('Web URL', name='web_url')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_url = ctx.output['web_url'] out_graphs = ctx.output.group(_graph_port_name) password_connection = ctx.parameters['password'].value expire_after = ctx.parameters['expire_after'].value link_type = LinkType.get_option(ctx.parameters) link_scope = LinkScope.get_option(ctx.parameters) item = in_graph.data ensure_file(item) body = { "type": link_type.value, "scope": link_scope.value, } password = password_connection.resource if password: if (password_connection.credentials.mode == credentials.secrets_mode): secrets = credentials.get_secret_credentials( self, password_connection) password = credentials.expand_secrets( password, secrets) body["password"] = password if expire_after: now = datetime.now(UTC) now += timedelta(days=expire_after) body["expirationDateTime"] = now.strftime('%Y-%m-%dT%H:%M:%SZ') item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): data = api.drive_item_create_link.post( in_graph, _drive_item(drive_id, item_id), body).run() propagate_graph(in_graph, out_graphs) if out_graphs: out_graphs[0].data = data out_url.set(data['link']['webUrl'])
[docs] class DeleteFileItemGraph(node.Node): """ Delete File specified by the :ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` """ name = 'Delete File in Graph' nodeid = 'com.sympathyfordata.azure.graph.file.delete' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_delete.svg' parameters = node.parameters() _set_confirm_delete_parameter(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='out')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graphs = ctx.output.group(_graph_port_name) item = in_graph.data ensure_file(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): if not ctx.parameters[_confirm_delete_parameter].value: print('Delete file, dry run:') try: item = api.drive_item.get( in_graph, _drive_item(drive_id, item_id)).run() print(f"Would remove file: {item['name']}") except Exception: print('Specified file does not exist') else: api.drive_item.delete( in_graph, _drive_item(drive_id, item_id)).run() propagate_graph(in_graph, out_graphs)
[docs] class DeleteFolderItemGraph(node.Node): """ Delete Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` """ name = 'Delete Folder' nodeid = 'com.sympathyfordata.azure.graph.folder.delete' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_delete.svg' parameters = node.parameters() _set_confirm_delete_parameter(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='out')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graphs = ctx.output.group(_graph_port_name) item = in_graph.data ensure_folder(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): if not ctx.parameters[_confirm_delete_parameter].value: print('Delete folder, dry run:') try: item = api.drive_item.get( in_graph, _drive_item(drive_id, item_id)).run() print(f"Would remove folder: {item['name']}") except Exception: print('Specified folder does not exist') else: item = api.drive_item.delete( in_graph, _drive_item(drive_id, item_id)).run() propagate_graph(in_graph, out_graphs)
[docs] class CreateFolderItemGraph(node.Node): """ Create Folder in the Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Optionally outputs :ref:`Graph port <graph_port_365>` with the Folder Item as :ref:`graph_data_365`. """ name = 'Create Folder' nodeid = 'com.sympathyfordata.azure.graph.folder.create' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_create.svg' parameters = node.parameters() IfFolderExists.set_definition(parameters) _set_name_parameter(parameters, description='Set folder name') inputs = Ports([graph_port()]) outputs = Ports([graph_port(opt='out')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graphs = ctx.output.group(_graph_port_name) if_exists = IfFolderExists.get_option(ctx.parameters) name = ctx.parameters['name'].value if not name: raise SyConfigurationError('Name should not be empty') item = in_graph.data ensure_folder(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] with in_graph.connected(self): conflict_behavior = 'fail' res = None if if_exists == IfFolderExists.options.skip: # https://github.com/OneDrive/onedrive-api-docs/issues/1144 # Only fileSystemInfo gets modified, the folder is not # re-created. conflict_behavior = 'replace' # Skip could also be implemented using 'fail' with a follow up # query for the named item. body = { "@microsoft.graph.conflictBehavior": conflict_behavior, "folder": {"childCount": 0}, "name": name, } res = api.drive_item_children.post( in_graph, _drive_item(drive_id, item_id), body).run() propagate_graph(in_graph, out_graphs) if out_graphs: out_graphs[0].data = res
[docs] class DriveGraph(node.Node): """ Get OneDrive Drive. Outputs :ref:`Graph port <graph_port_365>` with the Drive as :ref:`graph_data_365`. """ name = 'Get OneDrive Drive' nodeid = 'com.sympathyfordata.azure.graph.drive.get' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_get.svg' parameters = node.parameters() inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] with in_graph.connected(self): data = api.me_drive.get(in_graph, {}).run() propagate_graph(in_graph, [out_graph]) out_graph.data = data
[docs] class SiteGraph(AsyncGraphNode): """ Select SharePoint Site. Outputs :ref:`Graph port <graph_port_365>` with the Site as :ref:`graph_data_365`. """ name = 'Select SharePoint Site' nodeid = 'com.sympathyfordata.azure.graph.site.select' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_select.svg' parameters = node.parameters() parameters.set_string( _site_parameter, value='', label='SharePoint Site', editor=node.editors.combo_editor( options=[], edit=True, placeholder='Required', include_empty=True), description='Select site to output') inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] site_id = ctx.parameters[_site_parameter].value if not site_id: raise SyConfigurationError('SharePoint site should not be empty') with in_graph.connected(self): data = api.site.get(in_graph, dict(site_id=site_id)).run() propagate_graph(in_graph, [out_graph]) out_graph.data = data def async_request_azure_parameters(self, ctx, graph): return True, {_site_parameter: api.sites_search.get(graph, {})} def async_adjust_azure_parameters(self, ctx, responses): return self._update_options_response(ctx, responses, _site_parameter) def save_parameters(self, ctx): self._save_current_option(ctx.parameters[_site_parameter])
[docs] class SiteDriveGraph(AsyncGraphNode): """ Select Drive from SharePoint Site. Outputs :ref:`Graph port <graph_port_365>` with the Drive as :ref:`graph_data_365`. """ name = 'Select SharePoint Drive' nodeid = 'com.sympathyfordata.azure.graph.site.drive.select' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_select.svg' parameters = node.parameters() set_drive_parameter(parameters, 'Select drive to output') inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] drive_id = ctx.parameters[_drive_parameter].value if not drive_id: raise SyConfigurationError('Drive should not be empty') with in_graph.connected(self): data = api.drive.get(in_graph, dict(drive_id=drive_id)).run() propagate_graph(in_graph, [out_graph]) out_graph.data = data def async_request_azure_parameters(self, ctx, graph): try: site_id = ctx.input[_graph_port_name].data['id'] return True, {_drive_parameter: api.site_drives.get( graph, dict(site_id=site_id))} except Exception: return False, ( 'Graph data must contain a SharePoint Site') def async_adjust_azure_parameters(self, ctx, responses): return self._update_options_response(ctx, responses, _drive_parameter) def save_parameters(self, ctx): self._save_current_option(ctx.parameters[_drive_parameter])
[docs] class DriveRoot(node.Node): """ Get Root :ref:`Folder Item <graph_drive_item_365>` from the Drive passed as Json input. Outputs :ref:`Graph port <graph_port_365>` with the Folder Item as :ref:`graph_data_365`. """ name = 'Get Drive Root Folder' related = [ 'com.sympathyfordata.azure.graph.drive.get', 'com.sympathyfordata.azure.graph.site.drive.select'] nodeid = 'com.sympathyfordata.azure.graph.drive.root' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_get.svg' parameters = node.parameters() inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] item = in_graph.data ensure_drive(item) with in_graph.connected(self): drive_id = item['id'] data = api.drive_root.get(in_graph, dict(drive_id=drive_id)).run() propagate_graph(in_graph, [out_graph]) out_graph.data = data
def exclude_key(data, key): data['value'] = [v for v in data['value'] if key not in v]
[docs] class SearchDriveGraph(node.Node): """ Search for :ref:`Drive Items <graph_drive_item_365>` in Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Outputs :ref:`Graph port <graph_port_365>` with a :ref:`graph_response_list_365` of matched Drive Items as :ref:`graph_data_365`. See `Search for DriveItems within a drive <https://docs.microsoft.com/en-us/graph/api/ driveitem-search>`_ for more details. """ name = 'Search for Drive Items in Folder' nodeid = 'com.sympathyfordata.azure.graph.folder.search' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_search.svg' parameters = node.parameters() parameters.set_string( 'query', value='', label='Query', editor=node.editors.lineedit_editor( placeholder='Required'), description='Enter Drive Item query') set_drive_item_output_parameters(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] item = in_graph.data query = ctx.parameters['query'].value output_files = ctx.parameters['output_files'].value output_folders = ctx.parameters['output_folders'].value ensure_folder(item) item_id = item['id'] drive_id = item['parentReference']['driveId'] if not query: raise SyConfigurationError('Query should not be empty') with in_graph.connected(self): res = api.drive_item_search.get( in_graph, dict( drive_id=drive_id, item_id=item_id, query=query)).run() if not output_files: exclude_key(res, 'file') if not output_folders: exclude_key(res, 'folder') propagate_graph(in_graph, [out_graph]) out_graph.data = res
[docs] class SearchGraph(node.Node): """ Search for :ref:`Drive Items <graph_drive_item_365>` in OneDrive and SharePoint. Outputs :ref:`Graph port <graph_port_365>` with a :ref:`graph_response_list_365` of matched Drive Items as :ref:`graph_data_365`. See `Use the Microsoft Search API to search content in OneDrive and SharePoint <https://docs.microsoft.com/en-us/graph/search-concept-files>`_ for more details. """ name = 'Search for Drive Items' nodeid = 'com.sympathyfordata.azure.graph.search' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_search.svg' parameters = node.parameters() parameters.set_string( 'query', value='', label='Query', editor=node.editors.lineedit_editor( placeholder='Required'), description='Enter Drive Item query') set_drive_item_output_parameters(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] query = ctx.parameters['query'].value output_files = ctx.parameters['output_files'].value output_folders = ctx.parameters['output_folders'].value if not query: raise SyConfigurationError('Query should not be empty') with in_graph.connected(self): body = { "requests": [ { "entityTypes": [ "driveItem" ], "query": { "queryString": query }, } ] } res = api.search_query.post( in_graph, {}, body).run() value = res['value'] items = [] # Flatten items and remove hits info. for item in value: for container in item['hitsContainers']: for hit in container['hits']: items.append(hit['resource']) res['value'] = items if not output_files: exclude_key(res, 'file') if not output_folders: exclude_key(res, 'folder') propagate_graph(in_graph, [out_graph]) out_graph.data = res
[docs] class SelectDataItemGraph(node.Node): """ Select named Item from :ref:`graph_response_list_365` with Items. Outputs :ref:`Graph port <graph_port_365>` with selected item as :ref:`graph_data_365`. https://docs.microsoft.com/en-us/graph/api/driveitem-search """ name = 'Select Item from Response List' nodeid = 'com.sympathyfordata.azure.graph.dataitems.select' tags = Tags(Tag.Azure.Graph) icon = 'node_graph_select.svg' parameters = node.parameters() _set_option_parameter( parameters, _drive_item_parameter, 'Data Item', 'Select data item to output') inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def adjust_parameters(self, ctx): items = {'value': []} in_graph = ctx.input[_graph_port_name] if in_graph.is_valid(): items = in_graph.data return utils.update_options_response( ctx, {_drive_item_parameter: items}, _drive_item_parameter) def save_parameters(self, ctx): utils.save_current_option(ctx.parameters[_drive_item_parameter]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] item_id = ctx.parameters[_drive_item_parameter].value if not item_id: raise SyConfigurationError('Data Item should not be empty') response_list = in_graph.data ensure_response_list(response_list) items = {item['id']: item for item in response_list['value']} propagate_graph(in_graph, [out_graph]) try: out_graph.data = items[item_id] except KeyError as exc: out_graph.data = None raise SyDataError( "Selected Data Item does not exist in input" ) from exc
[docs] class ListFolderGraph(node.Node): """ List :ref:`Drive Items <graph_drive_item_365>` in Folder specified by the :ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365` input. Outputs :ref:`Graph port <graph_port_365>` with a :ref:`graph_response_list_365` of Drive Items as :ref:`graph_data_365`. """ name = 'List Drive Items in Folder' nodeid = 'com.sympathyfordata.azure.graph.folder.list' related = ['com.sympathyfordata.azure.graph.driveitem.select'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_search.svg' parameters = node.parameters() set_drive_item_output_parameters(parameters) inputs = Ports([graph_port()]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_graph = ctx.output[_graph_port_name] item = in_graph.data output_files = ctx.parameters['output_files'].value output_folders = ctx.parameters['output_folders'].value ensure_folder(item) value = [] res = {'value': value} with in_graph.connected(self): item_id = item['id'] drive_id = item['parentReference']['driveId'] res = api.drive_item_children.get( in_graph, _drive_item(drive_id, item_id)).run() if not output_files: exclude_key(res, 'file') if not output_folders: exclude_key(res, 'folder') propagate_graph(in_graph, [out_graph]) out_graph.data = res
[docs] class GraphDataToJson(node.Node): """ Output Data from :ref:`graph_port_365` as Json. Makes it possible to work with the (response) data from :ref:`sympathy_graph_365` using Json nodes. Optional output propagates input :ref:`Graph port <graph_port_365>` to output. """ name = 'Graph Data to Json' nodeid = 'com.sympathyfordata.azure.graph.data.tojson' related = ['com.sympathyfordata.azure.graph.data.fromjson'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_json.svg' inputs = Ports([graph_port()]) outputs = Ports([Port.Json('Graph Data', name='data'), graph_port(opt='in')]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] out_data = ctx.output['data'] item = in_graph.data out_data.set(item) propagate_graph(in_graph, ctx.output.group(_graph_port_name))
[docs] class GraphDataFromJson(node.Node): """ Set Graph Data from as Json as :ref:`graph_port_365`. Makes it possible to work with the (response) data from :ref:`sympathy_graph_365` using Json nodes. Outputs :ref:`Graph port <graph_port_365>` with the Json data as :ref:`graph_data_365`. """ name = 'Graph Data from Json' nodeid = 'com.sympathyfordata.azure.graph.data.fromjson' related = ['com.sympathyfordata.azure.graph.data.tojson'] tags = Tags(Tag.Azure.Graph) icon = 'node_graph_json.svg' inputs = Ports([graph_port(), Port.Json('Graph Data', name='data')]) outputs = Ports([graph_port()]) def execute(self, ctx): in_graph = ctx.input[_graph_port_name] in_data = ctx.input['data'] out_graph = ctx.output[_graph_port_name] propagate_graph(in_graph, [out_graph]) out_graph.data = in_data.get()