# Source code for node_table_value_search_replace

# This file is part of Sympathy for Data.
# Copyright (c) 2013, 2017 Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
from sympathy.api import node as synode
from sympathy.api import table
import re
import numpy as np
from sympathy.api import node_helper, dtypes
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyConfigurationError


# Documentation fragment appended to the other documentation constants
# below. It is a raw string so the reST markup needs no extra escaping.
COMMON_DOCS = r"""
Regex
=====
If *Text replace only (using regex)* is checked, the search expression may be
a regular expression (regex). For more information about how to write regex,
see :ref:`appendix_regex`. Here, it is possible to capture part of the match
using parentheses in the search expression, see
:ref:`regex_grouping_and_capturing`.

Unless configured to use regex replacement, search, replace and default values
will be read as a values of the same type as the column it is replacing in.
For details about how to enter values for different types, see
:ref:`appendix_typed_text`.

"""

# Documentation text for nodes whose search/replace values are configured
# through parameters; the shared regex section is appended.
GUI_DOCS = """
Replacements will be performed in all selected columns.
""" + COMMON_DOCS

# Documentation text describing the layout of an expressions table holding
# one search/replace pair per row; the shared regex section is appended.
TABLE_DOCS = """
The expressions table (upper port) should have the following structure:

+------------------------+------------------------------+
| Search column          | Replace column               |
+========================+==============================+
| Expression to search 1 | Replace for expression 1     |
+------------------------+------------------------------+
| Expression to search 2 | Replace for expression 2     |
+------------------------+------------------------------+
| ...                    | ...                          |
+------------------------+------------------------------+

The replacements will be performed one row at a time so it is possible for the
second search expression to find a match within the first replacement.

""" + COMMON_DOCS


class ConvertError(Exception):
    """Signal that a configured text value could not be converted.

    The single argument is expected to be a dict with the keys ``name``,
    ``value`` and ``type``, suitable for formatting an error message.
    """


_config_error_msg = (
    'Failure in column {column}: could not convert {name}: '
    '"{value}" to {type}.')


def _convert(value, dtype, name):
    """Convert the text ``value`` to a numpy value of type ``dtype``.

    Parameters
    ----------
    value
        The configured value to convert.
    dtype
        Target dtype; only its ``kind`` is inspected here, the actual
        conversion is delegated to ``dtypes.numpy_value_from_dtype_str``.
    name : str
        Human readable name of the value (e.g. ``'Search'``), only used
        in the error details.

    Raises
    ------
    ConvertError
        When the conversion raises ``ValueError``. The exception carries a
        dict with the keys ``name``, ``value`` and ``type``.
    """
    try:
        converted = dtypes.numpy_value_from_dtype_str(dtype, value)
    except ValueError:
        type_name = dtypes.typename_from_kind(dtype.kind)
        details = dict(name=name, value=value, type=type_name)
        raise ConvertError(details)
    return converted


def _item_size(dtype):
    if dtype.kind == 'U':
        return dtype.itemsize / 4
    elif dtype.kind == 'S':
        return dtype.itemsize


def _encode_ascii(text):
    """Return ``text`` ASCII encoded if it is a ``str``, else unchanged."""
    if isinstance(text, str):
        return text.encode('ascii')
    return text


def replace_array_values_regex(arr, search, replace, use_default, default,
                               re_flags):
    """Perform a regex search and replace on every value of a string array.

    Arrays that are neither unicode (``'U'``) nor bytes (``'S'``) are
    returned untouched.

    Parameters
    ----------
    arr : numpy.ndarray
        Column data to process.
    search
        Regular expression pattern to search for.
    replace
        Replacement, may refer to groups captured by ``search``.
    use_default : bool
        If true, values without any match are set to ``default``.
    default
        Value used for non-matching values; only read when ``use_default``
        is true.
    re_flags : int
        Flags passed on to ``re.compile``.

    Returns
    -------
    numpy.ndarray
        A new array when ``use_default`` is true or when at least one
        substitution was made; otherwise the very same ``arr`` object, so
        callers can detect "nothing changed" with an identity check.
    """
    arr_type = dtypes.numpy_dtype_factory_for_dtype(
        arr.dtype)

    if arr_type.kind not in ['U', 'S']:
        return arr

    if arr_type.kind == 'S':
        # Values of bytes columns are bytes objects, and ``re`` refuses to
        # mix str patterns with bytes input. The encoded values must
        # therefore be kept, not just computed and thrown away.
        search = _encode_ascii(search)
        replace = _encode_ascii(replace)
        if use_default:
            default = _encode_ascii(default)

    pattern = re.compile(search, flags=re_flags)
    values = []

    if use_default:
        for value in arr.tolist():
            value, count = pattern.subn(replace, value)
            values.append(value if count else default)
        return np.array(values, dtype=arr_type)

    replaced = 0
    for value in arr.tolist():
        value, count = pattern.subn(replace, value)
        replaced += count
        values.append(value)

    if replaced:
        # Only build a new array when something actually changed.
        arr = np.array(values, dtype=arr_type)
    return arr


