# This file is part of Sympathy for Data.
# Copyright (c) 2013, 2017 Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
from sympathy.api import node as synode
from sympathy.api import table
import re
import numpy as np
from sympathy.api import node_helper, dtypes
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyConfigurationError
# reStructuredText fragments that are concatenated into the ``__doc__``
# attributes of the node classes in this module.

# Shared section: regex matching versus typed (literal) matching.
COMMON_DOCS = r"""
Regex
=====
If *Text replace only (using regex)* is checked, the search expression may be
a regular expression (regex). For more information about how to write regex,
see :ref:`appendix_regex`. Here, it is possible to capture part of the match
using parentheses in the search expression, see
:ref:`regex_grouping_and_capturing`.
Unless configured to use regex replacement, search, replace and default values
will be read as values of the same type as the column it is replacing in.
For details about how to enter values for different types, see
:ref:`appendix_typed_text`.
"""

# Documentation for nodes that take search/replace values from the GUI.
GUI_DOCS = """
Replacements will be performed in all selected columns.
""" + COMMON_DOCS

# Documentation for nodes that read search/replace pairs from a table port.
# A grid table needs a blank line before and after it, and every row must be
# exactly as wide as the border lines, otherwise docutils rejects the table.
TABLE_DOCS = """
The expressions table (upper port) should have the following structure:

+------------------------+------------------------------+
| Search column          | Replace column               |
+========================+==============================+
| Expression to search 1 | Replace for expression 1     |
+------------------------+------------------------------+
| Expression to search 2 | Replace for expression 2     |
+------------------------+------------------------------+
| ...                    | ...                          |
+------------------------+------------------------------+

The replacements will be performed one row at a time so it is possible for the
second search expression to find a match within the first replacement.
""" + COMMON_DOCS
class ConvertError(Exception):
    """Signal that a configured text value could not be converted.

    ``_convert`` raises it with a single dict argument holding the keys
    ``name``, ``value`` and ``type``; ``replace_table_values`` formats that
    dict into a configuration error message.
    """
# Message template for a failed conversion; filled in with the column name
# plus the ``name``/``value``/``type`` entries carried by ConvertError.
_config_error_msg = (
    'Failure in column {column}: '
    'could not convert {name}: "{value}" to {type}.')
def _convert(value, dtype, name):
    """Convert the text ``value`` into a value for ``dtype``.

    The conversion itself is delegated to
    ``dtypes.numpy_value_from_dtype_str``.

    :param value: text entered by the user.
    :param dtype: numpy dtype of the column being processed.
    :param name: label of the setting ('Search', 'Replace', 'Default'),
        only used for error reporting.
    :raises ConvertError: if the conversion raises ``ValueError``; the
        single argument is a dict with the keys ``name``, ``value`` and
        ``type``.
    """
    try:
        converted = dtypes.numpy_value_from_dtype_str(dtype, value)
    except ValueError as exc:
        details = {
            'name': name,
            'value': value,
            'type': dtypes.typename_from_kind(dtype.kind),
        }
        raise ConvertError(details) from exc
    return converted
def _item_size(dtype):
    """Return the per-item length of a fixed-width string dtype.

    :param dtype: numpy dtype.
    :returns: number of characters for unicode ('U') dtypes, number of
        bytes for bytes ('S') dtypes, and ``None`` for any other kind.
    """
    if dtype.kind == 'U':
        # NumPy stores unicode as 4 bytes per character; integer division
        # keeps the result an int instead of a float.
        return dtype.itemsize // 4
    if dtype.kind == 'S':
        return dtype.itemsize
    return None
