Source code for node_powerbi

# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import time
import contextlib
from sympathy.api import qt2 as qt_compat
from sympathy.api import node
from sympathy.api.nodeconfig import Tag, Tags, Ports, Port
from sympathy.api.exceptions import SyConfigurationError, SyDataError
from sympathy.utils import parameters as utils_parameters
from sylib_azure.clients.powerbi import powerbi
from sylib_azure import request, utils
QtWidgets = qt_compat.import_module('QtWidgets')
QtCore = qt_compat.import_module('QtCore')


_powerbi_port_name = 'powerbi'
_workspace_port_name = 'workspace'
_dataset_port_name = 'dataset'
_table_port_name = 'table'

_name_parameter = 'name'
_workspace_parameter = 'workspace'
_dataset_parameter = 'dataset'
_table_parameter = 'table'
_confirm_parameter = 'confirm'


def powerbi_port(opt=None):
    n = utils.opt_n(opt)
    return powerbi.PowerBiPort('PowerBi', name=_powerbi_port_name, n=n)


def workspace_port(opt=None):
    n = utils.opt_n(opt)
    return Port.Custom('table', 'Workspaces', name=_workspace_port_name, n=n)


def dataset_port(opt=None):
    n = utils.opt_n(opt)
    return Port.Custom('table', 'Datasets', name=_dataset_port_name, n=n)


def table_port(opt=None):
    n = utils.opt_n(opt)
    return Port.Custom('table', 'Table', name=_table_port_name, n=n)


def set_datasets_table(table, datasets, dataset_id=None):
    cols = ['id', 'name']
    utils.set_values_table(table, cols, datasets, selected_id=dataset_id)


def set_workspaces_table(table, workspaces, workspace_id=None):
    cols = ['id', 'name']
    utils.set_values_table(table, cols, workspaces, selected_id=workspace_id)


def propagate_powerbi(input, output):
    if output:
        output[0].set(input.get())


# https://powerbi.microsoft.com/en-us/blog/newdatasets/
# https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/
# entity-data-model-primitive-data-types
types = {
    'f': 'Double',
    'i': 'Int64',
    'M': 'Datetime',
    'm': 'Double',
    'U': 'String',
    'b': 'Bool',
}


def type_info(dtype):
    return types[dtype.kind]

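# Doctest-style sketch of the mapping above, assuming plain numpy dtypes
# (illustrative, not part of the original module):
#
#     >>> type_info(np.dtype('float64'))
#     'Double'
#     >>> type_info(np.dtype('datetime64[us]'))
#     'Datetime'
#     >>> type_info(np.dtype('U'))
#     'String'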

def column_info(table):
    res = []
    for column in table.column_names():
        edm_type = 'String'
        column_type = table.column_type(column)
        try:
            edm_type = type_info(column_type)
        except KeyError as exc:
            raise SyDataError(
                f"Unsupported type {str(column_type)} in column {column}"
            ) from exc
        res.append({
            'name': column,
            'dataType': edm_type})
    return res


def table_info(name, table):
    columns = column_info(table)
    return {
        'name': name,
        'columns': columns,
    }

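# Sketch of the schema payload produced by table_info for a table with a
# float column 'speed' and a string column 'name' (illustrative names and
# values, not from the original module):
#
#     {'name': 'measurements',
#      'columns': [{'name': 'speed', 'dataType': 'Double'},
#                  {'name': 'name', 'dataType': 'String'}]}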

def table_rows(table):
    names = table.column_names()
    columns = []
    for name in names:
        column = table[name]
        if table.column_type(name).kind == 'M':
            column = np.datetime_as_string(column)
        elif table.column_type(name).kind == 'm':
            # Converted to decimal seconds.
            column = column.astype('timedelta64[us]').astype('f8') / 1000000
        columns.append(column.tolist())
    rows = []

    for values in zip(*columns):
        row = {}
        for name, value in zip(names, values):
            row[name] = value
        rows.append(row)
    return rows

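# Sketch of the row payload produced by table_rows: datetime columns are
# rendered as ISO strings via np.datetime_as_string and timedelta columns
# become decimal seconds, so a two-row table with columns 't' (datetime64)
# and 'dt' (timedelta64) would yield something like:
#
#     [{'t': '2021-01-01T00:00:00', 'dt': 0.5},
#      {'t': '2021-01-01T00:00:01', 'dt': 1.5}]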

class PowerBiError(SyDataError):
    pass


class MissingItemError(PowerBiError):
    def __init__(self, msg='Selected item does not exist'):
        super().__init__(msg)


class MissingDatasetError(MissingItemError):
    def __init__(self, msg='Active dataset does not exist'):
        super().__init__(msg=msg)


class MissingWorkspaceError(MissingItemError):
    def __init__(self, msg='Active workspace does not exist'):
        super().__init__(msg=msg)


class Errors:
    raise_ = 'raise'
    print_ = 'print'
    ignore = 'ignore'


@contextlib.contextmanager
def item_request(*, code=Errors.raise_, other=Errors.raise_,
                 cls=MissingItemError):
    try:
        yield
    except request.ResponseError as r:
        if r.code == 401:
            if code == Errors.raise_:
                raise cls() from r
            elif code == Errors.print_:
                print(str(cls()))
            elif code == Errors.ignore:
                pass
            else:
                raise AssertionError() from r
        else:
            if other == Errors.raise_:
                raise
            elif other == Errors.print_:
                print(str(r))
            elif other == Errors.ignore:
                pass
            else:
                raise AssertionError() from r


@contextlib.contextmanager
def dataset_request(**kwargs):
    with item_request(cls=MissingDatasetError, **kwargs):
        yield


@contextlib.contextmanager
def workspace_request(**kwargs):
    with item_request(cls=MissingWorkspaceError, **kwargs):
        yield


def delete_workspace(powerbi, workspace_id, **kwargs):
    with workspace_request(**kwargs):
        powerbi.delete_workspace(workspace_id).run()


def delete_dataset(powerbi, dataset_id, **kwargs):
    with dataset_request(**kwargs):
        return powerbi.delete_dataset(dataset_id).run()


def get_datasets(powerbi, **kwargs):
    res = {}
    with workspace_request(**kwargs):
        res = powerbi.get_datasets().run()
    return res


def get_dataset_name(datasets, dataset_id):
    named_ids = utils.objects_to_named_ids(datasets)
    return named_ids.get(dataset_id)

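# Sketch, assuming utils.objects_to_named_ids turns an API response of
# the form {'value': [{'id': ..., 'name': ...}, ...]} into an {id: name}
# mapping (consistent with its use elsewhere in this module):
#
#     >>> get_dataset_name({'value': [{'id': 'abc', 'name': 'Sales'}]},
#     ...                  'abc')
#     'Sales'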

class AsyncPowerBiNode(utils.AsyncAzureNode):

    azure_port_name = _powerbi_port_name
    azure_port_display = 'PowerBi'


class PowerBi(node.Node):
    """
    Create a new PowerBi connection with My workspace as the active
    workspace.

    An optional name can be set to support multiple different logins,
    one login for each name.

    See :ref:`powerbi_port` for details about the PowerBi output port.
    """

    name = 'PowerBi'
    nodeid = 'com.sympathyfordata.azure.powerbi.connection'
    icon = 'node_powerbi.svg'
    tags = Tags(Tag.Azure.PowerBi)

    parameters = node.parameters()
    utils_parameters.set_azure_connection(
        parameters, 'PowerBi',
        description='Azure credentials for Sympathy PowerBi Toolkit')

    outputs = Ports([powerbi_port()])

    def execute(self, ctx):
        connection = ctx.parameters[
            utils_parameters.azure_connection_name].value
        out_powerbi = ctx.output[_powerbi_port_name]
        out_powerbi.set({
            'connection': connection,
            'workspace': None,
            'workspace_name': None,
            'dataset': None,
            'dataset_name': None,
        })
        with out_powerbi.connected(self):
            out_powerbi.ensure_token()


class GetWorkspacesPowerBi(node.Node):
    """
    Get all available workspaces from the PowerBi input and output them
    as a table containing two columns: `id` and `name`.

    Has an :ref:`optional PowerBi output <powerbi_port>`.
    """

    name = 'Get Workspaces as Table'
    nodeid = 'com.sympathyfordata.azure.powerbi.workspaces.table'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_table.svg'

    parameters = node.parameters()

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port(opt='in'), workspace_port()])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_workspace = ctx.output[_workspace_port_name]

        with in_powerbi.connected(self):
            workspaces = in_powerbi.get_workspaces().run()

        set_workspaces_table(out_workspace, workspaces)
        out_workspace.set_name('Workspaces')
        propagate_powerbi(in_powerbi, ctx.output.group(_powerbi_port_name))


class SetWorkspacePowerBi(AsyncPowerBiNode):
    """
    Sets the active workspace in the PowerBi output for use in following
    nodes. The workspace is stored using its unique id, see
    :ref:`powerbi_workspace`.

    If the configured workspace is missing, its id is shown instead of
    its name. To edit the value manually instead of choosing from the
    existing alternatives, specify the id.

    :ref:`Propagates PowerBi input to output <powerbi_port>` with the
    following changes:

    - Active workspace is set/replaced.
    - Active dataset is cleared.
    """

    name = 'Set active Workspace'
    nodeid = 'com.sympathyfordata.azure.powerbi.workspaces.set'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_workspace_active.svg'

    parameters = node.parameters()
    parameters.set_string(
        _workspace_parameter,
        value='',
        label='Workspace',
        editor=node.editors.combo_editor(
            options=[], edit=True, placeholder='Required',
            include_empty=True),
        description=(
            'Choose workspace to activate, required.\n'
            'Shows workspace names, when available, but stores id.\n\n'
            'When workspaces are missing: ensure that the previous node '
            'is executed or type an\nexisting id.'))

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port()])

    def async_request_azure_parameters(self, ctx, powerbi):
        return True, {_workspace_parameter: powerbi.get_workspaces()}

    def async_adjust_azure_parameters(self, ctx, responses):
        options = responses.get(_workspace_parameter) or {}
        self._update_options(ctx.parameters[_workspace_parameter], options)
        return bool(options), {_workspace_parameter}

    def save_parameters(self, ctx):
        self._save_current_option(ctx.parameters[_workspace_parameter])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_powerbi = ctx.output[_powerbi_port_name]
        workspace_id = ctx.parameters[_workspace_parameter].value
        workspace_name = None

        if not workspace_id:
            raise SyConfigurationError('Workspace must not be empty')

        with in_powerbi.connected(self):
            workspaces = in_powerbi.get_workspaces().run()
            named_ids = utils.objects_to_named_ids(workspaces)
            if workspace_id not in named_ids:
                raise MissingWorkspaceError(
                    'Selected workspace does not exist')
            else:
                workspace_name = named_ids[workspace_id]

        propagate_powerbi(in_powerbi, [out_powerbi])
        out_powerbi.workspace = workspace_id
        out_powerbi.workspace_name = workspace_name
        out_powerbi.dataset = None


class CreateWorkspacePowerBi(node.Node):
    """
    Creates a new :ref:`powerbi_workspace` with the chosen name.

    Duplicate workspace names are not allowed, so make sure to enter a
    unique, unused name or select a fallback option (`When exists`) to
    determine what happens if a workspace with the same name already
    exists.

    :ref:`Propagates PowerBi input to output <powerbi_port>` with the
    following changes:

    - Active workspace is set/replaced to the newly created or existing
      workspace.
    - Active dataset is cleared.
    """

    name = 'Create Workspace'
    nodeid = 'com.sympathyfordata.azure.powerbi.workspaces.create'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_workspace.svg'

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port()])

    parameters = node.parameters()
    parameters.set_string(
        _name_parameter,
        value='',
        label='Workspace name',
        editor=node.editors.lineedit_editor(placeholder='Required'),
        description='Workspace name, required. It may not be empty')

    _exist_options = ['use', 'error']
    _use, _error = _exist_options

    parameters.set_string(
        'exists',
        value=_use,
        label='When exists',
        editor=node.editors.combo_editor(
            options={_use: 'Use existing', _error: 'Error'}),
        description='Action when the workspace name already exists')

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_powerbi = ctx.output[_powerbi_port_name]
        name = ctx.parameters[_name_parameter].value
        exists = ctx.parameters['exists'].value
        parameters = {'filter': f"name eq '{name}'"}

        if not name:
            raise SyConfigurationError('Workspace name may not be empty')

        workspaces = {'value': []}

        with in_powerbi.connected(self):
            try:
                in_powerbi.post_workspace(name).run()
                workspaces = in_powerbi.get_workspaces(**parameters).run()
            except request.ResponseError as r:
                if utils.get_response_code(r) == 'PowerBIEntityAlreadyExists':
                    if exists == self._use:
                        workspaces = in_powerbi.get_workspaces(
                            **parameters).run()
                    elif exists == self._error:
                        raise SyDataError(
                            f"Workspace {name} already exists") from r
                    else:
                        raise SyConfigurationError(
                            f"Unknown handling for existing workspace name: "
                            f"{exists}") from r
                else:
                    # Re-raise unexpected response errors instead of
                    # letting them surface as a workspace-count mismatch.
                    raise

        value = workspaces['value']
        if not len(value) == 1:
            raise SyDataError(f'Expected 1 workspace, got {len(value)}')

        propagate_powerbi(in_powerbi, [out_powerbi])
        out_powerbi.workspace = value[0]['id']
        out_powerbi.workspace_name = value[0]['name']


class DeleteWorkspacePowerBi(node.Node):
    """
    Delete the active :ref:`powerbi_workspace`.

    This can destroy a lot of work, be careful when using this node!
    To avoid accidents, dry run is the default action.

    Has an :ref:`optional PowerBi output <powerbi_port>` which is
    propagated with the following changes:

    - Active workspace is cleared.
    - Active dataset is cleared.
    """

    name = 'Delete active Workspace'
    nodeid = 'com.sympathyfordata.azure.powerbi.workspaces.delete'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_workspace_delete.svg'

    parameters = node.parameters()
    _delete_options = ['dry_run', 'delete']
    _dry, _delete = _delete_options

    parameters.set_string(
        'confirm',
        value=_dry,
        label='Action',
        editor=node.editors.combo_editor(
            options={
                _dry: 'Dry run',
                _delete: 'Delete workspace'}),
        description=(
            'Delete action, do a dry-run before switching to '
            'delete workspace.'))

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port(opt='out')])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_powerbi_group = ctx.output.group(_powerbi_port_name)
        confirm = ctx.parameters['confirm'].value
        workspace_id = in_powerbi.workspace

        if not workspace_id:
            raise SyDataError('Active Workspace must be set before delete')

        with in_powerbi.connected(self):
            if confirm == self._dry:
                workspaces = in_powerbi.get_workspaces().run()
                named_ids = utils.objects_to_named_ids(workspaces)
                workspace_name = named_ids.get(workspace_id)
                print('Delete workspace, dry run:')
                if workspace_name is None:
                    print('Selected workspace does not exist')
                else:
                    print(f'Would remove workspace: {workspace_name}')
            elif confirm == self._delete:
                delete_workspace(
                    in_powerbi, workspace_id,
                    code=Errors.print_, other=Errors.ignore)

        propagate_powerbi(in_powerbi, out_powerbi_group)
        if out_powerbi_group:
            out_powerbi_group[0].workspace = None
            out_powerbi_group[0].dataset = None


class GetDatasetsPowerBi(node.Node):
    """
    Get all available datasets from the active workspace of the PowerBi
    input and output them as a table containing two columns: `id` and
    `name`.

    Has an :ref:`optional PowerBi output <powerbi_port>`.
    """

    name = 'Get Datasets as Table'
    nodeid = 'com.sympathyfordata.azure.powerbi.datasets.table'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_table.svg'

    inputs = Ports([powerbi_port()])
    outputs = Ports([dataset_port(), powerbi_port(opt='in')])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_dataset = ctx.output[_dataset_port_name]

        with in_powerbi.connected(self):
            datasets = get_datasets(in_powerbi)

        set_datasets_table(out_dataset, datasets)
        out_dataset.set_name('Datasets')
        propagate_powerbi(in_powerbi, ctx.output.group(_powerbi_port_name))


class SetDatasetPowerBi(AsyncPowerBiNode):
    """
    Sets the active dataset in the PowerBi output for use in following
    nodes. The dataset is stored using its unique id, see
    :ref:`powerbi_dataset`.

    If the configured dataset is missing, its id is shown instead of
    its name. To edit the value manually instead of choosing from the
    existing alternatives, specify the id.

    :ref:`Propagates PowerBi input to output <powerbi_port>` with the
    following changes:

    - Active dataset is set/replaced.
    """

    name = 'Set active Dataset'
    nodeid = 'com.sympathyfordata.azure.powerbi.datasets.set'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_dataset_active.svg'

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port()])

    parameters = node.parameters()
    parameters.set_string(
        _dataset_parameter,
        value='',
        label='Dataset',
        editor=node.editors.combo_editor(
            options=[], edit=True, placeholder='Required',
            include_empty=True),
        description=(
            'Choose dataset in the workspace to activate, required.\n'
            'Shows dataset names, when available, but stores id.\n\n'
            'When datasets are missing: ensure that the previous node is '
            'executed or type an\nexisting id.'))

    def async_request_azure_parameters(self, ctx, powerbi):
        return True, {_dataset_parameter: powerbi.get_datasets()}

    def async_adjust_azure_parameters(self, ctx, responses):
        options = responses.get(_dataset_parameter) or {}
        self._update_options(ctx.parameters[_dataset_parameter], options)
        return bool(options), {_dataset_parameter}

    def save_parameters(self, ctx):
        self._save_current_option(ctx.parameters[_dataset_parameter])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_powerbi = ctx.output[_powerbi_port_name]
        dataset_id = ctx.parameters[_dataset_parameter].value
        dataset_name = None

        if not dataset_id:
            raise SyConfigurationError('Dataset must be set')

        with in_powerbi.connected(self):
            datasets = get_datasets(in_powerbi)
            dataset_name = get_dataset_name(datasets, dataset_id)
            if dataset_name is None:
                raise MissingDatasetError('Selected dataset does not exist')

        propagate_powerbi(in_powerbi, [out_powerbi])
        out_powerbi.dataset = dataset_id
        out_powerbi.dataset_name = dataset_name


class CreateDatasetPowerBi(node.Node):
    """
    Create new push :ref:`powerbi_dataset` with schema from input table.

    Newly created datasets will contain one table with schema (column
    names and types) determined by the input table.

    Duplicate dataset names are not allowed, so make sure to enter a
    unique, unused name or select a fallback option (`When exists`) to
    determine what happens if a dataset with the same name already
    exists.

    :ref:`Propagates PowerBi input to output <powerbi_port>` with the
    following changes:

    - Active dataset is set/replaced.
    """

    name = 'Create Dataset'
    nodeid = 'com.sympathyfordata.azure.powerbi.datasets.create'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_table.svg'

    inputs = Ports([powerbi_port(), table_port()])
    outputs = Ports([powerbi_port()])

    parameters = node.parameters()
    parameters.set_string(
        _dataset_parameter,
        value='',
        label='Dataset name',
        editor=node.editors.lineedit_editor(placeholder='Required'),
        description='Dataset name, it may not be empty')
    parameters.set_string(
        'table',
        value='',
        label='Table name',
        editor=node.editors.lineedit_editor(placeholder='Optional'),
        description=(
            'Table name, if not specified, dataset name will be used '
            'as table name'))

    # https://docs.microsoft.com/en-us/power-bi/developer/automation/
    # api-automatic-retention-policy-for-real-time-data
    parameters.set_string(
        'retention',
        value=powerbi.PowerBiArguments.default_retention_policy_none,
        label='Retention policy',
        editor=node.editors.combo_editor(
            options={
                powerbi.PowerBiArguments.default_retention_policy_basic_fifo:
                    'FIFO',
                powerbi.PowerBiArguments.default_retention_policy_none:
                    'None'}),
        description=(
            'Retention policy, FIFO: (first in first out) keep most recent '
            '200,000 rows. None: Keep all rows, max 5,000,000.'))

    _exist_options = ['create', 'error', 'use']
    _create, _error, _use = _exist_options

    parameters.set_string(
        'exists',
        value=_error,
        label='When exists',
        editor=node.editors.combo_editor(
            options={_create: 'Create new', _error: 'Error',
                     _use: 'Use existing'}),
        description='Action when the dataset name already exists')

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        in_table = ctx.input[_table_port_name]
        out_powerbi = ctx.output[_powerbi_port_name]
        dataset_name = ctx.parameters[_dataset_parameter].value
        table_name = ctx.parameters['table'].value
        exists = ctx.parameters['exists'].value
        retention = ctx.parameters['retention'].value

        if not dataset_name:
            raise SyConfigurationError('Dataset name must not be empty')
        if not table_name:
            table_name = dataset_name
        if exists not in self._exist_options:
            raise SyConfigurationError(
                f'Unknown option for existing dataset: {exists}')

        dataset_id = None

        with in_powerbi.connected(self):
            if exists in [self._error, self._use]:
                datasets = get_datasets(in_powerbi)
                data = utils.objects_to_named_ids(datasets)
                same_name = {
                    k: v for k, v in data.items() if v == dataset_name}
                n_same = len(same_name)
                if n_same > 1:
                    raise SyDataError(
                        f'Multiple datasets named {dataset_name} already '
                        f'exist')
                elif n_same == 1:
                    if exists == self._error:
                        raise SyDataError(
                            f'Dataset {dataset_name} already exists')
                    elif exists == self._use:
                        dataset_id = next(iter(same_name))

            if dataset_id is None:
                table_data = [table_info(table_name, in_table)]
                with workspace_request():
                    dataset = in_powerbi.post_dataset(
                        dataset_name, table_data,
                        default_retention_policy=retention).run()
                dataset_id = dataset['id']

        propagate_powerbi(in_powerbi, [out_powerbi])
        out_powerbi.dataset = dataset_id
        out_powerbi.dataset_name = dataset_name


class PushTableRowsPowerBi(AsyncPowerBiNode):
    """
    Push table rows to dataset table.

    Rows can be pushed to tables in a dataset if they match the table's
    schema (column names and types) and satisfy other requirements and
    limitations for pushing data:

    - The dataset must be a push dataset.
    - The amount of data and the number of requests are limited, for
      example, 1,000,000 rows added per hour per dataset and 4,000
      characters per value for string columns.
    - See `API limitations <https://docs.microsoft.com/en-us/power-bi/
      developer/automation/api-rest-api-limitations>`_ for more details.

    Push datasets can be created using
    :ref:`com.sympathyfordata.azure.powerbi.datasets.create`.

    Has an :ref:`optional PowerBi output <powerbi_port>`.
    """

    name = 'Push rows to Dataset Table'
    nodeid = 'com.sympathyfordata.azure.powerbi.datasets.rows.push'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_table.svg'

    inputs = Ports([powerbi_port(), table_port()])
    outputs = Ports([powerbi_port(opt='out')])

    parameters = node.parameters()
    parameters.set_string(
        _table_parameter,
        value='',
        label='Table',
        description=(
            'Choose output table name in the active Dataset, required.\n\n'
            'When tables are missing: ensure the previous node is '
            'executed\nand sets active Dataset or type an existing '
            'table name.'),
        editor=node.editors.combo_editor(
            options=[], edit=True, placeholder='Required',
            include_empty=True))
    parameters.set_integer(
        'rate',
        value=1000,
        label='Rate of requests per hour',
        editor=node.editors.bounded_lineedit_editor(20, 7200),
        description=(
            'Set the rate of requests per hour to comply with the limits:\n'
            '120 POST requests per minute (7200 per hour), used for dataset '
            'tables with less than 250,000 rows (always the case for FIFO).\n'
            '120 POST requests per hour, used for tables with 250,000 or '
            'more rows.'))

    def async_request_azure_parameters(self, ctx, powerbi):
        if powerbi.dataset:
            res = True, {
                _table_parameter: powerbi.get_tables(powerbi.dataset)}
        else:
            res = False, (
                'Active Dataset is not set: table is not updated')
        return res

    def async_adjust_azure_parameters(self, ctx, responses):
        options = {}
        tables = responses.get(_table_parameter)
        if tables is not None:
            options = utils.objects_to_names(tables)
        ctx.parameters[_table_parameter].adjust(options)
        return bool(options), {_table_parameter}

    def save_parameters(self, ctx):
        ctx.parameters[_table_parameter].adjust([])

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        in_table = ctx.input[_table_port_name]
        dataset_id = in_powerbi.dataset
        table_name = ctx.parameters[_table_parameter].value
        rate = ctx.parameters['rate'].value
        rate_s = 60 * 60 / rate

        if not dataset_id:
            raise SyDataError(
                'Active Dataset must be set before pushing rows')
        if not table_name:
            raise SyDataError(
                'Table name must be set before pushing rows')

        with in_powerbi.connected(self):
            max_rows = 10000
            t0 = time.time()
            posts = 0
            for i in range(0, in_table.number_of_rows(), max_rows):
                sub_table = in_table[i: i + max_rows]
                with dataset_request():
                    try:
                        in_powerbi.post_rows(
                            dataset_id, table_name,
                            table_rows(sub_table)).run()
                    except request.ResponseError as r:
                        if utils.get_response_code(r) == "ItemNotFound":
                            raise MissingDatasetError(
                                f"Active dataset does not exist or does "
                                f"not contain selected table: {table_name}"
                            ) from r
                        elif r.code == 500:
                            # Retry once.
                            time.sleep(rate_s)
                            posts += 1
                            in_powerbi.post_rows(
                                dataset_id, table_name,
                                table_rows(sub_table)).run()
                        else:
                            print(f'Failure after posting {i} rows, '
                                  f'{in_table.number_of_rows() - i} '
                                  f'rows were not posted.')
                            raise
                # Count the successful post so the pacing below takes
                # effect.
                posts += 1
                tn = posts * rate_s + t0
                wait = tn - time.time()
                if wait > 0:
                    time.sleep(wait)

        propagate_powerbi(in_powerbi, ctx.output.group(_powerbi_port_name))
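

# Pacing sketch for the push loop above: with rate=1000 requests per
# hour, rate_s = 3600 / 1000 = 3.6, so after the k-th post the loop
# sleeps until t0 + k * 3.6 seconds before issuing the next request;
# a retried chunk counts as an extra post.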


class DeleteDatasetPowerBi(node.Node):
    """
    Delete the active :ref:`powerbi_dataset`.

    This can destroy work, be careful when using this node!
    To avoid accidents, dry run is the default action.

    Has an :ref:`optional PowerBi output <powerbi_port>` which is
    propagated with the following changes:

    - Active dataset is cleared.
    """

    name = 'Delete active Dataset'
    nodeid = 'com.sympathyfordata.azure.powerbi.datasets.delete'
    tags = Tags(Tag.Azure.PowerBi)
    icon = 'node_powerbi_delete.svg'

    inputs = Ports([powerbi_port()])
    outputs = Ports([powerbi_port(opt='out')])

    parameters = node.parameters()
    _delete_options = ['dry_run', 'delete']
    _dry, _delete = _delete_options

    parameters.set_string(
        'confirm',
        value=_dry,
        label='Action',
        editor=node.editors.combo_editor(
            options={
                _dry: 'Dry run',
                _delete: 'Delete dataset'}),
        description=(
            'Delete action, do a dry-run before switching to '
            'delete dataset.'))

    def execute(self, ctx):
        in_powerbi = ctx.input[_powerbi_port_name]
        out_powerbi_group = ctx.output.group(_powerbi_port_name)
        confirm = ctx.parameters['confirm'].value
        dataset_id = in_powerbi.dataset

        if not dataset_id:
            raise SyDataError('Active Dataset must be set before delete')

        with in_powerbi.connected(self):
            if confirm == self._dry:
                print('Delete dataset, dry run:')
                datasets = get_datasets(in_powerbi, code=Errors.print_)
                dataset_name = get_dataset_name(datasets, dataset_id)
                if dataset_name is None:
                    print('Active dataset does not exist')
                else:
                    print(f'Would remove dataset: {dataset_name}')
            elif confirm == self._delete:
                try:
                    delete_dataset(in_powerbi, dataset_id)
                except request.ResponseError as r:
                    if utils.get_response_code(r) == 'ItemNotFound':
                        print('Active dataset does not exist')
                    else:
                        raise

        propagate_powerbi(in_powerbi, out_powerbi_group)
        if out_powerbi_group:
            out_powerbi_group[0].dataset = None