def replace_array_values_literal(arr, search, replace, use_default, default):
    """Replace whole values equal to ``search`` with ``replace``.

    ``search``, ``replace`` and (when used) ``default`` are first converted
    to the type of the column. NaN and NaT search values match the NaN/NaT
    elements of float and datetime columns respectively.

    Parameters
    ----------
    arr : numpy.ndarray
        Column data to process.
    search, replace
        Configured values; converted with ``_convert``.
    use_default : bool
        If true, every value that does not match is set to ``default``.
    default
        Configured default value; only converted when ``use_default`` is
        true.

    Returns
    -------
    numpy.ndarray
        A modified copy, or the very same ``arr`` object when
        ``use_default`` is false and nothing matched.

    Raises
    ------
    ConvertError
        If any of the configured values cannot be converted.
    """
    arr_type = dtypes.numpy_dtype_factory_for_dtype(
        arr.dtype)
    search = _convert(search, arr_type, 'Search')
    replace = _convert(replace, arr_type, 'Replace')
    kind = arr_type.kind

    # NaN and NaT never compare equal to themselves, so they need
    # dedicated predicates instead of ``==``.
    if kind == 'f' and np.isnan(search):
        bindex = np.isnan(arr)
    elif kind == 'M' and np.isnat(search):
        bindex = np.isnat(arr)
    else:
        bindex = arr == search

    is_text = kind in ['U', 'S']
    # Fixed size string columns are edited as object arrays so that the
    # replacements are guaranteed to fit; other columns are copied as is.
    work_dtype = 'O' if is_text else arr_type

    if use_default:
        default = _convert(default, arr_type, 'Default')
        result = np.array(arr, dtype=work_dtype)
        result[bindex] = replace
        result[~bindex] = default
        if is_text:
            result = np.array(result, dtype=kind)
        return result

    if not np.any(bindex):
        # Nothing to replace: hand back the original object untouched.
        return arr

    result = np.array(arr, dtype=work_dtype)
    result[bindex] = replace
    if is_text:
        result = np.array(result, dtype=arr_type)
    return result


def replace_table_values(out_table, columns, search, replace, use_default,
                         default, regex, ignore_case):
    """Search and replace values in the given columns of ``out_table``.

    Changed columns are collected in a temporary table which is then merged
    into ``out_table`` with ``update``. Columns whose replacement function
    returns the original array object are left alone.

    Parameters
    ----------
    out_table
        Table that is read from and updated in place.
    columns
        Names of the columns to process.
    search, replace, use_default, default
        Passed on to the array level replacement functions.
    regex : bool
        Use regex replacement if true, literal replacement otherwise.
    ignore_case : bool
        Adds ``re.IGNORECASE`` to the regex flags; only used in regex mode.

    Returns
    -------
    The ``out_table`` that was passed in.

    Raises
    ------
    SyConfigurationError
        If a configured value cannot be converted to the type of a column
        during literal replacement.
    """
    re_flags = re.IGNORECASE if ignore_case else 0
    changed = table.File()

    for col_name in columns:
        col = out_table[col_name]

        # Work on the raw data of masked columns and re-apply the mask
        # afterwards.
        if isinstance(col, np.ma.MaskedArray):
            mask, col = col.mask, col.data
        else:
            mask = None

        if regex:
            new_col = replace_array_values_regex(
                col, search, replace, use_default, default, re_flags)
        else:
            try:
                new_col = replace_array_values_literal(
                    col, search, replace, use_default, default)
            except ConvertError as ce:
                raise SyConfigurationError(_config_error_msg.format(
                    column=col_name, **ce.args[0]))

        if new_col is col:
            # Same object back means that nothing was replaced.
            continue

        if mask is not None:
            new_col = np.ma.MaskedArray(new_col, mask)

        changed.set_column_from_array(col_name, new_col)

    changed.set_attributes(out_table.get_attributes())
    changed.set_name(out_table.get_name())
    out_table.update(changed)
    return out_table