def replace_array_values_regex(arr, search, replace, use_default, default,
                               re_flags):
    """Apply a regex substitution to every value of a string column.

    Columns whose dtype kind is neither 'U' (text) nor 'S' (bytes) are
    returned untouched.

    :param arr: numpy array holding the column data.
    :param search: regular expression to search for.
    :param replace: replacement, may reference captured groups.
    :param use_default: if true, values without any match are replaced by
        ``default``.
    :param default: value used for non-matching items when ``use_default``
        is set.
    :param re_flags: flags forwarded to ``re.compile``.
    :returns: a new array when anything was (or may have been) replaced,
        otherwise the very same ``arr`` object. Callers rely on the object
        identity to detect unchanged columns.
    :raises UnicodeEncodeError: for bytes columns when a ``str`` argument
        contains non-ASCII characters.
    """
    arr_type = dtypes.numpy_dtype_factory_for_dtype(arr.dtype)
    if arr_type.kind not in ('U', 'S'):
        return arr

    if arr_type.kind == 'S':
        # ``re`` refuses to mix str patterns with bytes subjects, so the
        # encoded values must actually be kept (str.encode is not in-place).
        if isinstance(search, str):
            search = search.encode('ascii')
        if isinstance(replace, str):
            replace = replace.encode('ascii')
        if use_default and isinstance(default, str):
            default = default.encode('ascii')

    pattern = re.compile(search, flags=re_flags)
    # One (new_value, number_of_substitutions) pair per item.
    results = [pattern.subn(replace, value) for value in arr.tolist()]

    if use_default:
        values = [value if count else default for value, count in results]
        return np.array(values, dtype=arr_type)

    if any(count for _, count in results):
        return np.array([value for value, _ in results], dtype=arr_type)
    # Nothing matched: hand back the original object.
    return arr
def replace_array_values_literal(arr, search, replace, use_default, default):
    """Replace whole values equal to ``search`` with ``replace``.

    ``search``, ``replace`` and (when used) ``default`` are given as text
    and converted to the column type with ``_convert``.

    :param arr: numpy array holding the column data.
    :param search: text form of the value to look for. NaN and NaT are
        matched with ``np.isnan`` / ``np.isnat`` since they never compare
        equal.
    :param replace: text form of the replacement value.
    :param use_default: if true, every non-matching item is set to
        ``default``.
    :param default: text form of the default value.
    :returns: a new array if something was replaced (or ``use_default`` is
        set), otherwise the very same ``arr`` object.
    :raises ConvertError: if any of the text values cannot be converted.
    """
    arr_type = dtypes.numpy_dtype_factory_for_dtype(arr.dtype)
    search = _convert(search, arr_type, 'Search')
    replace = _convert(replace, arr_type, 'Replace')

    if arr_type.kind == 'f' and np.isnan(search):
        bindex = np.isnan(arr)
    elif arr_type.kind == 'M' and np.isnat(search):
        bindex = np.isnat(arr)
    else:
        bindex = arr == search

    if use_default:
        default = _convert(default, arr_type, 'Default')
    elif not np.any(bindex):
        # Nothing to do: return the original object so that callers can
        # detect the unchanged column by identity.
        return arr

    is_string = arr_type.kind in ['U', 'S']
    if is_string:
        # For fixed size string columns we convert to object
        # to ensure that the replacements fit.
        result = np.array(arr, dtype='O')
    else:
        result = np.array(arr, dtype=arr_type)

    result[bindex] = replace
    if use_default:
        result[~bindex] = default

    if is_string:
        # Convert back using only the kind so that numpy picks a width
        # that fits the longest value, in both the default and the
        # no-default case.
        result = np.array(result, dtype=arr_type.kind)
    return result
