# Source code for node_correlation

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
from packaging import version

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import (
    SyNodeError, SyDataError, SyConfigurationError)
import scipy
from scipy.spatial.distance import cdist

from skimage import feature
from sylib.imageprocessing.image import ImagePort


# Parsed version of the installed SciPy. MatchDescriptors.execute compares it
# against 1.9.0 to decide which name to request for the 'kulsinski' metric.
scipy_version = version.Version(scipy.__version__)


class MatchTemplate(node.Node):
    """
    Uses cross-correlation to match a template image to an image.
    Returns an image with maxima where the template matches.

    The input image may optionally be padded before matching; the padding
    mode and the constant pad value are node parameters.
    """

    name = 'Match Template'
    author = 'Mathias Broxvall'
    icon = 'image_template.svg'
    description = (
        'Uses cross-correlation to match a template image to an image. '
        'Returns an image with maxima where the template matches.'
    )
    nodeid = 'com.sympathyfordata.imageanalysis.match_template'
    tags = Tags(Tag.ImageProcessing.ImageManipulation)

    # Combo-box entry that means "do not pad the input at all".
    _no_padding = 'No padding'

    parameters = node.parameters()
    parameters.set_string(
        'padding mode',
        value=_no_padding,
        description=('Method used for padding the input'),
        label='Padding mode',
        editor=node.editors.combo_editor(
            options=[_no_padding, 'constant', 'edge', 'reflect', 'wrap']),
    )
    parameters.set_float(
        'k',
        value=0.0,
        description=('Value used for padding mode "constant"'),
        label='Pad value',
    )

    inputs = Ports([
        ImagePort('image to perform matching on', name='image'),
        ImagePort('Template image', name='template'),
    ])
    outputs = Ports([
        ImagePort('result after matching', name='result'),
    ])

    def execute(self, node_context):
        """Match the template against the image and write the response map.

        Reads the 'image' and 'template' inputs, runs
        ``skimage.feature.match_template`` with the configured padding and
        stores the returned array on the 'result' output.
        """
        source_image = node_context.input['image'].get_image()
        template_image = node_context.input['template'].get_image()

        settings = node_context.parameters
        pad_mode = settings['padding mode'].value
        pad_value = settings['k'].value

        use_padding = pad_mode != self._no_padding
        # When padding is disabled the selected combo entry is not a real
        # mode name, so 'constant' is passed as the mode argument instead.
        effective_mode = pad_mode if use_padding else 'constant'

        response = feature.match_template(
            source_image,
            template_image,
            pad_input=use_padding,
            mode=effective_mode,
            constant_values=pad_value,
        )
        node_context.output['result'].set_image(response)
