# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import itertools
import numpy as np
from sympathy.api import node as synode
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, adjust
from sympathy.api.exceptions import SyDataError
class IHeatMapAccumulator:
    """
    Base class for the per-bin reducers used by the heatmap calculation.

    Subclasses implement :meth:`add_data` to fold new data into their
    internal state. The reduced result is read back with :meth:`value`.
    """

    def __init__(self):
        # None means that nothing has been accumulated yet.
        self._value = None

    def add_data(self, data):
        """Fold ``data`` into the accumulated state (subclass hook)."""
        raise NotImplementedError

    def value(self):
        """Return the accumulated result, or None if nothing was stored."""
        return self._value
class CountAccumulator(IHeatMapAccumulator):
    """Count the number of elements that have been added."""

    def add_data(self, data):
        """Increase the running count by ``data.size``."""
        # Compare against None explicitly: a count of 0 is a valid state and
        # must not be mistaken for "nothing accumulated yet".
        if self._value is None:
            self._value = 0
        self._value += data.size
class SumAccumulator(IHeatMapAccumulator):
    """Keep a running sum of everything that has been added."""

    def add_data(self, data):
        """Add ``data.sum()`` to the running total."""
        # Compare against None explicitly: a running sum of 0 is a valid
        # state, and truth-testing the total would be ambiguous if the sum
        # ever were a non-scalar array.
        if self._value is None:
            self._value = 0
        self._value += data.sum()
class MinAccumulator(IHeatMapAccumulator):
    """Track the smallest value that has been added."""

    def add_data(self, data):
        """Lower the stored minimum if ``data`` holds a smaller value."""
        if data.size:
            if self._value is not None:
                self._value = min(self._value, data.min())
            else:
                # First non-empty chunk: its minimum is the starting point.
                self._value = data.min()
class MaxAccumulator(IHeatMapAccumulator):
    """Track the largest value that has been added."""

    def add_data(self, data):
        """Raise the stored maximum if ``data`` holds a larger value."""
        if data.size:
            if self._value is not None:
                self._value = max(self._value, data.max())
            else:
                # First non-empty chunk: its maximum is the starting point.
                self._value = data.max()
class MeanAccumulator(IHeatMapAccumulator):
    """Accumulate a running sum and element count and report their mean."""

    def __init__(self):
        # Initialise the base class as well so that every accumulator
        # instance carries the same attributes.
        super().__init__()
        self._sum = 0
        self._count = 0

    def add_data(self, data):
        """Add ``data.sum()`` to the total and ``data.size`` to the count."""
        self._sum += data.sum()
        self._count += data.size

    def value(self):
        """Return sum divided by count, or None if the count is zero."""
        if not self._count:
            return None
        return self._sum / self._count
class MedianAccumulator(IHeatMapAccumulator):
    """Collect everything that has been added and report its median."""

    def __init__(self):
        # Initialise the base class as well so that every accumulator
        # instance carries the same attributes.
        super().__init__()
        self._values = []

    def add_data(self, data):
        """Store ``data`` for the median computation in :meth:`value`."""
        self._values.append(data)

    def value(self):
        """Return the median over all stored data, or None if empty."""
        if not self._values:
            return None
        # Flatten every chunk before joining them. The median is taken over
        # all elements anyway, and unlike stacking rows this also works when
        # the chunks have different lengths.
        flat = np.concatenate([np.ravel(chunk) for chunk in self._values])
        return np.ma.median(flat)
