# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import time
import pathlib
import os
import enum
import http
from collections import abc
from datetime import datetime, timedelta, UTC
from sympathy.api import qt2 as qt_compat
from sympathy.api import node
from sympathy.api.nodeconfig import Tag, Tags, Ports, Port
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sympathy.utils import parameters as utils_parameters
from sympathy.utils.parameters import set_output_directory
from sympathy.utils import credentials
from sympathy.platform import item_models
from sympathy.platform import widget_library as sywidgets
from sylib_azure.clients.graph import graph
from sylib_azure import utils
QtWidgets = qt_compat.import_module('QtWidgets')
QtGui = qt_compat.import_module('QtGui')
QtCore = qt_compat.import_module('QtCore')
_graph_port_name = 'graph'
_name_parameter = 'name'
_site_parameter = 'site'
_drive_parameter = 'drive'
_drive_item_parameter = 'drive_item'
_confirm_delete_parameter = 'confirm_delete'
def _set_name_parameter(parameters, name=_name_parameter, label='Name',
description='Set name'):
parameters.set_string(
name,
value='',
label=label,
editor=node.editors.lineedit_editor(
placeholder='Required'),
description=description)
def _set_option_parameter(parameters, name, label, description):
parameters.set_string(
name,
value='',
label=label,
editor=node.editors.combo_editor(
options=[],
edit=True,
placeholder='Required',
include_empty=True),
description=description)
def _set_confirm_delete_parameter(parameters):
parameters.set_boolean(
_confirm_delete_parameter, label='Confirm Delete',
value=False,
description=(
'For safety, ensure all looks good before confirming delete'))
def set_drive_parameter(
parameters, description='Select drive'):
_set_option_parameter(parameters, _drive_parameter, 'Drive', description)
def set_drive_item_parameter(
parameters, description='Select drive item'):
_set_option_parameter(
parameters, _drive_item_parameter, 'Drive Item', description)
def set_drive_item_output_parameters(parameters):
parameters.set_boolean(
'output_files',
value=True,
label='Output files',
description='Include files in the output')
parameters.set_boolean(
'output_folders',
value=True,
label='Output folders',
description='Include folders in the output')
def graph_port(opt=None):
n = utils.opt_n(opt)
return graph.GraphPort('Graph', name=_graph_port_name, n=n)
def propagate_graph(input, output):
if output:
output[0].set(input.get())
def _drive_item(drive_id, item_id):
return dict(drive_id=drive_id, item_id=item_id)
def _drive_item_file(drive_id, item_id, filename):
res = _drive_item(drive_id, item_id)
res.update(dict(filename=filename))
return res
def _has_all_keys(item, keys):
return item is not None and all(v in item for v in keys)
def check_item(item):
return _has_all_keys(item, ['id', 'name'])
def check_file(item):
return _has_all_keys(item, ['id', 'name', 'parentReference', 'file'])
def check_folder(item):
return _has_all_keys(item, ['id', 'name', 'parentReference', 'folder'])
def check_drive(item):
return _has_all_keys(item, ['id', 'name', 'driveType'])
def check_response_list(item):
return 'value' in item and isinstance(item['value'], list)
def ensure_file(item):
if not check_file(item):
raise SyDataError('File Item is not valid')
def ensure_folder(item):
if not check_folder(item):
raise SyDataError('Folder Item is not valid')
def ensure_drive(item):
if not check_drive(item):
raise SyDataError('Drive is not valid')
def ensure_response_list(item):
if not check_response_list(item):
raise SyDataError('Response List is not valid')
def _put_upload_range_data(graph, upload_url, body, **kwargs):
return _put_fq_content_json(graph, upload_url, body, **kwargs)
def _get_response_headers(response_error):
return response_error.response.headers
def _get_json(graph, resource, **kwargs):
return graph.get_json(resource, **kwargs)
def _get_json_all(graph, resource, **kwargs):
return graph.get_json_all(resource, **kwargs)
def _get_iter_content(graph, resource, **kwargs):
return graph.get_iter_content(resource, **kwargs)
def _delete(graph, resource, **kwargs):
return graph.delete(resource, **kwargs)
def _post_json(graph, resource, body=None, **kwargs):
return graph.post_json(resource, body=body, **kwargs)
def _post_json_all(graph, resource, body=None, **kwargs):
return graph.post_json_all(resource, body=body, **kwargs)
def _put_fq_content_json(graph, resource, body=None, **kwargs):
return graph.put_fq_content_json(resource, body=body, **kwargs)
_get_me_drive = _get_json_all
_get_drive = _get_json_all
class Path:
def __init__(self, path, name, paths=None, get=None, delete=None,
post=None,
put=None):
self.path = path
self.name = name
self.paths = list(paths or [])
self._get = get
self._post = post
self._put = put
self._delete = delete
def resource(self, **kwargs):
return self.path.format(**kwargs)
def _check_set(self, method):
if method is None:
raise AssertionError(f'Method is not set for path: {self.name}')
def _call_method(self, method, graph, path_args,
*args,
**kwargs):
self._check_set(method)
resource = self.resource(**path_args)
return method(graph, resource, *args, **kwargs)
def get(self, *args, **kwargs):
return self._call_method(self._get, *args, **kwargs)
def post(self, *args, **kwargs):
return self._call_method(self._post, *args, **kwargs)
def put(self, *args, **kwargs):
return self._call_method(self._put, *args, **kwargs)
def delete(self, *args, **kwargs):
return self._call_method(self._delete, *args, **kwargs)
def __str__(self):
return f'Path(path={repr(self.path)}, name={repr(self.name)})'
def copy(self):
return Path(
self.path, name=self.name, paths=self.paths, get=self._get,
delete=self._delete, post=self._post, put=self._put)
class API(abc.Mapping):
def __init__(self, paths):
self._paths = self._path_tree_to_dict(paths)
def _flatten_path(self, path):
paths = []
def inner(prefix, path):
prefix = list(prefix)
prefix.append(path)
paths.append(prefix)
for sub in path.paths:
inner(prefix, sub)
inner([], path)
res = {}
for segs in paths:
last = segs[-1]
new = last.copy()
new.path = '/'.join([''] + [s.path for s in segs])
res[new.name] = new
return res
def _path_tree_to_dict(self, paths):
return {k: p for path in paths
for k, p in self._flatten_path(path).items()}
def __getattr__(self, key):
return self._paths[key]
def __getitem__(self, key):
return self._paths[key]
def __iter__(self):
return iter(self._paths)
def __len__(self):
return len(self._paths)
_drive_item_path = Path(
'{item_id}', name='drive_item', delete=_delete, get=_get_json, paths=[
Path('children', name='drive_item_children',
get=_get_json_all, post=_post_json),
Path('content', name='drive_item_content',
get=_get_iter_content),
Path('createLink', name='drive_item_create_link', post=_post_json),
Path('checkout', name='drive_item_checkout', post=_post_json),
Path('checkin', name='drive_item_checkin', post=_post_json),
])
api = API([
Path('me/drive', name='me_drive', get=_get_json),
Path('drives', name='drives', paths=[
Path('{drive_id}', name='drive', get=_get_json, paths=[
Path('root', name='drive_root', get=_get_json),
Path('items', name='drive_items', paths=[
_drive_item_path,
Path('{item_id}:/{filename}', name='drive_item_relative_file',
get=_get_json),
Path('{item_id}:/{filename}:/createUploadSession',
name='drive_item_upload_session',
post=_post_json_all),
Path("{item_id}/search(q='{query}')", name='drive_item_search',
get=_get_json_all),
])
])
]),
Path('sites', name='sites', paths=[
Path('{site_id}', name='site', get=_get_json, paths=[
Path('drives', name='site_drives', get=_get_json_all),
])
]),
Path('sites?search=*', name='sites_search', get=_get_json_all, paths=[]),
Path('search/query', name='search_query', post=_post_json_all),
])
DataRole = QtCore.Qt.ItemDataRole.UserRole + 1
PathRole = QtCore.Qt.ItemDataRole.UserRole + 2
FetchedRole = QtCore.Qt.ItemDataRole.UserRole + 3
class FileItem(item_models.TreeItem):
def __init__(self, key, data, style, parent=None):
self._key = key
self._data = data
self._parent = parent
self._style = style
def data(self, role=QtCore.Qt.ItemDataRole.DisplayRole):
res = None
if role == QtCore.Qt.ItemDataRole.DisplayRole:
res = self.name()
elif role == QtCore.Qt.ItemDataRole.DecorationRole:
res = self.icon()
elif role == QtCore.Qt.ItemDataRole.ToolTipRole:
res = self.tool_tip()
elif role == DataRole:
res = self._data
elif role == FetchedRole:
res = self.fetched()
elif role == PathRole:
res = []
curr = self
while curr._parent:
res.append(curr._key)
curr = curr._parent
res = list(reversed(res))
return res
def name(self):
return self._data['name']
def fetched(self):
return True
def status(self):
return True
def set_status(self, ok):
return True
def remove_children(self):
pass
def set_children(self, data):
pass
def icon(self):
return sywidgets.file_icon(self._style)
def tool_tip(self):
return ''
def parent(self):
return self._parent
def flags(self):
return (QtCore.Qt.ItemFlag.ItemIsEnabled
| QtCore.Qt.ItemFlag.ItemIsSelectable)
def header_data(self, section, orientation, role):
pass
class FolderItem(FileItem):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._fetched = False
self._children_dict = {}
self._keys = list(range(self.child_count()))
self._values = [{}] * len(self._keys)
self._ok = True
def index(self, child):
return self._values.index(child._data)
def child_count(self):
return self._data['folder']['childCount']
def _set_child_count(self, count):
self._data['folder']['childCount'] = count
def _make_item(self, key, value):
if check_file(value):
cls = FileItem
elif check_folder(value):
cls = FolderItem
elif check_item(value):
cls = UnsupportedItem
else:
cls = LoadingItem
return cls(key, value, self._style, parent=self)
def child(self, row):
res = self._children_dict.get(row)
if res is None:
key = self._keys[row]
value = self._values[row]
res = self._make_item(key, value)
self._children_dict[row] = res
return res
def path_to_rows(self, path):
rows = []
item = self
for seg in path or []:
if isinstance(item, FolderItem):
try:
row = item._keys.index(seg)
rows.append(row)
item = item.child(row)
except Exception:
rows.clear()
break
else:
rows.clear()
break
return rows
def fetched(self):
return self._fetched
def remove_children(self):
self._children_dict.clear()
self.set_values([])
def set_children(self, data):
self.remove_children()
self.set_values(data)
self._fetched = True
def set_values(self, values):
self._set_child_count(len(values))
self._values[:] = values
self._keys[:] = range(len(values))
def set_status(self, ok):
self._ok = ok
def status(self):
return self._ok
def icon(self):
if self._ok:
return sywidgets.folder_icon(self._style)
else:
return sywidgets.browser_reload_icon(self._style)
class UnsupportedItem(FileItem):
def data(self, role=QtCore.Qt.ItemDataRole.DisplayRole):
res = None
if role == QtCore.Qt.ItemDataRole.DisplayRole:
res = self.name()
elif role == QtCore.Qt.ItemDataRole.DecorationRole:
res = self.icon()
elif role == QtCore.Qt.ItemDataRole.ToolTipRole:
res = self.tool_tip()
elif role == PathRole:
res = []
curr = self
while curr._parent:
res.append(curr._key)
curr = curr._parent
res = list(reversed(res))
return res
def name(self):
return f'{self._data["name"]} (unsupported)'
def fetched(self):
return False
def icon(self):
return sywidgets.messagebox_question_icon(self._style)
def tool_tip(self):
return ''
def parent(self):
return self._parent
def flags(self):
return QtCore.Qt.ItemFlag.NoItemFlags
def header_data(self, section, orientation, role):
pass
class LoadingItem(UnsupportedItem):
def name(self):
return 'Loading ...'
def icon(self):
return sywidgets.messagebox_warning_icon(self._style)
def _create_root(items, style):
res = FolderItem(
'root', {'name': '', 'folder': {'childCount': len(items)}}, style)
res.set_values(items)
return res
class DriveItemTreeView(sywidgets.HtmlTreeView):
selection_changed = QtCore.Signal(QtCore.QModelIndex)
fetch_requested = QtCore.Signal(QtCore.QModelIndex)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.setContextMenuPolicy(
QtCore.Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self._context_menu)
self.expanded.connect(self.fetch_requested)
self._retry_action = QtGui.QAction('Load', parent=self)
def _context_menu(self, pos):
index = self.indexAt(pos)
if index.isValid():
item = self.model().get_item(index)
if item.child_count() > 0 and not item.fetched():
context_menu = QtWidgets.QMenu(self)
context_menu.addAction(self._retry_action)
def retry_triggered(*args):
self.fetch_requested.emit(index)
self._retry_action.triggered.connect(retry_triggered)
context_menu.exec(self.viewport().mapToGlobal(pos))
self._retry_action.triggered.disconnect(retry_triggered)
def selectionChanged(self, selected, deselected):
selected_indexes = selected.indexes()
index = QtCore.QModelIndex()
super().selectionChanged(selected, deselected)
if selected_indexes:
for selected_index in selected_indexes:
index = selected_index
self.selection_changed.emit(index)
class AsyncGraphNode(utils.AsyncAzureNode):
azure_port_name = _graph_port_name
azure_port_display = 'Graph'
class DriveItemTreeModel(item_models.TreeModel):
def get_item(self, index):
return self._get_item(index)
def remove_items(self, items):
item_to_index = {
index: item for index, item in self._index_to_item.items()
if item in items}
self._clear_items()
self._index_to_item.update(item_to_index)
class AsyncDriveItemView(utils.AsyncAzureParameterView):
def __init__(self, node, ctx, azure_port, parent=None):
super().__init__(node, ctx, azure_port, parent=parent)
self.ctx = ctx
self._token = None
self._data = {}
if azure_port.is_valid():
self._data = azure_port.data
self._drive_item = ctx.parameters[_drive_item_parameter]
self._drive = ctx.parameters[_drive_parameter]
label = ''
current_option = utils.get_current_option(self._drive_item)
if current_option:
label = next(iter(current_option.values()))
root_data = []
if check_folder(self._data):
root_data = [self._data]
self._root = _create_root(root_data, self.style())
self._model = DriveItemTreeModel(self._root)
self._tree_view = DriveItemTreeView()
self._tree_view.setHeaderHidden(True)
self._tree_view.setModel(self._model)
self._status_label = QtWidgets.QLabel()
self._set_status(label)
self._requests = {}
self._timeout = 4
layout = QtWidgets.QVBoxLayout()
self._tree_view.fetch_requested.connect(self._handle_fetch_tree)
self._tree_view.selection_changed.connect(
self._handle_selection_changed)
self.setLayout(layout)
layout.addWidget(self._tree_view)
layout.addWidget(self._status_label)
def _handle_token(self, token):
super()._handle_token(token)
self._token = token
def _set_item_status(self, index, item, status):
item.set_status(status)
self._model.dataChanged.emit(
index, index, [
QtCore.Qt.ItemDataRole.DisplayRole,
QtCore.Qt.ItemDataRole.DecorationRole])
def _handle_fetch_tree(self, index):
item = self._model.get_item(index)
data = item.data(DataRole)
fetched = item.data(FetchedRole)
if not self._token:
self._set_item_status(index, item, False)
elif not fetched:
folder_id = data['id']
drive_id = data['parentReference']['driveId']
resource = api.drive_item_children.resource(
drive_id=drive_id, item_id=folder_id)
if resource not in self._requests:
with self._azure.connected_client(self._token_manager):
delayed = self._azure.get_json_all(resource)
self._worker_manager.add_delayed_request(
resource, delayed, timeout=self._timeout)
self._requests[resource] = ('fetch_folder', item)
def _handle_response(self, resource, ok, result):
resource_data = self._requests.pop(resource)
if resource_data:
action = resource_data[0]
if action == 'fetch_folder':
item = resource_data[1]
path = item.data(PathRole)
index = QtCore.QModelIndex()
for p in path:
index = self._model.index(p, 0, parent=index)
self._set_item_status(index, item, ok)
child_count = item.child_count()
if ok and child_count:
child_items = [item.child(i) for i in range(child_count)]
self._model.beginRemoveRows(index, 0, child_count - 1)
item.remove_children()
self._model.endRemoveRows()
self._model.remove_items(child_items)
children_data = result['value']
self._model.beginInsertRows(
index, 0, len(children_data) - 1)
item.set_children(children_data)
self._model.endInsertRows()
def _set_status(self, path):
if path:
self._status_label.setText(f"<b>Drive Item</b>: {path}")
else:
self._status_label.setText('No item selected')
def _handle_selection_changed(self, index):
if index.isValid():
item = self._model.get_item(index)
data = item.data(DataRole)
drive_id = data['parentReference']['driveId']
drive = {drive_id: self._data['name']}
self._drive.value = drive_id
self._drive.adjust(drive)
drive_item_id = data['id']
names = []
curr = item
while curr:
names.append(curr.name())
curr = curr.parent()
name = '/'.join(reversed(names))
drive_item = {drive_item_id: name}
self._drive_item.value = drive_item_id
self._drive_item.adjust(drive_item)
self._set_status(name)
else:
self._drive.adjust([])
self._drive.value = ''
self._drive_item.adjust([])
self._drive_item.value = ''
self._set_status('')
class ChoiceParameterSetter:
options = None
name = None
@classmethod
def set_definition(cls, parameter, **kwargs):
raise NotImplementedError()
@classmethod
def option_list(cls):
return [v.value for v in cls.options]
@classmethod
def get_parameter(cls, parameters):
return parameters[cls.name]
@classmethod
def get_option(cls, parameters):
return cls.options(parameters[cls.name].value)
class IfFileExistsOptions(enum.StrEnum):
skip = 'Skip'
overwrite = 'Overwrite'
raise_exc = 'Raise exception'
class IfFileExists(ChoiceParameterSetter):
options = IfFileExistsOptions
name = 'if_file_exists'
@classmethod
def set_definition(cls, parameters, **kwargs):
parameters.set_string(
cls.name, label='If file already exists',
value=cls.options.skip.value,
description=('What to do if the file already exists'),
editor=node.editors.combo_editor(
options=cls.option_list()))
class IfFolderExistsOptions(enum.StrEnum):
skip = 'Skip'
raise_exc = 'Raise exception'
class IfFolderExists(ChoiceParameterSetter):
options = IfFolderExistsOptions
name = 'if_folder_exists'
@classmethod
def set_definition(cls, parameters, **kwargs):
parameters.set_string(
cls.name, label='If folder already exists',
value=cls.options.skip.value,
description=('What to do if the folder already exists'),
editor=node.editors.combo_editor(
options=cls.option_list()))
class LinkTypeOptions(enum.StrEnum):
view = 'view'
edit = 'edit'
embed = 'embed'
class LinkType(ChoiceParameterSetter):
options = LinkTypeOptions
name = 'link_type'
@classmethod
def set_definition(cls, parameters, **kwargs):
parameters.set_string(
cls.name, label='Type',
value=cls.options.view.value,
description=(
'Type options, see '
'https://docs.microsoft.com/en-us/graph/api/'
'driveitem-createlink#link-types'),
editor=node.editors.combo_editor(
options=cls.option_list()))
class LinkScopeOptions(enum.StrEnum):
anonymous = 'anonymous'
organization = 'organization'
class LinkScope(ChoiceParameterSetter):
options = LinkScopeOptions
name = 'link_scope'
@classmethod
def set_definition(cls, parameters, **kwargs):
parameters.set_string(
cls.name, label='Scope',
value=cls.options.organization.value,
description=(
'Scope options, see '
'https://docs.microsoft.com/en-us/graph/api/'
'driveitem-createlink#scope-types'),
editor=node.editors.combo_editor(
options=cls.option_list()))
def _isoformat_to_datetime(isoformat):
utc = False
modified = isoformat.rstrip('zZ')
if isoformat != modified:
utc = True
isoformat = modified
prefix, _, suffix = isoformat.partition('.')
if suffix:
suffix = (suffix + '0' * 6)[:6]
isoformat = f'{prefix}.{suffix}'
dt = datetime.fromisoformat(isoformat)
if utc:
dt = dt.replace(tzinfo=UTC)
return dt
[docs]
class SelectDriveItemAsyncGraph(node.Node):
"""
Select :ref:`graph_drive_item_365` in Folder specified by the
:ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Allows interactive selection of a Drive Item contained under the Folder
Item passed as :ref:`graph_data_365` input. For example the Root Folder of
a Drive.
Outputs :ref:`Graph port <graph_port_365>` with the selected
Drive Item as :ref:`graph_data_365`.
"""
name = 'Select Drive Item in Folder'
nodeid = 'com.sympathyfordata.azure.graph.driveitem.select'
related = ['com.sympathyfordata.azure.graph.drive.root']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_select.svg'
parameters = node.parameters()
set_drive_parameter(parameters)
set_drive_item_parameter(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def exec_parameter_view(self, ctx):
return AsyncDriveItemView(self, ctx, ctx.input[_graph_port_name])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
drive_id = ctx.parameters[_drive_parameter].value
item_id = ctx.parameters[_drive_item_parameter].value
if not (drive_id or item_id):
raise SyConfigurationError('A Drive Item should be selected')
with in_graph.connected(self):
data = api.drive_item.get(
in_graph, _drive_item(drive_id, item_id)).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = data
[docs]
class Graph(node.Node):
"""
Create a new Graph connection.
This is the starting point for processing of data from SharePoint or
OneDrive via :ref:`sympathy_graph_365`.
Optional name can be set to support multiple, different, logins.
One login for each name.
See :ref:`graph_port_365` for details about the Graph output port.
"""
name = 'Graph 365'
nodeid = 'com.sympathyfordata.azure.graph.connection'
icon = 'node_graph.svg'
tags = Tags(Tag.Azure.Graph)
parameters = node.parameters()
utils_parameters.set_azure_connection(
parameters, 'Graph',
description='Azure credentials for Sympathy Azure Toolkit')
outputs = Ports([graph_port()])
def execute(self, ctx):
connection = ctx.parameters[
utils_parameters.azure_connection_name].value
out_graph = ctx.output[_graph_port_name]
out_graph.set({
'connection': connection,
'data': None,
})
with out_graph.connected(self):
out_graph.ensure_token()
[docs]
class GetResourceGraph(node.Node):
"""
Get Custom Resource by specifying the path and query part of the URL.
For example, to output the OneDrive Drive, enter `/me/drive`.
Outputs :ref:`Graph port <graph_port_365>` with the output
as :ref:`graph_data_365`.
"""
name = 'Get Custom Resource'
nodeid = 'com.sympathyfordata.azure.graph.resource.get'
related = ['com.sympathyfordata.azure.graph.connection']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph.svg'
parameters = node.parameters()
parameters.set_string(
'resource',
value='',
label='Resource',
editor=node.editors.lineedit_editor(
placeholder='Required'))
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
resource = ctx.parameters['resource'].value
if not resource:
raise SyConfigurationError('Resource should not be empty')
with in_graph.connected(self):
res = in_graph.get_json_all(resource).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = res
[docs]
class DownloadFileItemGraph(node.Node):
"""
Download File specified by the :ref:`File Item <graph_drive_item_365>`
passed as :ref:`graph_data_365` input to configured output directory.
Output Datasource pointing to downloaded file.
Optionally outputs :ref:`Graph port <graph_port_365>` with File Item as
:ref:`graph_data_365`.
"""
name = 'Download File'
nodeid = 'com.sympathyfordata.azure.graph.file.download'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_file.svg'
parameters = node.parameters()
set_output_directory(
parameters,
description='Select output directory for downloaded files.')
IfFileExists.set_definition(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='in'),
Port.Datasource('File', name='file')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_file = ctx.output['file']
out_graphs = ctx.output.group(_graph_port_name)
if_exists = IfFileExists.get_option(ctx.parameters)
item = in_graph.data
ensure_file(item)
directory = pathlib.Path(ctx.parameters['directory'].value)
item_name = item['name']
item_path = directory / item_name
if item_path.is_file():
if if_exists == IfFileExists.options.skip:
print(f'File {item_path} already exists, skip download')
elif if_exists == IfFileExists.options.raise_exc:
raise SyDataError(f'File {item_path} already exists')
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
with item_path.open(mode='wb') as f:
for seg in api.drive_item_content.get(
in_graph, _drive_item(drive_id, item_id)).run():
f.write(seg)
item_file_system_info = item.get('fileSystemInfo')
if item_file_system_info:
modified_iso = item_file_system_info.get(
'lastModifiedDateTime')
if modified_iso:
modified_dt = _isoformat_to_datetime(modified_iso)
modified = modified_dt.timestamp()
os.utime(str(item_path), times=(time.time(), modified))
out_file.encode_path(str(item_path))
propagate_graph(in_graph, out_graphs)
def expiration(data):
return _isoformat_to_datetime(data['expirationDateTime'])
def range_header(start, chunk_size, file_size):
assert chunk_size > 0
end = start + chunk_size - 1
return {
'Content-Range': f'bytes {start}-{end}/{file_size}',
'Content-Length': str(chunk_size),
}
def is_complete(response):
return _has_all_keys(response, ['id', 'name', 'size', 'file'])
def has_ranges(response):
return _has_all_keys(
response,
["expirationDateTime", "nextExpectedRanges"])
[docs]
class UploadFileGraph(node.Node):
"""
Upload File to the Folder specified by the
:ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>` with the Folder Item
as :ref:`graph_data_365`.
"""
name = 'Upload File'
nodeid = 'com.sympathyfordata.azure.graph.file.upload'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_file.svg'
parameters = node.parameters()
IfFileExists.set_definition(parameters)
inputs = Ports([graph_port(),
Port.Datasource('File', name='file')])
outputs = Ports([graph_port(opt='out')])
def _upload_chunks(self, in_graph, path, session):
upload_url = session['uploadUrl']
expiration_dt = expiration(session)
file_size = path.stat().st_size
# https://docs.microsoft.com/en-us/graph/api/driveitem-
# createuploadsession?view=graph-rest-1.0#upload-bytes-to-the-
# upload-session
base_chunk_size = 2 ** 10 * 320
# 3840 KiB
max_chunk_size = base_chunk_size * 12
start = 0
chunk_size = min(file_size, max_chunk_size)
end = file_size
done = file_size == 0
res = None
with path.open(mode='rb') as f:
while not done:
if datetime.now(UTC) > expiration_dt:
break
f.seek(start)
res = _put_upload_range_data(
in_graph,
upload_url,
f.read(chunk_size),
headers=range_header(start, chunk_size, file_size)).run()
if is_complete(res):
break
elif has_ranges(res):
expiration_dt = expiration(res)
next_expected_ranges = res["nextExpectedRanges"]
for next_range in next_expected_ranges:
start, end = next_range.split('-')
start = int(start)
if end:
end = int(end) + 1
else:
end = file_size
chunk_size = min(end - start, max_chunk_size)
break
else:
raise AssertionError()
return res
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
in_file = ctx.input['file']
out_graphs = ctx.output.group(_graph_port_name)
if_exists = IfFileExists.get_option(ctx.parameters)
item = in_graph.data
conflict_behavior = 'fail'
if if_exists == IfFileExists.options.overwrite:
conflict_behavior = 'replace'
ensure_folder(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
path = pathlib.Path(in_file.decode_path())
filename = path.name
res = None
with in_graph.connected(self):
body = {
"item": {
"@microsoft.graph.conflictBehavior": conflict_behavior,
},
}
try:
session = api.drive_item_upload_session.post(
in_graph,
_drive_item_file(drive_id, item_id, filename),
body).run()
except utils.ResponseError as r:
if (if_exists == IfFileExists.options.skip and
utils.get_response_code(r) == 'nameAlreadyExists'):
print(f'Item named {filename} already exists, skip upload')
res = api.drive_item_relative_file.get(
in_graph,
_drive_item_file(drive_id, item_id, filename)).run()
else:
raise
else:
res = self._upload_chunks(in_graph, path, session)
assert res and is_complete(res)
propagate_graph(in_graph, out_graphs)
if out_graphs:
out_graphs[0].data = res
[docs]
class CheckoutFileGraph(node.Node):
"""
Check out the File specified by the
:ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>`
"""
name = 'Check out File'
nodeid = 'com.sympathyfordata.azure.graph.file.checkout'
related = [
'com.sympathyfordata.azure.graph.driveitem.select',
'com.sympathyfordata.azure.graph.file.checkin'
]
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_check.svg'
parameters = node.parameters()
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='out')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graphs = ctx.output.group(_graph_port_name)
item = in_graph.data
ensure_file(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
body = None
api.drive_item_checkout.post(
in_graph, _drive_item(drive_id, item_id),
body, statuses=[http.HTTPStatus.NO_CONTENT],
response_type=None).run()
propagate_graph(in_graph, out_graphs)
[docs]
class CheckInFileGraph(node.Node):
"""
Check in the File specified by the
:ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>`
"""
name = 'Check in File'
nodeid = 'com.sympathyfordata.azure.graph.file.checkin'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_check.svg'
parameters = node.parameters()
parameters.set_string(
'comment', label='Comment',
description='Comment for the checked in version.',
value='')
parameters.set_boolean(
'check_in_as_published', label='Check in as published',
value=False,
description='Published status after check in')
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='out')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graphs = ctx.output.group(_graph_port_name)
item = in_graph.data
ensure_file(item)
comment = ctx.parameters['comment'].value
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
body = {
"comment": comment,
}
if ctx.parameters['check_in_as_published'].value:
body["checkInAs"] = 'published'
api.drive_item_checkin.post(
in_graph, _drive_item(drive_id, item_id),
body, statuses=[http.HTTPStatus.NO_CONTENT],
response_type=None).run()
propagate_graph(in_graph, out_graphs)
[docs]
class CreateFileLinkGraph(node.Node):
"""
Create File Link to the File specified by the
:ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>` with the
Link Item as :ref:`graph_data_365`.
"""
name = 'Create File Link'
nodeid = 'com.sympathyfordata.azure.graph.file.createlink'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_create.svg'
parameters = node.parameters()
LinkType.set_definition(parameters)
LinkScope.set_definition(parameters)
utils_parameters.set_secret_connection(
parameters,
name='password',
label='Password',
description='Optional password, empty means no password.')
parameters.set_integer(
'expire_after',
label='Expire after (days)',
description=('Set expiration time of link (days) from time of '
'creation (when the node is run).\n'
'0 means no expiration or organization limit'),
value=30)
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='in'),
Port.Text('Web URL', name='web_url')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_url = ctx.output['web_url']
out_graphs = ctx.output.group(_graph_port_name)
password_connection = ctx.parameters['password'].value
expire_after = ctx.parameters['expire_after'].value
link_type = LinkType.get_option(ctx.parameters)
link_scope = LinkScope.get_option(ctx.parameters)
item = in_graph.data
ensure_file(item)
body = {
"type": link_type.value,
"scope": link_scope.value,
}
password = password_connection.resource
if password:
if (password_connection.credentials.mode ==
credentials.secrets_mode):
secrets = credentials.get_secret_credentials(
self, password_connection)
password = credentials.expand_secrets(
password, secrets)
body["password"] = password
if expire_after:
now = datetime.now(UTC)
now += timedelta(days=expire_after)
body["expirationDateTime"] = now.strftime('%Y-%m-%dT%H:%M:%SZ')
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
data = api.drive_item_create_link.post(
in_graph, _drive_item(drive_id, item_id), body).run()
propagate_graph(in_graph, out_graphs)
if out_graphs:
out_graphs[0].data = data
out_url.set(data['link']['webUrl'])
[docs]
class DeleteFileItemGraph(node.Node):
"""
Delete File specified by the
:ref:`File Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>`
"""
name = 'Delete File in Graph'
nodeid = 'com.sympathyfordata.azure.graph.file.delete'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_delete.svg'
parameters = node.parameters()
_set_confirm_delete_parameter(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='out')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graphs = ctx.output.group(_graph_port_name)
item = in_graph.data
ensure_file(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
if not ctx.parameters[_confirm_delete_parameter].value:
print('Delete file, dry run:')
try:
item = api.drive_item.get(
in_graph, _drive_item(drive_id, item_id)).run()
print(f"Would remove file: {item['name']}")
except Exception:
print('Specified file does not exist')
else:
api.drive_item.delete(
in_graph,
_drive_item(drive_id, item_id)).run()
propagate_graph(in_graph, out_graphs)
[docs]
class DeleteFolderItemGraph(node.Node):
"""
Delete Folder specified by the
:ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>`
"""
name = 'Delete Folder'
nodeid = 'com.sympathyfordata.azure.graph.folder.delete'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_delete.svg'
parameters = node.parameters()
_set_confirm_delete_parameter(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='out')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graphs = ctx.output.group(_graph_port_name)
item = in_graph.data
ensure_folder(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
if not ctx.parameters[_confirm_delete_parameter].value:
print('Delete folder, dry run:')
try:
item = api.drive_item.get(
in_graph, _drive_item(drive_id, item_id)).run()
print(f"Would remove folder: {item['name']}")
except Exception:
print('Specified folder does not exist')
else:
item = api.drive_item.delete(
in_graph, _drive_item(drive_id, item_id)).run()
propagate_graph(in_graph, out_graphs)
[docs]
class CreateFolderItemGraph(node.Node):
"""
Create Folder in the Folder specified by the
:ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Optionally outputs :ref:`Graph port <graph_port_365>` with the
Folder Item as :ref:`graph_data_365`.
"""
name = 'Create Folder'
nodeid = 'com.sympathyfordata.azure.graph.folder.create'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_create.svg'
parameters = node.parameters()
IfFolderExists.set_definition(parameters)
_set_name_parameter(parameters, description='Set folder name')
inputs = Ports([graph_port()])
outputs = Ports([graph_port(opt='out')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graphs = ctx.output.group(_graph_port_name)
if_exists = IfFolderExists.get_option(ctx.parameters)
name = ctx.parameters['name'].value
if not name:
raise SyConfigurationError('Name should not be empty')
item = in_graph.data
ensure_folder(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
with in_graph.connected(self):
conflict_behavior = 'fail'
res = None
if if_exists == IfFolderExists.options.skip:
# https://github.com/OneDrive/onedrive-api-docs/issues/1144
# Only fileSystemInfo gets modified, the folder is not
# re-created.
conflict_behavior = 'replace'
# Skip could also be implemented using 'fail' with a follow up
# query for the named item.
body = {
"@microsoft.graph.conflictBehavior": conflict_behavior,
"folder": {"childCount": 0},
"name": name,
}
res = api.drive_item_children.post(
in_graph, _drive_item(drive_id, item_id), body).run()
propagate_graph(in_graph, out_graphs)
if out_graphs:
out_graphs[0].data = res
[docs]
class DriveGraph(node.Node):
"""
Get OneDrive Drive.
Outputs :ref:`Graph port <graph_port_365>` with the Drive as
:ref:`graph_data_365`.
"""
name = 'Get OneDrive Drive'
nodeid = 'com.sympathyfordata.azure.graph.drive.get'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_get.svg'
parameters = node.parameters()
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
with in_graph.connected(self):
data = api.me_drive.get(in_graph, {}).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = data
[docs]
class SiteGraph(AsyncGraphNode):
"""
Select SharePoint Site.
Outputs :ref:`Graph port <graph_port_365>` with the Site as
:ref:`graph_data_365`.
"""
name = 'Select SharePoint Site'
nodeid = 'com.sympathyfordata.azure.graph.site.select'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_select.svg'
parameters = node.parameters()
parameters.set_string(
_site_parameter,
value='',
label='SharePoint Site',
editor=node.editors.combo_editor(
options=[],
edit=True,
placeholder='Required',
include_empty=True),
description='Select site to output')
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
site_id = ctx.parameters[_site_parameter].value
if not site_id:
raise SyConfigurationError('SharePoint site should not be empty')
with in_graph.connected(self):
data = api.site.get(in_graph, dict(site_id=site_id)).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = data
def async_request_azure_parameters(self, ctx, graph):
return True, {_site_parameter: api.sites_search.get(graph, {})}
def async_adjust_azure_parameters(self, ctx, responses):
return self._update_options_response(ctx, responses, _site_parameter)
def save_parameters(self, ctx):
self._save_current_option(ctx.parameters[_site_parameter])
[docs]
class SiteDriveGraph(AsyncGraphNode):
"""
Select Drive from SharePoint Site.
Outputs :ref:`Graph port <graph_port_365>` with the Drive as
:ref:`graph_data_365`.
"""
name = 'Select SharePoint Drive'
nodeid = 'com.sympathyfordata.azure.graph.site.drive.select'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_select.svg'
parameters = node.parameters()
set_drive_parameter(parameters, 'Select drive to output')
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
drive_id = ctx.parameters[_drive_parameter].value
if not drive_id:
raise SyConfigurationError('Drive should not be empty')
with in_graph.connected(self):
data = api.drive.get(in_graph, dict(drive_id=drive_id)).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = data
def async_request_azure_parameters(self, ctx, graph):
try:
site_id = ctx.input[_graph_port_name].data['id']
return True, {_drive_parameter: api.site_drives.get(
graph, dict(site_id=site_id))}
except Exception:
return False, (
'Graph data must contain a SharePoint Site')
def async_adjust_azure_parameters(self, ctx, responses):
return self._update_options_response(ctx, responses, _drive_parameter)
def save_parameters(self, ctx):
self._save_current_option(ctx.parameters[_drive_parameter])
[docs]
class DriveRoot(node.Node):
"""
Get Root :ref:`Folder Item <graph_drive_item_365>` from the Drive passed as
Json input.
Outputs :ref:`Graph port <graph_port_365>` with the Folder Item as
:ref:`graph_data_365`.
"""
name = 'Get Drive Root Folder'
related = [
'com.sympathyfordata.azure.graph.drive.get',
'com.sympathyfordata.azure.graph.site.drive.select']
nodeid = 'com.sympathyfordata.azure.graph.drive.root'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_get.svg'
parameters = node.parameters()
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
item = in_graph.data
ensure_drive(item)
with in_graph.connected(self):
drive_id = item['id']
data = api.drive_root.get(in_graph, dict(drive_id=drive_id)).run()
propagate_graph(in_graph, [out_graph])
out_graph.data = data
def exclude_key(data, key):
data['value'] = [v for v in data['value'] if key not in v]
[docs]
class SearchDriveGraph(node.Node):
"""
Search for :ref:`Drive Items <graph_drive_item_365>` in Folder specified by
the :ref:`Folder Item <graph_drive_item_365>` passed as
:ref:`graph_data_365` input.
Outputs :ref:`Graph port <graph_port_365>` with a
:ref:`graph_response_list_365` of matched Drive Items as
:ref:`graph_data_365`.
See `Search for DriveItems within a drive
<https://docs.microsoft.com/en-us/graph/api/
driveitem-search>`_ for more details.
"""
name = 'Search for Drive Items in Folder'
nodeid = 'com.sympathyfordata.azure.graph.folder.search'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_search.svg'
parameters = node.parameters()
parameters.set_string(
'query',
value='',
label='Query',
editor=node.editors.lineedit_editor(
placeholder='Required'),
description='Enter Drive Item query')
set_drive_item_output_parameters(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
item = in_graph.data
query = ctx.parameters['query'].value
output_files = ctx.parameters['output_files'].value
output_folders = ctx.parameters['output_folders'].value
ensure_folder(item)
item_id = item['id']
drive_id = item['parentReference']['driveId']
if not query:
raise SyConfigurationError('Query should not be empty')
with in_graph.connected(self):
res = api.drive_item_search.get(
in_graph, dict(
drive_id=drive_id, item_id=item_id, query=query)).run()
if not output_files:
exclude_key(res, 'file')
if not output_folders:
exclude_key(res, 'folder')
propagate_graph(in_graph, [out_graph])
out_graph.data = res
[docs]
class SearchGraph(node.Node):
"""
Search for :ref:`Drive Items <graph_drive_item_365>` in OneDrive and
SharePoint.
Outputs :ref:`Graph port <graph_port_365>` with a
:ref:`graph_response_list_365` of matched Drive Items as
:ref:`graph_data_365`.
See `Use the Microsoft Search API to search content in OneDrive and
SharePoint <https://docs.microsoft.com/en-us/graph/search-concept-files>`_
for more details.
"""
name = 'Search for Drive Items'
nodeid = 'com.sympathyfordata.azure.graph.search'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_search.svg'
parameters = node.parameters()
parameters.set_string(
'query',
value='',
label='Query',
editor=node.editors.lineedit_editor(
placeholder='Required'),
description='Enter Drive Item query')
set_drive_item_output_parameters(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
query = ctx.parameters['query'].value
output_files = ctx.parameters['output_files'].value
output_folders = ctx.parameters['output_folders'].value
if not query:
raise SyConfigurationError('Query should not be empty')
with in_graph.connected(self):
body = {
"requests": [
{
"entityTypes": [
"driveItem"
],
"query": {
"queryString": query
},
}
]
}
res = api.search_query.post(
in_graph, {}, body).run()
value = res['value']
items = []
# Flatten items and remove hits info.
for item in value:
for container in item['hitsContainers']:
for hit in container['hits']:
items.append(hit['resource'])
res['value'] = items
if not output_files:
exclude_key(res, 'file')
if not output_folders:
exclude_key(res, 'folder')
propagate_graph(in_graph, [out_graph])
out_graph.data = res
[docs]
class SelectDataItemGraph(node.Node):
"""
Select named Item from :ref:`graph_response_list_365` with Items.
Outputs :ref:`Graph port <graph_port_365>` with selected item as
:ref:`graph_data_365`.
https://docs.microsoft.com/en-us/graph/api/driveitem-search
"""
name = 'Select Item from Response List'
nodeid = 'com.sympathyfordata.azure.graph.dataitems.select'
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_select.svg'
parameters = node.parameters()
_set_option_parameter(
parameters, _drive_item_parameter, 'Data Item',
'Select data item to output')
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def adjust_parameters(self, ctx):
items = {'value': []}
in_graph = ctx.input[_graph_port_name]
if in_graph.is_valid():
items = in_graph.data
return utils.update_options_response(
ctx, {_drive_item_parameter: items}, _drive_item_parameter)
def save_parameters(self, ctx):
utils.save_current_option(ctx.parameters[_drive_item_parameter])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
item_id = ctx.parameters[_drive_item_parameter].value
if not item_id:
raise SyConfigurationError('Data Item should not be empty')
response_list = in_graph.data
ensure_response_list(response_list)
items = {item['id']: item for item in response_list['value']}
propagate_graph(in_graph, [out_graph])
try:
out_graph.data = items[item_id]
except KeyError as exc:
out_graph.data = None
raise SyDataError(
"Selected Data Item does not exist in input"
) from exc
[docs]
class ListFolderGraph(node.Node):
"""
List :ref:`Drive Items <graph_drive_item_365>` in Folder specified by the
:ref:`Folder Item <graph_drive_item_365>` passed as :ref:`graph_data_365`
input.
Outputs :ref:`Graph port <graph_port_365>` with a
:ref:`graph_response_list_365` of Drive Items as :ref:`graph_data_365`.
"""
name = 'List Drive Items in Folder'
nodeid = 'com.sympathyfordata.azure.graph.folder.list'
related = ['com.sympathyfordata.azure.graph.driveitem.select']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_search.svg'
parameters = node.parameters()
set_drive_item_output_parameters(parameters)
inputs = Ports([graph_port()])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_graph = ctx.output[_graph_port_name]
item = in_graph.data
output_files = ctx.parameters['output_files'].value
output_folders = ctx.parameters['output_folders'].value
ensure_folder(item)
value = []
res = {'value': value}
with in_graph.connected(self):
item_id = item['id']
drive_id = item['parentReference']['driveId']
res = api.drive_item_children.get(
in_graph,
_drive_item(drive_id, item_id)).run()
if not output_files:
exclude_key(res, 'file')
if not output_folders:
exclude_key(res, 'folder')
propagate_graph(in_graph, [out_graph])
out_graph.data = res
[docs]
class GraphDataToJson(node.Node):
"""
Output Data from :ref:`graph_port_365` as Json.
Makes it possible to work with the (response) data from
:ref:`sympathy_graph_365` using Json nodes.
Optional output propagates input :ref:`Graph port <graph_port_365>`
to output.
"""
name = 'Graph Data to Json'
nodeid = 'com.sympathyfordata.azure.graph.data.tojson'
related = ['com.sympathyfordata.azure.graph.data.fromjson']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_json.svg'
inputs = Ports([graph_port()])
outputs = Ports([Port.Json('Graph Data', name='data'),
graph_port(opt='in')])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
out_data = ctx.output['data']
item = in_graph.data
out_data.set(item)
propagate_graph(in_graph, ctx.output.group(_graph_port_name))
[docs]
class GraphDataFromJson(node.Node):
"""
Set Graph Data from as Json as :ref:`graph_port_365`.
Makes it possible to work with the (response) data from
:ref:`sympathy_graph_365` using Json nodes.
Outputs :ref:`Graph port <graph_port_365>` with the Json data as
:ref:`graph_data_365`.
"""
name = 'Graph Data from Json'
nodeid = 'com.sympathyfordata.azure.graph.data.fromjson'
related = ['com.sympathyfordata.azure.graph.data.tojson']
tags = Tags(Tag.Azure.Graph)
icon = 'node_graph_json.svg'
inputs = Ports([graph_port(), Port.Json('Graph Data', name='data')])
outputs = Ports([graph_port()])
def execute(self, ctx):
in_graph = ctx.input[_graph_port_name]
in_data = ctx.input['data']
out_graph = ctx.output[_graph_port_name]
propagate_graph(in_graph, [out_graph])
out_graph.data = in_data.get()