def _set_literal(params, value=True):
    """Add the boolean parameter that selects regex replacement.

    NOTE: despite its key, ``'literal'``, a checked box means *regex*
    replacement (see the label text).

    Parameters
    ----------
    params
        Parameter container offering ``set_boolean``.
    value : bool
        Initial value of the parameter.
    """
    description = ''.join([
        'Perform regex replacements in string columns, i.e., columns with ',
        'types text and bytes, other columns are ignored. ',
        'Disable this option to replace full values, without using ',
        'regex across all types of columns. Learn more about Regular ',
        'expression syntax in the documentation appendix.',
    ])
    params.set_boolean(
        'literal',
        label='Text replace only (using regex)',
        description=description,
        value=value)


def common_params(parameters):
    """Add the parameters shared by all nodes in this file.

    Adds the ``'literal'`` (regex on/off) and ``'ignore_case'`` booleans to
    ``parameters`` and returns the same container.
    """
    _set_literal(parameters)

    ignore_case_options = dict(
        label='Ignore case',
        description='Ignore case when searching',
        value=False)
    parameters.set_boolean('ignore_case', **ignore_case_options)
    return parameters


class TableSearchBase(synode.Node):
    # Common base for nodes that search and replace values using a search
    # value, a replace value and an optional default value taken from the
    # node parameters.
    author = 'Greger Cronquist'
    icon = 'search_replace.svg'
    tags = Tags(Tag.DataProcessing.TransformData)

    parameters = synode.parameters()
    editor = synode.editors.multilist_editor(edit=True)

    # Columns to operate on.
    parameters.set_list(
        'columns',
        label='Select columns',
        description='Select the columns to use perform replace on',
        value=[],
        editor=editor)
    # Search and replace values.
    parameters.set_string(
        'find',
        label='Search',
        value='',
        description='Specify the search pattern that will be replaced.')
    parameters.set_string(
        'replace',
        label='Replace',
        value='',
        description='Specify the replacement string.')
    # Regex on/off and ignore case.
    parameters = common_params(parameters)
    # Optional default for values that do not match.
    parameters.set_boolean(
        'use_default',
        label='Use default',
        value=False,
        description='Use default value when not found.')
    parameters.set_string(
        'default',
        label='Default value',
        value='',
        description='Specify default value.')

    # The default value field is enabled only while 'use_default' is
    # checked, and 'ignore_case' only while 'literal' is checked.
    controllers = (
        synode.controller(
            when=synode.field('use_default', state='checked'),
            action=synode.field('default', state='enabled'),
        ),
        synode.controller(
            when=synode.field('literal', state='checked'),
            action=synode.field('ignore_case', state='enabled'),
        ),
    )

    def update_parameters(self, old_params):
        """Set a multiselect mode on 'columns' when its editor has none."""
        columns_param = old_params['columns']
        if columns_param.editor.get('mode', False):
            return
        columns_param.multiselect_mode = 'selected_exists'

    def adjust_parameters(self, node_context):
        """Adjust the 'columns' parameter to the first input."""
        adjust(node_context.parameters['columns'], node_context.input[0])

    def _get_params(self, node_context):
        """Return the replacement settings as a tuple.

        The order is ``(search, replace, use_default, default, regex,
        ignore_case)``, matching the trailing arguments of
        ``replace_table_values``. The regex flag is stored under the
        parameter key ``'literal'``.
        """
        params = node_context.parameters
        keys = (
            'find', 'replace', 'use_default', 'default', 'literal',
            'ignore_case')
        return tuple(params[key].value for key in keys)

    def _replace_in_table(self, in_table, columns, params):
        """Return a table where values in ``columns`` have been replaced.

        ``params`` is the tuple produced by ``_get_params``.
        """
        if in_table.number_of_rows() != 0:
            out_table = table.File()
            out_table.source(in_table)
            replace_table_values(out_table, columns, *params)
            return out_table

        # Table contained no values (but perhaps empty columns).
        # Return original table to not mess up column types.
        return in_table


class TableValueSearchReplace(TableSearchBase):
    __doc__ = GUI_DOCS

    name = 'Replace values in Table'
    description = 'Search and replace values in Table.'
    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplace'

    inputs = Ports([Port.Table('Input Table', name='table')])
    outputs = Ports([Port.Table('Table with replaced values', name='table')])

    def adjust_parameters(self, node_context):
        """Adjust the 'columns' parameter to the input table."""
        adjust(node_context.parameters['columns'], node_context.input[0])

    def execute(self, node_context):
        """Replace values in the selected columns and write the result."""
        in_table = node_context.input[0]
        columns = node_context.parameters['columns'].selected_names(
            in_table.names())
        out_table = self._replace_in_table(
            in_table, columns, self._get_params(node_context))
        node_context.output[0].update(out_table)


# List version of the single table node: the 'table' ports are turned into
# list ports named 'tables' by the decorator.
@node_helper.list_node_decorator(
    {'table': {'name': 'tables'}}, {'table': {'name': 'tables'}})