# Maps the reduction names offered in the node configuration to the
# accumulator class implementing each one. The order matters: the first entry
# is the one treated as the plain histogram, which needs no z data column.
REDUCTION_FUNCTIONS = {
    'Count (histogram)': CountAccumulator,
    'Sum': SumAccumulator,
    'Min': MinAccumulator,
    'Max': MaxAccumulator,
    'Mean': MeanAccumulator,
    'Median': MedianAccumulator,
}
# Node implementation.
class HeatmapCalculation(synode.Node):
    """
    This node calculates a 2D histogram or other heatmap of a given signal.

    The inputs X, Y and Z (if selected) data columns must be of numeric,
    datetime or timedelta types. X and Y data columns may not be of type
    complex.

    The output consists of bin edges and bin values and can for instance be
    used in a heatmap plot in the node :ref:`Figure`.

    This node ignores any rows in the input where one or more of the selected
    columns are masked.
    """

    author = 'Magnus Sandén'
    icon = 'heatmap_calculation.svg'
    name = 'Heatmap calculation'
    description = ('Calculate a 2d histogram or other heatmap of a given '
                   'signal.')
    nodeid = 'org.sysess.sympathy.dataanalysis.heatmapcalc'
    tags = Tags(Tag.Analysis.Statistic)

    # The first registered reduction is the plain histogram count. It is the
    # only reduction that does not need a z data column.
    _count_reduction = next(iter(REDUCTION_FUNCTIONS))

    parameters = synode.parameters()
    combo_editor = synode.editors.combo_editor(edit=True)
    reduction_editor = synode.editors.combo_editor(
        options=list(REDUCTION_FUNCTIONS.keys()))
    parameters.set_string('x_data_column', label="X data column:",
                          editor=combo_editor,
                          description='Select X axis data. Requires numeric, '
                                      'datetime or timedelta type.')
    parameters.set_string('y_data_column', label="Y data column:",
                          editor=combo_editor,
                          description='Select Y axis data. Requires numeric, '
                                      'datetime or timedelta type.')
    parameters.set_string('z_data_column', label="Z data column:",
                          description='The data points of the z data are '
                                      'placed in bins according to the '
                                      'corresponding values of x and y. They '
                                      'are then reduced to a single bin value '
                                      'using the selected reduction function. '
                                      'Requires numeric, datetime or timedelta'
                                      ' type. For "{}" no z data column is '
                                      'needed.'.format(_count_reduction),
                          editor=combo_editor)
    parameters.set_string('reduction', label="Reduction function:",
                          value=_count_reduction,
                          description='A function used on all the z data '
                                      'points in a bin. For "{}" no z data '
                                      'column is needed.'.format(
                                          _count_reduction),
                          editor=reduction_editor)
    parameters.set_integer('x_bins', label="X Bins:", value=10,
                           description='Number of bins on the x axis')
    parameters.set_integer('y_bins', label="Y Bins:", value=10,
                           description='Number of bins on the y axis')
    parameters.set_boolean('auto_range', label="Auto range", value=True,
                           description=('When checked, use data range as '
                                        'histogram range'))
    parameters.set_float(
        'x_min', label="X min:", value=0.0, description='Set minimum X value')
    parameters.set_float(
        'x_max', label="X max:", value=1.0, description='Set maximum X value')
    parameters.set_float(
        'y_min', label="Y min:", value=0.0, description='Set minimum Y value')
    parameters.set_float(
        'y_max', label="Y max:", value=1.0, description='Set maximum Y value')

    # The manual range fields are disabled while auto range is checked, and
    # the z data column is disabled for the count reduction.
    controllers = (
        synode.controller(
            when=synode.field('auto_range', 'checked'),
            action=(synode.field('x_min', 'disabled'),
                    synode.field('x_max', 'disabled'),
                    synode.field('y_min', 'disabled'),
                    synode.field('y_max', 'disabled'))),
        synode.controller(
            when=synode.field('reduction', 'value', _count_reduction),
            action=synode.field('z_data_column', 'disabled')))

    inputs = Ports([Port.Table('Input data', name='in')])
    outputs = Ports([Port.Table('Heatmap data', name='out')])

    def adjust_parameters(self, node_context):
        """Adjust the three column parameters against the input table."""
        in_table = node_context.input['in']
        for column_parameter in ('x_data_column', 'y_data_column',
                                 'z_data_column'):
            adjust(node_context.parameters[column_parameter], in_table)

    @staticmethod
    def _check_numeric(data, axis, kinds):
        """Raise SyDataError unless the dtype kind of data is in kinds."""
        if data.dtype.kind not in kinds:
            raise SyDataError(f"{axis} data column requires data of "
                              f"numeric, datetime or timedelta type.")

    @staticmethod
    def _drop_masked_rows(*columns):
        """
        Return the columns without the rows masked in any of them.

        The result is a tuple of plain arrays; once the masked rows are gone
        no mask is needed any more.
        """
        masked = np.zeros(columns[0].shape, dtype=bool)
        for column in columns:
            if isinstance(column, np.ma.MaskedArray):
                masked |= column.mask
        if np.any(masked):
            keep = np.logical_not(masked)
            columns = tuple(column[keep] for column in columns)
        return tuple(np.ma.getdata(column) for column in columns)

    @staticmethod
    def _closed_bin_indices(data, bin_edges):
        """
        Return the 1-based bin index of every element in data.

        Digitize puts values on bin edges in the right bin, but for the
        rightmost bin this is not what we want. We want the rightmost bin to
        be a closed interval, so values equal to the last edge are moved one
        bin to the left.
        """
        indices = np.digitize(data, bin_edges)
        indices[data == bin_edges[-1]] -= 1
        return indices

    def execute(self, node_context):
        """
        Bin the selected columns and write the heatmap to the output table.

        The output table gets the columns "X bin edges", "Y bin edges" and
        "Bin values" with one row per bin. Bins that received no data are
        masked in "Bin values". The names of the selected columns are stored
        as table attributes.

        Raises SyDataError if a selected column has an unsupported type, or
        if auto range is used and no unmasked rows remain.
        """
        parameters = node_context.parameters
        in_table = node_context.input['in']
        out_table = node_context.output['out']

        x_bins = parameters['x_bins'].value
        y_bins = parameters['y_bins'].value
        x_data_param = parameters['x_data_column']
        y_data_param = parameters['y_data_column']
        z_data_param = parameters['z_data_column']
        reduction = parameters['reduction'].value

        x_data = in_table._require_column(x_data_param)
        y_data = in_table._require_column(y_data_param)
        table_attributes = {
            'x_data': x_data_param.value, 'y_data': y_data_param.value}
        if reduction == self._count_reduction:
            # Counting needs no real z data, only one element per row.
            z_data = np.zeros_like(x_data, dtype=int)
        else:
            z_data = in_table._require_column(z_data_param)
            table_attributes['z_data'] = z_data_param.value

        # Handle non-numeric arrays. Complex is accepted for Z only.
        self._check_numeric(x_data, 'X', 'biufmM')
        self._check_numeric(y_data, 'Y', 'biufmM')
        self._check_numeric(z_data, 'Z', 'biufcmM')

        # Handle masked arrays.
        x_data, y_data, z_data = self._drop_masked_rows(
            x_data, y_data, z_data)

        # Handle datetimes and timedeltas: compute on their integer
        # representation and convert back before writing the output.
        x_dtype = x_data.dtype
        y_dtype = y_data.dtype
        z_dtype = z_data.dtype
        if x_dtype.kind in 'mM':
            x_data = x_data.astype('int64')
        if y_dtype.kind in 'mM':
            y_data = y_data.astype('int64')
        if z_dtype.kind in 'mM':
            z_data = z_data.astype('int64')

        if parameters['auto_range'].value:
            if not x_data.size:
                raise SyDataError(
                    "Cannot determine the heatmap range automatically: "
                    "there are no rows where all selected columns have a "
                    "value.")
            # The nan-aware reductions keep the range independent of where
            # in the column a NaN happens to be.
            x_min = np.nanmin(x_data)
            x_max = np.nanmax(x_data)
            y_min = np.nanmin(y_data)
            y_max = np.nanmax(y_data)
        else:
            x_min = parameters['x_min'].value
            x_max = parameters['x_max'].value
            y_min = parameters['y_min'].value
            y_max = parameters['y_max'].value

        x_bin_edges = np.linspace(x_min, x_max, x_bins + 1)
        y_bin_edges = np.linspace(y_min, y_max, y_bins + 1)
        x_bin_indices = self._closed_bin_indices(x_data, x_bin_edges)
        y_bin_indices = self._closed_bin_indices(y_data, y_bin_edges)

        # Build the values buffer. It holds one accumulator for every bin
        # that receives data. Rows outside the range are skipped.
        Accumulator = REDUCTION_FUNCTIONS[reduction]
        values_buffer = np.empty((x_bins, y_bins), dtype=object)
        in_range = ((x_bin_indices > 0) & (x_bin_indices <= x_bins) &
                    (y_bin_indices > 0) & (y_bin_indices <= y_bins))
        for xi, yi, z in zip(x_bin_indices[in_range] - 1,
                             y_bin_indices[in_range] - 1,
                             z_data[in_range]):
            if values_buffer[xi, yi] is None:
                values_buffer[xi, yi] = Accumulator()
            values_buffer[xi, yi].add_data(z)

        # Pick a result type that can hold the reduced values. The mean or
        # median of integers is in general not an integer, and a sum of
        # booleans is a count, so reusing the z type would truncate them.
        bin_dtype = z_data.dtype
        if (issubclass(Accumulator, (MeanAccumulator, MedianAccumulator))
                and z_dtype.kind in 'biu'):
            bin_dtype = np.dtype(float)
        elif issubclass(Accumulator, SumAccumulator) and z_dtype.kind == 'b':
            bin_dtype = np.dtype(int)

        # Now go through the values buffer and reduce each accumulator into
        # the real z data for that bin. Bins without data stay masked.
        bin_values = np.ma.masked_all((x_bins, y_bins), dtype=bin_dtype)
        for xi, yi in itertools.product(range(x_bins), range(y_bins)):
            accumulator = values_buffer[xi, yi]
            if accumulator is not None:
                bin_values[xi, yi] = accumulator.value()

        # One output row per bin, in the same order as the flattened bin
        # values: every x edge is repeated for all y edges.
        x_output = np.array([x_bin_edges[:-1]] * y_bins).reshape(-1, order='F')
        y_output = np.array([y_bin_edges[:-1]] * x_bins).reshape(-1, order='C')
        if x_dtype.kind in 'mM':
            x_output = x_output.astype(x_dtype)
        if y_dtype.kind in 'mM':
            y_output = y_output.astype(y_dtype)
        if z_dtype.kind in 'mM':
            bin_values = bin_values.astype(z_dtype)

        out_table.set_column_from_array("X bin edges", x_output)
        out_table.set_column_from_array("Y bin edges", y_output)
        out_table.set_column_from_array("Bin values", bin_values.flatten())
        out_table.set_table_attributes(table_attributes)