Source code for node_merge_reports
# This file is part of Sympathy for Data.
# Copyright (c) 2015, 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import json
import copy
import collections.abc
import re
from sympathy.api import node as synode
from sympathy.api.nodeconfig import Ports, Tag, Tags
from sympathy.api import report
from sylib.report import models
prefix_pattern = re.compile(r'\d+\s.+')
[docs]
class MergeReports(synode.Node):
"""Merge pages of report by appending the second input to the first."""
name = 'Merge Reports'
nodeid = 'org.sysess.sympathy.report.merge'
author = 'Stefan Larsson'
icon = 'report-merge.svg'
tags = Tags(Tag.Visual.Report)
inputs = Ports([
report.ReportPort(
'First Report Template', name='first_input_template'),
report.ReportPort(
'Second Report Template', name='second_input_template')
])
outputs = Ports([report.ReportPort('Merged Template',
name='output_template')])
def execute(self, node_context):
self.set_progress(0)
document_1 = json.loads(node_context.input[0].get())
model_1 = models.Root(document_1)
self.set_progress(5)
document_2 = json.loads(node_context.input[1].get())
model_2 = models.Root(document_2)
self.set_progress(10)
page_uuids = []
for m in (model_1, model_2):
for p in m.find_all_nodes_with_class(models.Page):
page_uuids.append(p.uuid)
second_page_uuids = []
for p in model_1.find_all_nodes_with_class(models.Page):
second_page_uuids.append(p.uuid)
# Remove all UUIDs from the first model since those are added by
# default.
for p in model_1.find_all_nodes_with_class(models.Page):
page_uuids.remove(p.uuid)
# Copy first model to become output model.
output_model = copy.deepcopy(model_1)
output_model.data['signals'] = self.merge_signals(model_1, model_2)
self.set_progress(40)
# Merge pages.
pages_2 = model_2.find_all_nodes_with_class(models.Page)
page_root = output_model.find_all_nodes_with_class(models.Pages)[0]
for page in pages_2:
if page.uuid in page_uuids:
models.insert_node(page, page_root)
page_uuids.remove(page.uuid)
if page.uuid in second_page_uuids:
# If pages from the same report are merged, update uuid
# to avoid duplicates.
page.update_uuid()
self.set_progress(70)
output_model.data['signals'] = models.compress_signals(
output_model.data)
output_model.data['scales'] = self.merge_scales(model_1, model_2,
page_root)
self.set_progress(100)
# Write result.
node_context.output[0].set(json.dumps(output_model.data))
@staticmethod
def merge_signals(model_1, model_2):
"""
Merge list of signals used.
:param model_1: first model
:param model_2: second model
:return: union of signals
"""
# Merge signal list of both documents.
signals_1 = set(model_1.data['signals'])
signals_2 = set(model_2.data['signals'])
return list(signals_1.union(signals_2))
def merge_scales(self, model_1, model_2, page_root):
"""
Merge scales into a single list of scales while modifying names in
the second model by appending _ if the scale id occurs in the first
model.
:param model_1: first model
:param model_2: second model
:param page_root: page root container
:return: list of merged scales
"""
# Find all scales of both documents.
scales_1 = model_1.data['scales']
scale_ids_1 = [x['id'] for x in scales_1]
scales_2 = model_2.data['scales']
scale_ids_2 = [x['id'] for x in scales_2]
# Ensure unique scale id:s by appending extra _ to duplicate names.
continue_search = True
while continue_search:
continue_search = False
for scale_id in scale_ids_1:
if scale_id in scale_ids_2:
index = scale_ids_2.index(scale_id)
new_id = scale_id + '_'
scales_2[index]['id'] = new_id
scale_ids_2[index] = new_id
# side-effect
self.update_scale_names_in_pages(scale_id, new_id,
page_root)
continue_search = True
break
return scales_1 + scales_2
@staticmethod
def update_scale_names_in_pages(old_scale_id, new_scale_id, page_root):
"""
Replace all instances of old scale name with new name.
:param old_scale_id: old scale id
:param new_scale_id: new scale id
:param page_root: root page collection
"""
def search_and_update(node):
if isinstance(node, str):
return node == old_scale_id
elif isinstance(node, collections.abc.Sequence):
for i, v in enumerate(node):
if search_and_update(v):
node[i] = new_scale_id
elif isinstance(node, collections.abc.Mapping):
for k, v in node.items():
if search_and_update(v):
node[k] = new_scale_id
return False
search_and_update(page_root.data)