class MatchDescriptors(node.Node):
    """
    Performs brute-force matching between keypoints in the two tables

    Every column that is not listed as a result column is treated as one
    descriptor dimension. For each keypoint (row) in the first table the
    closest keypoint in the second table is selected according to the chosen
    metric. Optionally matches are cross-checked and filtered on a maximum
    distance. The output holds the match distance together with the (prefixed)
    result columns of both matched keypoints.
    """

    author = 'Mathias Broxvall'
    icon = 'image_match_descriptors.svg'
    description = ('Performs brute-force matching between keypoints in '
                   'the two tables')
    name = 'Match Descriptors'
    tags = Tags(Tag.ImageProcessing.Extract)
    nodeid = 'com.sympathyfordata.imageanalysis.match_descriptors'

    parameters = node.parameters()
    parameters.set_string(
        'rescolumns', value='X, Y', label='Result columns',
        description=(
            'Names of columns to be ignored during matching, separated '
            'by a comma. These columns will instead be included in the '
            'final output'),
    )
    parameters.set_string(
        'prefix1', value='1:', label='Prefix 1',
        description=('Prefix added to output names for columns from '
                     'first input. '),
    )
    parameters.set_string(
        'prefix2', value='2:', label='Prefix 2',
        description=('Prefix added to output names for columns from '
                     'second input. '),
    )
    _minkowski = 'minkowski'
    parameters.set_string(
        'metric', value='', label='Metric',
        description=(
            'Metric used for comparing keypoints. \n'
            'See scipy.spatial.distance.cdist for details.'),
        # The empty entry means "choose automatically" (see _resolve_metric).
        # Each metric is listed exactly once.
        editor=node.editors.combo_editor(
            options=['euclidean', 'cityblock', 'correlation', _minkowski,
                     'hamming', '', 'braycurtis', 'canberra', 'chebyshev',
                     'cosine', 'dice', 'jaccard', 'kulsinski',
                     'mahalanobis', 'matching', 'rogerstanimoto',
                     'russellrao', 'seuclidean', 'sokalmichener',
                     'sokalsneath', 'sqeuclidean', 'yule'])
    )
    parameters.set_boolean(
        'cross', value=False, label='Cross-check',
        description=(
            'Performs matching in both direction and return a match '
            '(key1, key2) only if keypoint 2 is the best match for keypoint 1 '
            'and keypoint 1 is the best match for keypoint 2'),
    )
    parameters.set_float(
        'max_distance', value=np.inf, label='Max distance',
        description='Maximum allowed distance to be regarded as a match')
    parameters.set_integer(
        'p', value=2, label='P',
        description='P norm to apply for minkowski metrics')

    # The P parameter is only enabled while the minkowski metric is selected.
    controllers = node.controller(
        when=node.field('metric', 'value', value=_minkowski),
        action=(node.field('p', 'enabled')))

    inputs = Ports([
        Port.Table('Keypoints 1', name='keypoints 1'),
        Port.Table('Keypoints 2', name='keypoints 2'),
    ])
    outputs = Ports([
        Port.Table('Table with results', name='result'),
    ])

    @staticmethod
    def _split_columns(table, prefix, result_columns):
        """Partition the columns of ``table`` in a single pass.

        Returns a tuple ``(names, data, descriptors)`` where ``names`` and
        ``data`` hold the prefixed names and arrays of the columns listed in
        ``result_columns`` and ``descriptors`` holds the arrays of all other
        columns, each in table order.
        """
        names, data, descriptors = [], [], []
        for col in table.cols():
            if col.name in result_columns:
                names.append(prefix + col.name)
                data.append(col.data)
            else:
                descriptors.append(col.data)
        return names, data, descriptors

    def _resolve_metric(self, metric, dtype):
        """Translate the configured metric into the name passed to cdist.

        An empty metric selects 'hamming' for boolean descriptors and
        'euclidean' otherwise.

        Raises SyConfigurationError for the unsupported 'wminkowski' metric.
        """
        if metric == '':
            if np.issubdtype(dtype, np.bool_):
                return 'hamming'
            return 'euclidean'
        if metric == 'wminkowski':
            raise SyConfigurationError(
                f'Metric: {metric} is no longer supported, use '
                f'{self._minkowski} instead?')
        # From SciPy 1.9.0 on the metric is requested as 'kulczynski1'.
        if metric == 'kulsinski' and scipy_version >= version.Version('1.9.0'):
            return 'kulczynski1'
        return metric

    def execute(self, node_context):
        """Match the keypoints of the two input tables and write the result.

        Raises:
            SyDataError: if either input table has no rows, or if either
                table has no descriptor columns left after removing the
                result columns.
            SyNodeError: if the two tables have a different number of
                descriptor columns.
            SyConfigurationError: if the unsupported 'wminkowski' metric is
                configured.
        """
        input1 = node_context.input['keypoints 1']
        input2 = node_context.input['keypoints 2']
        output = node_context.output['result']

        params = node_context.parameters
        prefix1 = params['prefix1'].value
        prefix2 = params['prefix2'].value
        pnorm = params['p'].value
        cross_check = params['cross'].value
        metric = params['metric'].value
        max_distance = params['max_distance'].value

        if input1.number_of_rows() == 0 or input2.number_of_rows() == 0:
            raise SyDataError("Empty table")

        result_columns = {
            s.strip() for s in params['rescolumns'].value.split(',')}
        res1_names, res1, desc1_cols = self._split_columns(
            input1, prefix1, result_columns)
        res2_names, res2, desc2_cols = self._split_columns(
            input2, prefix2, result_columns)

        # np.column_stack fails with an unhelpful error on an empty list, so
        # report the missing descriptor columns explicitly.
        if not desc1_cols or not desc2_cols:
            raise SyDataError(
                "No descriptor columns to match on: every column of an "
                "input table is listed as a result column")

        desc1 = np.column_stack(desc1_cols)
        desc2 = np.column_stack(desc2_cols)
        if desc1.shape[1] != desc2.shape[1]:
            raise SyNodeError("Number of input descriptors does not match")

        metric = self._resolve_metric(metric, desc1.dtype)

        # The p argument is only passed along for the minkowski metric.
        extra = {'p': pnorm} if metric == self._minkowski else {}
        distances = cdist(desc1, desc2, metric=metric, **extra)

        # Best match in table 2 for every keypoint in table 1.
        indices1 = np.arange(desc1.shape[0])
        indices2 = np.argmin(distances, axis=1)

        if cross_check:
            # Keep a pair only if keypoint 1 is also the best match for
            # keypoint 2 when matching in the opposite direction.
            matches1 = np.argmin(distances, axis=0)
            mask = indices1 == matches1[indices2]
            indices1 = indices1[mask]
            indices2 = indices2[mask]

        if max_distance < np.inf:
            mask = distances[indices1, indices2] < max_distance
            indices1 = indices1[mask]
            indices2 = indices2[mask]

        output.set_column_from_array(
            "match", distances[indices1, indices2])
        for name, col in zip(res1_names, res1):
            output.set_column_from_array(name, col[indices1])
        for name, col in zip(res2_names, res2):
            output.set_column_from_array(name, col[indices2])