Source code for node_metrics

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import numpy as np

import sklearn
import sklearn.base
import sklearn.metrics
import sklearn.exceptions
import sklearn.model_selection
import sklearn.feature_selection

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import sywarn

from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.abstract_nodes import SyML_abstract
from sylib.machinelearning.descriptors import Descriptor

from sylib.machinelearning.descriptors import BoolType
from sylib.machinelearning.descriptors import IntListType
from sylib.machinelearning.descriptors import IntType
from sylib.machinelearning.descriptors import NoneType
from sylib.machinelearning.descriptors import StringListType
from sylib.machinelearning.descriptors import StringType
from sylib.machinelearning.descriptors import UnionType

from sylib.machinelearning.utility import table_to_array


[docs]class ROCFromProb(SyML_abstract, node.Node): name = 'ROC from Probabilities' author = 'Mathias Broxvall' version = '0.1' icon = 'roc_curve.svg' description = ( 'Computes Receiver operating characteristics (ROC) based on ' 'calculated Y-probabilities and from true Y.') nodeid = 'org.sysess.sympathy.machinelearning.roc_prob' tags = Tags(Tag.MachineLearning.Metrics) descriptor = Descriptor() descriptor.name = name info = [ {'name': 'pos_label', 'dispname': 'Positive class label', 'type': UnionType([ NoneType(), IntType(), StringType()], default=None)}, {'name': 'drop_intermediate', 'dispname': 'Drop suboptimal thresholds', 'type': BoolType(default=True)}, {'name': 'header as label', 'desc': 'Use header of Y-prob as the target label', 'no-kw': True, 'type': BoolType(default=True)}, ] descriptor.set_info(info, doc_class=sklearn.metrics.roc_curve) parameters = node.parameters() SyML_abstract.generate_parameters(parameters, descriptor) inputs = Ports([Port.Table('Y-prob', name='Y-prob'), Port.Table('Y-true', name='Y-true')]) outputs = Ports([Port.Table('roc', name='roc')]) __doc__ = SyML_abstract.generate_docstring( description, descriptor.info, descriptor.attributes, inputs, outputs) def execute(self, node_context): Y_prob_tbl = node_context.input['Y-prob'] Y_true_tbl = node_context.input['Y-true'] roc_tbl = node_context.output['roc'] header_as_label = node_context.parameters['header as label'].value Y_prob = table_to_array(Y_prob_tbl) Y_true = table_to_array(Y_true_tbl) kwargs = self.__class__.descriptor.get_parameters( node_context.parameters) kwargs['y_true'] = Y_true kwargs['y_score'] = Y_prob if BoolType().from_string(header_as_label): label = Y_prob_tbl.column_names()[0] try: label = int(label) except ValueError: pass kwargs['pos_label'] = label fpr, tpr, thresholds = sklearn.metrics.roc_curve(**kwargs) roc_tbl.set_column_from_array('false positive rate', fpr) roc_tbl.set_column_from_array('true positive rate', tpr) roc_tbl.set_column_from_array('threshold', thresholds) roc_tbl.set_name('ROC')
[docs]class R2Score(SyML_abstract, node.Node): name = 'R² regression score (R2)' author = 'Mathias Broxvall' version = '0.1' icon = 'roc_curve.svg' description = ( 'Computes the R² regression score.\n' 'Best possible score is 1.0 and it can be negative (because the model ' 'can be arbitrarily bad). A constant model that always predicts the ' 'expected value of y, disregarding the input features, would get a ' 'R² score of 0.0) ') nodeid = 'org.sysess.sympathy.machinelearning.r2_score' tags = Tags(Tag.MachineLearning.Metrics) descriptor = Descriptor() descriptor.name = name descriptor.set_info([ ], doc_class=sklearn.metrics.r2_score) parameters = node.parameters() SyML_abstract.generate_parameters(parameters, descriptor) inputs = Ports([Port.Table('Y-prob', name='Y-prob'), Port.Table('Y-true', name='Y-true')]) outputs = Ports([Port.Table('r2 score', name='r2 score')]) __doc__ = SyML_abstract.generate_docstring( description, descriptor.info, descriptor.attributes, inputs, outputs) def execute(self, node_context): Y_prob_tbl = node_context.input['Y-prob'] Y_true_tbl = node_context.input['Y-true'] r2_tbl = node_context.output['r2 score'] Y_prob = table_to_array(Y_prob_tbl) Y_true = table_to_array(Y_true_tbl) kwargs = self.__class__.descriptor.get_parameters( node_context.parameters) kwargs['y_true'] = Y_true kwargs['y_pred'] = Y_prob r2_score = sklearn.metrics.r2_score(**kwargs) if isinstance(r2_score, np.ndarray): r2_tbl.set_column_from_array('r² score', r2_score) else: r2_tbl.set_column_from_array('r² score', np.array([r2_score])) r2_tbl.set_name('R² score')
[docs]class ConfusionFromPrediction(SyML_abstract, node.Node): name = 'Confusion Matrix' author = 'Mathias Broxvall' version = '0.1' icon = 'confusion_matrix.svg' description = ( 'Computes the confusion matrix given predictions and true Y-values.') nodeid = 'org.sysess.sympathy.machinelearning.confusion' tags = Tags(Tag.MachineLearning.Metrics) descriptor = Descriptor() descriptor.name = name info = [ {'name': 'labels', 'dispname': 'Labels', 'type': UnionType([ NoneType(), IntListType(), StringListType()], default="")}, {'name': 'include heading', 'dispname': 'Include heading', 'desc': 'Adds a columns with used class names', 'no-kw': True, 'type': BoolType(default=True)}, ] descriptor.set_info(info, doc_class=sklearn.metrics.confusion_matrix) parameters = node.parameters() SyML_abstract.generate_parameters(parameters, descriptor) inputs = Ports([Port.Table('Y-pred', name='Y-pred'), Port.Table('Y-true', name='Y-true')]) outputs = Ports([Port.Table('confusion-matrix', name='confusion-matrix')]) __doc__ = SyML_abstract.generate_docstring( description, descriptor.info, descriptor.attributes, inputs, outputs) def execute(self, node_context): Y_pred_tbl = node_context.input['Y-pred'] Y_true_tbl = node_context.input['Y-true'] cm_tbl = node_context.output['confusion-matrix'] heading = BoolType().from_string( node_context.parameters['include heading'].value) Y_pred = table_to_array(Y_pred_tbl) Y_true = table_to_array(Y_true_tbl) kwargs = self.__class__.descriptor.get_parameters( node_context.parameters) kwargs['y_true'] = Y_true kwargs['y_pred'] = Y_pred if kwargs['labels'] == []: kwargs['labels'] = None cm = sklearn.metrics.confusion_matrix(**kwargs) if kwargs['labels'] is not None: cols = kwargs['labels'] else: cols = sorted(list(set(Y_true.ravel()))) if heading: cm_tbl.set_column_from_array('label', np.array(cols)) for i, col in enumerate(cols): cm_tbl.set_column_from_array(str(cols[i]), cm[:, i]) if Y_true_tbl.get_name() is None: cm_tbl.set_name("Confusion matrix") else: cm_tbl.set_name(Y_true_tbl.get_name()+" confusion matrix")
[docs]class LearningCurve(node.Node): name = 'Learning Curve' author = 'Mathias Broxvall' version = '0.1' icon = 'learning_curve.svg' description = ( 'Generates a learning curve by training model multiple times' 'on incrementally larger subsets of the data and using ' 'cross validation for scoring. ' 'Plot performance of train-mean vs. test-mean for curve.') nodeid = 'org.sysess.sympathy.machinelearning.learningcurve' tags = Tags(Tag.MachineLearning.Metrics) parameters = node.parameters() parameters.set_boolean( 'shuffle', value=True, label='Shuffle', description='Randomizes the input dataset before passed to ' 'internal cross validation') parameters.set_float( 'smallest', value=0.1, label='Smallest fraction', description='Size of the smallest dataset as fraction of total') parameters.set_integer( 'steps', value=10, label='Steps', description='Number of different sizes of training/test data measured') parameters.set_integer( 'cv', value=3, label='Cross validation folds', description='Number of fold of cross-validation (minimum 2)') inputs = Ports([ ModelPort('Model', 'model'), Port.Table('X', name='X'), Port.Table('Y', name='Y') ]) outputs = Ports([ Port.Table('results', name='results'), Port.Table('statistics', name='statistics') ]) def execute(self, node_context): X_tbl = node_context.input['X'] Y_tbl = node_context.input['Y'] results = node_context.output['results'] statistics = node_context.output['statistics'] shuffle = node_context.parameters['shuffle'].value smallest = node_context.parameters['smallest'].value steps = node_context.parameters['steps'].value cv = node_context.parameters['cv'].value smallest = max(0, min(smallest, 1.0)) cv = max(2, cv) X = table_to_array(X_tbl) Y = table_to_array(Y_tbl) if shuffle: perm = np.random.permutation(X.shape[0]) X = X[perm] Y = Y[perm] in_model = node_context.input['model'] in_model.load() skl = in_model.get_skl() sizes, train_scores, test_scores = ( sklearn.model_selection.learning_curve( skl, X, Y[:, 0], cv=cv, train_sizes=np.linspace(smallest, 1.0, steps))) N = train_scores.shape[1] sizes_all = np.repeat(sizes, N) results.set_column_from_array("sizes", sizes_all) results.set_column_from_array("train", train_scores.ravel()) results.set_column_from_array("test", test_scores.ravel()) statistics.set_column_from_array("sizes", sizes) statistics.set_column_from_array("train_mean", np.mean(train_scores, axis=1)) statistics.set_column_from_array("test_mean", np.mean(test_scores, axis=1)) statistics.set_column_from_array("train_median", np.median(train_scores, axis=1)) statistics.set_column_from_array("test_median", np.median(test_scores, axis=1))
[docs]class ConditionalFromCategories(node.Node): name = 'Conditional Probabilty from Categories' author = 'Mathias Broxvall' version = '0.1' icon = 'cond_prob_cat.svg' description = ( 'Creates groups of all (categorical) features and gives probabilities' 'for Y. All of X must be categorical and all Y binary') nodeid = 'org.sysess.sympathy.machinelearning.cond_prob_cat' tags = Tags(Tag.MachineLearning.Metrics) parameters = node.parameters() inputs = Ports([ Port.Table('X', name='X'), Port.Table('Y', name='Y') ]) outputs = Ports([ Port.Table('results', name='results'), ]) def execute(self, node_context): X_tbl = node_context.input['X'] Y_tbl = node_context.input['Y'] results = node_context.output['results'] X_names = X_tbl.column_names() Y_names = Y_tbl.column_names() dfX = X_tbl.to_dataframe() dfXY = dfX.copy() for col_y in Y_names: if col_y in X_names: sywarn('Column {} exists in both X, Y'.format(col_y)) else: dfXY.loc[:, col_y] = Y_tbl.get_column_to_series(col_y) means = dfXY.groupby(X_names).mean() xy_indices = np.array(list(means.index)) y_means = np.array(means) if len(xy_indices.shape) == 1: xy_indices = xy_indices.reshape(xy_indices.shape+(1,)) if len(y_means.shape) == 1: y_means = y_means.reshape(y_means.shape+(1,)) for i, col_x in enumerate(X_names): results.set_column_from_array(col_x, xy_indices[:, i]) for i, col_y in enumerate(Y_names): results.set_column_from_array(col_y, y_means[:, i])