def replace_table_values(out_table, columns, search, replace, use_default,
                         default, regex, ignore_case):
    """Search and replace in the given columns of ``out_table``, in place.

    Changed columns are collected in a temporary table which also receives
    the attributes and name of ``out_table``, and is finally merged back
    with ``out_table.update``.

    :param out_table: table to modify.
    :param columns: names of the columns to process.
    :param search: search expression or value.
    :param replace: replacement expression or value.
    :param use_default: replace non-matching values with ``default``.
    :param default: default value.
    :param regex: if true use regex replacement, otherwise typed literal
        replacement.
    :param ignore_case: adds ``re.IGNORECASE`` to the regex flags.
    :returns: ``out_table``.
    :raises SyConfigurationError: if a literal value cannot be converted
        to the type of a column.
    """
    flags = re.IGNORECASE if ignore_case else 0
    changed = table.File()

    for name in columns:
        original = out_table[name]
        # Work on the raw data of masked columns and re-apply the mask
        # afterwards.
        if isinstance(original, np.ma.MaskedArray):
            mask, values = original.mask, original.data
        else:
            mask, values = None, original

        if regex:
            result = replace_array_values_regex(
                values, search, replace, use_default, default, flags)
        else:
            try:
                result = replace_array_values_literal(
                    values, search, replace, use_default, default)
            except ConvertError as ce:
                message = _config_error_msg.format(
                    column=name, **ce.args[0])
                raise SyConfigurationError(message) from ce

        # The helpers return the same object when nothing was replaced.
        if result is values:
            continue
        if mask is not None:
            result = np.ma.MaskedArray(result, mask)
        changed.set_column_from_array(name, result)

    changed.set_attributes(out_table.get_attributes())
    changed.set_name(out_table.get_name())
    out_table.update(changed)
    return out_table
def _set_literal(params, value=True):
    """Register the boolean parameter ``'literal'`` on ``params``.

    Despite its key, the parameter is labelled as the regex switch and is
    read as the ``regex`` flag by the nodes in this module.

    :param params: parameter container providing ``set_boolean``.
    :param value: initial value of the parameter.
    """
    description = (
        'Perform regex replacements in string columns, i.e., '
        'columns with types text and bytes, other columns are ignored. '
        'Disable this option to replace full values, '
        'without using regex across all types of columns. '
        'Learn more about Regular expression syntax in the documentation '
        'appendix.')
    params.set_boolean(
        'literal',
        label='Text replace only (using regex)',
        description=description,
        value=value)
def common_params(parameters):
    """Add the parameters shared by all nodes in this module.

    Registers the ``'literal'`` switch (through ``_set_literal``) and the
    ``'ignore_case'`` boolean.

    :param parameters: parameter container to extend.
    :returns: the same ``parameters`` object.
    """
    _set_literal(parameters)
    ignore_case_options = dict(
        label='Ignore case',
        description='Ignore case when searching',
        value=False)
    parameters.set_boolean('ignore_case', **ignore_case_options)
    return parameters
