# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
from sympathy.api import node
from sympathy.api import node_helper
from sympathy.api import table
from sympathy.api import exceptions
from sympathy.api.nodeconfig import Tag, Tags, adjust, Ports, Port
import pandas
import itertools
MERGE_OPERATIONS = {
'Union': 'outer',
'Intersection': 'inner',
'Index from A': 'left',
'Index from B': 'right'
}
[docs]
class MergeTable(node.Node):
"""
Merge table on a shared `index column` in `input A` and `input B`.
A specific index can be present in either A or B, or in both A and B.
The `join operation` determines which rows to include in the output.
- Intersection, output rows in A x B (cartesian product), where the two
indices match. Corresponds to an intersection of indices from both A
and B.
- Index from A, output rows with index present in A and not in
B. Also includes the `intersection`.
- Index from B, output rows with index present in B and not in
A. Also includes the `intersection`.
- Union, output the rows for `index from A`, `index from B` and the
`intersection` (once). Corresponds to a union of indices from both A
and B.
Output contains the index column (once) and one additional column for each
column in A and in B. Column names, except the index, that appear in both A
and B are made unique by adding a `suffix`. Rows with an index that is only
present in either of A or B will contain masked values (or NaN) in columns
missing data.
Example
=======
:Suffix A: _A
:Suffix B: _B
:A:
+----+-------+-----------+
| Id | Price | Inventory |
+====+=======+===========+
| 0 | 10 | 5 |
+----+-------+-----------+
| 1 | 15 | 0 |
+----+-------+-----------+
| 2 | 25 | 2 |
+----+-------+-----------+
:B:
+----+------+-------+
| Id | Sold | Price |
+====+======+=======+
| 1 | 1 | 10 |
+----+------+-------+
| 1 | 4 | 15 |
+----+------+-------+
| 2 | 3 | 25 |
+----+------+-------+
| 3 | 1 | 45 |
+----+------+-------+
:Intersection:
+----+---------+-----------+------+---------+
| Id | Price_A | Inventory | Sold | Price_B |
+====+=========+===========+======+=========+
| 1 | 15 | 0 | 1 | 10 |
+----+---------+-----------+------+---------+
| 1 | 15 | 0 | 4 | 15 |
+----+---------+-----------+------+---------+
| 2 | 25 | 2 | 3 | 25 |
+----+---------+-----------+------+---------+
:Index from A:
+----+---------+-----------+------+---------+
| Id | Price_A | Inventory | Sold | Price_B |
+====+=========+===========+======+=========+
| 0 | 10 | 5 | NaN | NaN |
+----+---------+-----------+------+---------+
| ... 3 intersection rows |
+----+---------+-----------+------+---------+
:Index from B:
+----+---------+-----------+------+---------+
| Id | Price_A | Inventory | Sold | Price_B |
+====+=========+===========+======+=========+
| ... 3 intersection rows |
+----+---------+-----------+------+---------+
| 3 | NaN | NaN | 1 | 45 |
+----+---------+-----------+------+---------+
:Union:
+----+---------+-----------+------+---------+
| Id | Price_A | Inventory | Sold | Price_B |
+====+=========+===========+======+=========+
| 0 | 10 | 5 | NaN | NaN |
+----+---------+-----------+------+---------+
| ... 3 intersection rows |
+----+---------+-----------+------+---------+
| 3 | NaN | NaN | 1 | 45 |
+----+---------+-----------+------+---------+
"""
author = 'Greger Cronquist'
description = 'Merge tables on a shared index column'
tags = Tags(Tag.DataProcessing.TransformStructure)
icon = 'merge.svg'
name = 'Merge Table'
nodeid = 'org.sysess.data.table.mergetable'
inputs = Ports([
Port.Table('Input A', name='Input A'),
Port.Table('Input B', name='Input B'),
])
outputs = Ports([
Port.Table('Output', name='Output'),
])
parameters = node.parameters()
parameters.set_list(
'index', label='Index column',
values=[0],
description='Index to join on, should exist in both input A and B.',
editor=node.editors.combo_editor(edit=True))
parameters.set_list(
'operation', label='Join operation',
description='Join operation, determines which rows to output.',
list=list(MERGE_OPERATIONS.keys()),
value=[0],
editor=node.editors.combo_editor())
parameters.set_string(
'suffix_a', label='Suffix A', value='_A',
description='Suffix for column names in A appearing in both A and B.')
parameters.set_string(
'suffix_b', label='Suffix B', value='_B',
description='Suffix for column names in B appearing in both A and B.')
def adjust_parameters(self, ctx):
adjust(ctx.parameters['index'], ctx.input['Input A'])
def execute(self, ctx):
index_param = ctx.parameters['index']
index_column = index_param.selected
operation = ctx.parameters['operation'].selected
table_a = ctx.input['Input A']
table_b = ctx.input['Input B']
out_table = ctx.output['Output']
suffixes = (ctx.parameters['suffix_a'].value,
ctx.parameters['suffix_b'].value)
if (table_a.is_empty() and not
table_b.is_empty()):
out_table.source(table_b)
elif (table_b.is_empty() and not
table_a.is_empty()):
out_table.source(table_a)
elif (table_b.is_empty() and
table_a.is_empty()):
return
else:
dataframe_a = table_a.to_dataframe()
dataframe_b = table_b.to_dataframe()
try:
new_table = pandas.merge(
dataframe_a, dataframe_b, how=MERGE_OPERATIONS[operation],
on=index_column, suffixes=suffixes)
except Exception:
col_a = table_a._require_column(index_param)
col_b = table_b._require_column(index_param)
if col_a.dtype.kind != col_b.dtype.kind:
# Assume problem due to unmatched types.
raise exceptions.SyDataError(
'Failed to merge, are the two index columns of the '
'compatible types?'
)
raise
out_table.source(table.File.from_dataframe(new_table))
attributes_a = table_a.get_attributes()
attributes_b = table_b.get_attributes()
attributes_c = tuple(dict(itertools.chain(attributes_a[i].items(),
attributes_b[i].items()))
for i in range(2))
out_table.set_attributes(attributes_c)
[docs]
@node_helper.list_node_decorator(['Input A', 'Input B'], ['Output'])
class MergeTables(MergeTable):
name = 'Merge Tables'
nodeid = 'org.sysess.data.table.mergetables'