Source code for node_merge_reports

# This file is part of Sympathy for Data.
# Copyright (c) 2015, 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import json
import copy
import collections.abc
import re
from sympathy.api import node as synode
from sympathy.api.nodeconfig import Ports, Tag, Tags
from sympathy.api import report
from sylib.report import models


prefix_pattern = re.compile(r'\d+\s.+')


[docs]class MergeReports(synode.Node): """Merge pages of report by appending the second input to the first.""" name = 'Merge Reports' nodeid = 'org.sysess.sympathy.report.merge' author = 'Stefan Larsson' version = '1.0' icon = 'report-merge.svg' tags = Tags(Tag.Visual.Report) inputs = Ports([ report.ReportPort( 'First Report Template', name='first_input_template'), report.ReportPort( 'Second Report Template', name='second_input_template') ]) outputs = Ports([report.ReportPort('Merged Template', name='output_template')]) def execute(self, node_context): self.set_progress(0) document_1 = json.loads(node_context.input[0].get()) model_1 = models.Root(document_1) self.set_progress(5) document_2 = json.loads(node_context.input[1].get()) model_2 = models.Root(document_2) self.set_progress(10) page_uuids = [] for m in (model_1, model_2): for p in m.find_all_nodes_with_class(models.Page): page_uuids.append(p.uuid) second_page_uuids = [] for p in model_1.find_all_nodes_with_class(models.Page): second_page_uuids.append(p.uuid) # Remove all UUIDs from the first model since those are added by # default. for p in model_1.find_all_nodes_with_class(models.Page): page_uuids.remove(p.uuid) # Copy first model to become output model. output_model = copy.deepcopy(model_1) output_model.data['signals'] = self.merge_signals(model_1, model_2) self.set_progress(40) # Merge pages. pages_2 = model_2.find_all_nodes_with_class(models.Page) page_root = output_model.find_all_nodes_with_class(models.Pages)[0] for page in pages_2: if page.uuid in page_uuids: models.insert_node(page, page_root) page_uuids.remove(page.uuid) if page.uuid in second_page_uuids: # If pages from the same report are merged, update uuid # to avoid duplicates. page.update_uuid() self.set_progress(70) output_model.data['signals'] = models.compress_signals( output_model.data) output_model.data['scales'] = self.merge_scales(model_1, model_2, page_root) self.set_progress(100) # Write result. node_context.output[0].set(json.dumps(output_model.data)) @staticmethod def merge_signals(model_1, model_2): """ Merge list of signals used. :param model_1: first model :param model_2: second model :return: union of signals """ # Merge signal list of both documents. signals_1 = set(model_1.data['signals']) signals_2 = set(model_2.data['signals']) return list(signals_1.union(signals_2)) def merge_scales(self, model_1, model_2, page_root): """ Merge scales into a single list of scales while modifying names in the second model by appending _ if the scale id occurs in the first model. :param model_1: first model :param model_2: second model :param page_root: page root container :return: list of merged scales """ # Find all scales of both documents. scales_1 = model_1.data['scales'] scale_ids_1 = [x['id'] for x in scales_1] scales_2 = model_2.data['scales'] scale_ids_2 = [x['id'] for x in scales_2] # Ensure unique scale id:s by appending extra _ to duplicate names. continue_search = True while continue_search: continue_search = False for scale_id in scale_ids_1: if scale_id in scale_ids_2: index = scale_ids_2.index(scale_id) new_id = scale_id + '_' scales_2[index]['id'] = new_id scale_ids_2[index] = new_id # side-effect self.update_scale_names_in_pages(scale_id, new_id, page_root) continue_search = True break return scales_1 + scales_2 @staticmethod def update_scale_names_in_pages(old_scale_id, new_scale_id, page_root): """ Replace all instances of old scale name with new name. :param old_scale_id: old scale id :param new_scale_id: new scale id :param page_root: root page collection """ def search_and_update(node): if isinstance(node, str): return node == old_scale_id elif isinstance(node, collections.abc.Sequence): for i, v in enumerate(node): if search_and_update(v): node[i] = new_scale_id elif isinstance(node, collections.abc.Mapping): for k, v in node.items(): if search_and_update(v): node[k] = new_scale_id return False search_and_update(page_root.data)