class TableSearchBase(synode.Node):
    # Shared base for the nodes that take search/replace values from their
    # own configuration. No class docstring on purpose: the concrete nodes
    # assign ``__doc__`` explicitly.
    author = 'Greger Cronquist'
    icon = 'search_replace.svg'
    tags = Tags(Tag.DataProcessing.TransformData)

    # Parameters are registered in the order below.
    parameters = synode.parameters()
    editor = synode.editors.multilist_editor(edit=True)
    parameters.set_list(
        'columns',
        label='Select columns',
        description='Select the columns to use perform replace on',
        value=[],
        editor=editor)
    parameters.set_string(
        'find',
        label='Search',
        value='',
        description='Specify the search pattern that will be replaced.')
    parameters.set_string(
        'replace',
        label='Replace',
        value='',
        description='Specify the replacement string.')
    parameters = common_params(parameters)
    parameters.set_boolean(
        'use_default',
        label='Use default',
        value=False,
        description='Use default value when not found.')
    parameters.set_string(
        'default',
        label='Default value',
        value='',
        description='Specify default value.')

    # GUI wiring: 'default' is enabled while 'use_default' is checked and
    # 'ignore_case' is enabled while 'literal' is checked.
    controllers = (
        synode.controller(
            when=synode.field('use_default', state='checked'),
            action=synode.field('default', state='enabled'),
        ),
        synode.controller(
            when=synode.field('literal', state='checked'),
            action=synode.field('ignore_case', state='enabled'),
        ),
    )

    def update_parameters(self, old_params):
        """Set a multiselect mode on 'columns' if its editor has none."""
        cols = old_params['columns']
        if cols.editor.get('mode', False):
            return
        cols.multiselect_mode = 'selected_exists'

    def adjust_parameters(self, node_context):
        """Adjust the 'columns' parameter to the first input."""
        adjust(node_context.parameters['columns'], node_context.input[0])

    def _get_params(self, node_context):
        """Return the configured values as a tuple.

        The order is (search, replace, use_default, default, regex,
        ignore_case), matching the trailing arguments of
        ``replace_table_values``. The regex flag is stored under the
        parameter key 'literal'.
        """
        params = node_context.parameters
        keys = (
            'find', 'replace', 'use_default', 'default', 'literal',
            'ignore_case')
        return tuple(params[key].value for key in keys)

    def _replace_in_table(self, in_table, columns, params):
        """Return a table with replacements performed in ``columns``.

        ``params`` is the tuple produced by ``_get_params``.
        """
        if in_table.number_of_rows() == 0:
            # Table contained no values (but perhaps empty columns).
            # Return original table to not mess up column types.
            return in_table

        result = table.File()
        result.source(in_table)
        replace_table_values(result, columns, *params)
        return result
[docs]
class TableValueSearchReplace(TableSearchBase):
    # Node operating on a single table; the documentation is assembled
    # from the module level text fragments.
    __doc__ = GUI_DOCS

    name = 'Replace values in Table'
    description = 'Search and replace values in Table.'
    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplace'

    inputs = Ports([Port.Table('Input Table', name='table')])
    outputs = Ports([Port.Table('Table with replaced values', name='table')])

    def adjust_parameters(self, node_context):
        """Adjust the 'columns' parameter to the first input."""
        columns_param = node_context.parameters['columns']
        adjust(columns_param, node_context.input[0])

    def execute(self, node_context):
        """Replace values in the selected columns and update the output."""
        in_table = node_context.input[0]
        columns_param = node_context.parameters['columns']
        columns = columns_param.selected_names(in_table.names())
        settings = self._get_params(node_context)
        result = self._replace_in_table(in_table, columns, settings)
        node_context.output[0].update(result)
[docs]
# List variant of TableValueSearchReplace: the decorator renames the
# 'table' input and output ports to 'tables'.
@node_helper.list_node_decorator(
    {'table': {'name': 'tables'}},
    {'table': {'name': 'tables'}},
)
class TableValueSearchReplaceMultiple(TableValueSearchReplace):
    __doc__ = GUI_DOCS

    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplacemultiple'
    name = 'Replace values in Tables'
def _get_single_col_editor():
    """Return a new editable, filterable combo editor."""
    combo = synode.editors.combo_editor('', filter=True, edit=True)
    return combo
def _get_multi_col_editor():
    """Return a new editable multilist editor."""
    multilist = synode.editors.multilist_editor(edit=True)
    return multilist