class TableValueSearchReplaceMultiple(TableValueSearchReplace):
    __doc__ = GUI_DOCS

    name = 'Replace values in Tables'
    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplacemultiple'


def _get_single_col_editor():
    """Return an editable, filterable combo editor for picking one column."""
    return synode.editors.combo_editor('', filter=True, edit=True)


def _get_multi_col_editor():
    """Return an editable multilist editor for picking several columns."""
    return synode.editors.multilist_editor(edit=True)


class TableValueSearchReplaceWithTableSuper(synode.Node):
    # Common base for nodes that take their search and replace expressions
    # from the columns of an expressions table.
    description = (
        'Search and replace values in specified columns '
        'table with search expressions from a table')
    author = (
        'Greger Cronquist <greger.cronquist@combine.se>, '
        'Andreas Tågerud <andreas.tagerud@combine.se>')
    version = '1.0'
    icon = 'search_replace.svg'
    tags = Tags(Tag.DataProcessing.TransformData)

    parameters = synode.parameters()
    editor_cols = _get_multi_col_editor()
    editor_col = _get_single_col_editor()
    parameters.set_list(
        'column', label='Columns to replace values in',
        description='Select in which to perform replace',
        value=[], editor=editor_cols)
    parameters.set_list(
        'find', label='Column with search expressions',
        description='Select which column contains search expressions',
        value=[], editor=editor_col)
    parameters.set_list(
        'replace', label='Column with replace expressions',
        description='Select which column contains replacements',
        value=[], editor=editor_col)
    parameters = common_params(parameters)

    def update_parameters(self, old_params):
        """Install the current editors on previously stored parameters."""
        for param in ['find', 'replace']:
            if param in old_params:
                old_params[param].editor = _get_single_col_editor()
        param = 'column'
        if param in old_params:
            old_params[param].editor = _get_multi_col_editor()

    def adjust_parameters(self, node_context):
        """Adjust the column selections to the connected inputs."""
        adjust(node_context.parameters['find'],
               node_context.input['expressions'])
        adjust(node_context.parameters['replace'],
               node_context.input['expressions'])
        adjust(node_context.parameters['column'],
               node_context.input['data'])

    def execute_once(self, node_context, in_table):
        """Apply every search/replace pair of the expressions table.

        The pairs are applied one row at a time to the same output table,
        so a later search expression can match the result of an earlier
        replacement. No default value is used.

        Raises
        ------
        SyConfigurationError
            If reading a selected expression column raises ``KeyError`` or
            ``ValueError``.
        """
        parameters = node_context.parameters
        exp = node_context.input['expressions']
        regex = parameters['literal'].value
        ignore_case = parameters['ignore_case'].value

        try:
            search = exp.get_column_to_array(parameters['find'].selected)
            replace = exp.get_column_to_array(
                parameters['replace'].selected)
        except (KeyError, ValueError):
            raise SyConfigurationError(
                'One or more of the selected columns do not seem to exist')

        out_table = table.File()
        selected_names = parameters['column'].selected_names(
            in_table.column_names())
        out_table.source(in_table)

        # Distinct loop names keep the expression arrays from being
        # shadowed by their own elements.
        for search_value, replace_value in zip(search, replace):
            replace_table_values(
                out_table, selected_names, search_value, replace_value,
                False, None, regex, ignore_case)
        return out_table


class TableValueSearchReplaceWithTable(TableValueSearchReplaceWithTableSuper):
    # TABLE_DOCS documents the structure of the expressions table that
    # this node reads its search/replace pairs from.
    __doc__ = TABLE_DOCS

    name = 'Replace values in Table with Table'
    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplacewithtable'

    inputs = Ports([
        Port.Table('Expressions', name='expressions'),
        Port.Table('Table Data', name='data')])
    outputs = Ports([Port.Table('Table with replaced values', name='data')])

    def execute(self, node_context):
        """Replace values in the data table unless it is empty."""
        in_table = node_context.input['data']
        if not in_table.is_empty():
            node_context.output['data'].source(
                self.execute_once(node_context, in_table))


# List version of the with-table node: the 'data' ports are turned into
# list ports by the decorator.
@node_helper.list_node_decorator(['data'], ['data'])
class TableValueSearchReplaceWithTableMultiple(
        TableValueSearchReplaceWithTable):
    # TABLE_DOCS documents the structure of the expressions table that
    # this node reads its search/replace pairs from.
    __doc__ = TABLE_DOCS

    name = 'Replace values in Tables with Table'
    nodeid = 'org.sysess.sympathy.data.table.tablesvaluesearchreplacewithtable'