Source code for node_merge_tables

# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
from sympathy.api import node
from sympathy.api import node_helper
from sympathy.api import table
from sympathy.api import exceptions
from sympathy.api.nodeconfig import Tag, Tags, adjust, Ports, Port
import pandas
import itertools


MERGE_OPERATIONS = {
    'Union': 'outer',
    'Intersection': 'inner',
    'Index from A': 'left',
    'Index from B': 'right'
}


[docs]class MergeTable(node.Node): """ Merge table on a shared `index column` in `input A` and `input B`. A specific index can be present in either A or B, or in both A and B. The `join operation` determines which rows to include in the output. - Intersection, output rows in A x B (cartesian product), where the two indices match. Corresponds to an intersection of indices from both A and B. - Index from A, output rows with index present in A and not in B. Also includes the `intersection`. - Index from B, output rows with index present in B and not in A. Also includes the `intersection`. - Union, output the rows for `index from A`, `index from B` and the `intersection` (once). Corresponds to a union of indices from both A and B. Output contains the index column (once) and one additional column for each column in A and in B. Column names, except the index, that appear in both A and B are made unique by adding a `suffix`. Rows with an index that is only present in either of A or B will contain masked values (or NaN) in columns missing data. Example ======= :Suffix A: _A :Suffix B: _B :A: +----+-------+-----------+ | Id | Price | Inventory | +====+=======+===========+ | 0 | 10 | 5 | +----+-------+-----------+ | 1 | 15 | 0 | +----+-------+-----------+ | 2 | 25 | 2 | +----+-------+-----------+ :B: +----+------+-------+ | Id | Sold | Price | +====+======+=======+ | 1 | 1 | 10 | +----+------+-------+ | 1 | 4 | 15 | +----+------+-------+ | 2 | 3 | 25 | +----+------+-------+ | 3 | 1 | 45 | +----+------+-------+ :Intersection: +----+---------+-----------+------+---------+ | Id | Price_A | Inventory | Sold | Price_B | +====+=========+===========+======+=========+ | 1 | 15 | 0 | 1 | 10 | +----+---------+-----------+------+---------+ | 1 | 15 | 0 | 4 | 15 | +----+---------+-----------+------+---------+ | 2 | 25 | 2 | 3 | 25 | +----+---------+-----------+------+---------+ :Index from A: +----+---------+-----------+------+---------+ | Id | Price_A | Inventory | Sold | Price_B | +====+=========+===========+======+=========+ | 0 | 10 | 5 | NaN | NaN | +----+---------+-----------+------+---------+ | ... 3 intersection rows | +----+---------+-----------+------+---------+ :Index from B: +----+---------+-----------+------+---------+ | Id | Price_A | Inventory | Sold | Price_B | +====+=========+===========+======+=========+ | ... 3 intersection rows | +----+---------+-----------+------+---------+ | 3 | NaN | NaN | 1 | 45 | +----+---------+-----------+------+---------+ :Union: +----+---------+-----------+------+---------+ | Id | Price_A | Inventory | Sold | Price_B | +====+=========+===========+======+=========+ | 0 | 10 | 5 | NaN | NaN | +----+---------+-----------+------+---------+ | ... 3 intersection rows | +----+---------+-----------+------+---------+ | 3 | NaN | NaN | 1 | 45 | +----+---------+-----------+------+---------+ """ author = 'Greger Cronquist' description = 'Merge tables on a shared index column' tags = Tags(Tag.DataProcessing.TransformStructure) icon = 'merge.svg' name = 'Merge Table' nodeid = 'org.sysess.data.table.mergetable' inputs = Ports([ Port.Table('Input A', name='Input A'), Port.Table('Input B', name='Input B'), ]) outputs = Ports([ Port.Table('Output', name='Output'), ]) parameters = node.parameters() parameters.set_list( 'index', label='Index column', values=[0], description='Index to join on, should exist in both input A and B.', editor=node.editors.combo_editor(edit=True)) parameters.set_list( 'operation', label='Join operation', description='Join operation, determines which rows to output.', list=list(MERGE_OPERATIONS.keys()), value=[0], editor=node.editors.combo_editor()) parameters.set_string( 'suffix_a', label='Suffix A', value='_A', description='Suffix for column names in A appearing in both A and B.') parameters.set_string( 'suffix_b', label='Suffix B', value='_B', description='Suffix for column names in B appearing in both A and B.') def adjust_parameters(self, ctx): adjust(ctx.parameters['index'], ctx.input['Input A']) def execute(self, ctx): index_param = ctx.parameters['index'] index_column = index_param.selected operation = ctx.parameters['operation'].selected table_a = ctx.input['Input A'] table_b = ctx.input['Input B'] out_table = ctx.output['Output'] suffixes = (ctx.parameters['suffix_a'].value, ctx.parameters['suffix_b'].value) if (table_a.is_empty() and not table_b.is_empty()): out_table.source(table_b) elif (table_b.is_empty() and not table_a.is_empty()): out_table.source(table_a) elif (table_b.is_empty() and table_a.is_empty()): return else: dataframe_a = table_a.to_dataframe() dataframe_b = table_b.to_dataframe() try: new_table = pandas.merge( dataframe_a, dataframe_b, how=MERGE_OPERATIONS[operation], on=index_column, suffixes=suffixes) except Exception: col_a = table_a._require_column(index_param) col_b = table_b._require_column(index_param) if col_a.dtype.kind != col_b.dtype.kind: # Assume problem due to unmatched types. raise exceptions.SyDataError( 'Failed to merge, are the two index columns of the ' 'compatible types?' ) raise out_table.source(table.File.from_dataframe(new_table)) attributes_a = table_a.get_attributes() attributes_b = table_b.get_attributes() attributes_c = tuple(dict(itertools.chain(attributes_a[i].items(), attributes_b[i].items())) for i in range(2)) out_table.set_attributes(attributes_c)
[docs]@node_helper.list_node_decorator(['Input A', 'Input B'], ['Output']) class MergeTables(MergeTable): name = 'Merge Tables' nodeid = 'org.sysess.data.table.mergetables'