class TableValueSearchReplaceWithTableSuper(synode.Node):
    # Shared base for the nodes that read their search/replace pairs from
    # an 'expressions' table. No class docstring on purpose: the concrete
    # nodes assign ``__doc__`` explicitly.
    description = (
        'Search and replace values in specified columns '
        'table with search expressions from a table')
    author = (
        'Greger Cronquist <greger.cronquist@combine.se>, '
        'Andreas Tågerud <andreas.tagerud@combine.se>')
    icon = 'search_replace.svg'
    tags = Tags(Tag.DataProcessing.TransformData)

    # Parameters are registered in the order below.
    parameters = synode.parameters()
    editor_cols = _get_multi_col_editor()
    editor_col = _get_single_col_editor()
    parameters.set_list(
        'column',
        label='Columns to replace values in',
        description='Select in which to perform replace',
        value=[],
        editor=editor_cols)
    parameters.set_list(
        'find',
        label='Column with search expressions',
        description='Select which column contains search expressions',
        value=[],
        editor=editor_col)
    parameters.set_list(
        'replace',
        label='Column with replace expressions',
        description='Select which column contains replacements',
        value=[],
        editor=editor_col)
    parameters = common_params(parameters)

    def update_parameters(self, old_params):
        """Give the column parameters present in ``old_params`` new editors."""
        editor_factories = (
            ('find', _get_single_col_editor),
            ('replace', _get_single_col_editor),
            ('column', _get_multi_col_editor),
        )
        for key, make_editor in editor_factories:
            if key in old_params:
                old_params[key].editor = make_editor()

    def adjust_parameters(self, node_context):
        """Adjust 'find'/'replace' to the expressions port and 'column' to
        the data port.
        """
        params = node_context.parameters
        expressions = node_context.input['expressions']
        adjust(params['find'], expressions)
        adjust(params['replace'], expressions)
        adjust(params['column'], node_context.input['data'])

    def execute_once(self, node_context, in_table):
        """Apply every search/replace pair of the expressions table.

        The pairs are applied one after another to the selected columns
        of a table sourced from ``in_table``, without default value.

        :returns: the new table.
        :raises SyConfigurationError: if reading the selected expression
            columns raises ``KeyError`` or ``ValueError``.
        """
        parameters = node_context.parameters
        expressions = node_context.input['expressions']
        regex = parameters['literal'].value
        ignore_case = parameters['ignore_case'].value

        try:
            searches = expressions.get_column_to_array(
                parameters['find'].selected)
            replacements = expressions.get_column_to_array(
                parameters['replace'].selected)
        except (KeyError, ValueError) as exc:
            raise SyConfigurationError(
                "One or more of the selected columns do not seem to exist"
            ) from exc

        out_table = table.File()
        selected_names = parameters['column'].selected_names(
            in_table.column_names())
        out_table.source(in_table)

        # Each pair works on the result of the previous one.
        for search_value, replace_value in zip(
                list(searches), list(replacements)):
            replace_table_values(
                out_table, selected_names, search_value, replace_value,
                False, None, regex, ignore_case)
        return out_table
[docs]
class TableValueSearchReplaceWithTable(TableValueSearchReplaceWithTableSuper):
    # This node reads its search/replace pairs from the 'expressions'
    # port, so it uses the documentation that describes the layout of
    # that table (TABLE_DOCS) instead of the GUI-oriented text.
    __doc__ = TABLE_DOCS

    name = 'Replace values in Table with Table'
    nodeid = 'org.sysess.sympathy.data.table.tablevaluesearchreplacewithtable'

    inputs = Ports([
        Port.Table('Expressions', name='expressions'),
        Port.Table('Table Data', name='data')])
    outputs = Ports([Port.Table('Table with replaced values', name='data')])

    def execute(self, node_context):
        """Run the replacements unless the data table is empty.

        For an empty input table nothing is written to the output.
        """
        in_table = node_context.input['data']
        if not in_table.is_empty():
            node_context.output['data'].source(
                self.execute_once(node_context, in_table))
[docs]
# List variant of TableValueSearchReplaceWithTable, created by applying the
# list node decorator to the 'data' input and output ports.
@node_helper.list_node_decorator(['data'], ['data'])
class TableValueSearchReplaceWithTableMultiple(
        TableValueSearchReplaceWithTable):
    # Search/replace pairs come from the 'expressions' port, so the text
    # describing that table (TABLE_DOCS) is the matching documentation.
    __doc__ = TABLE_DOCS

    name = 'Replace values in Tables with Table'
    nodeid = 'org.sysess.sympathy.data.table.tablesvaluesearchreplacewithtable'