# Source code for node_metrics

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np

import sklearn.metrics
import sklearn.model_selection

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import sywarn

from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.abstract_nodes import SyML_abstract
from sylib.machinelearning.descriptors import Descriptor

from sylib.machinelearning.descriptors import BoolType
from sylib.machinelearning.descriptors import IntListType
from sylib.machinelearning.descriptors import IntType
from sylib.machinelearning.descriptors import NoneType
from sylib.machinelearning.descriptors import StringListType
from sylib.machinelearning.descriptors import StringType
from sylib.machinelearning.descriptors import UnionType

from sylib.machinelearning.utility import table_to_array


class ROCFromProb(SyML_abstract, node.Node):
    """The ROC (receiver operating characteristic) curve is a graphical plot
    which illustrates the performance of a binary classifier system as its
    discrimination threshold is varied. It is created by plotting the
    fraction of true positives out of the positives (TPR = true positive
    rate) vs. the fraction of false positives out of the negatives (FPR =
    false positive rate), at various threshold settings. The threshold will
    typically vary from 1 (or infinity), where only classifications with
    100% confidence will be considered as correct, to 0 where all
    classifications will be considered correct. Thus the curve will always
    go from (TPR=0.0, FPR=0.0) to (TPR=1.0, FPR=1.0). A good classifier
    will have a high TPR for all thresholds, while a bad (random)
    classifier will yield a linear roc curve from (0, 0) to (1, 1).
    """

    name = 'ROC from Probabilities'
    author = 'Mathias Broxvall'
    icon = 'roc_curve.svg'
    description = (
        'Computes Receiver operating characteristics (ROC) based on '
        'calculated Y-probabilities and from true Y.')
    nodeid = 'org.sysess.sympathy.machinelearning.roc_prob'
    tags = Tags(Tag.MachineLearning.Metrics)

    descriptor = Descriptor()
    descriptor.name = name
    # Parameter declarations for the configuration GUI.  Entries with
    # 'no-kw': True are handled explicitly in execute() rather than being
    # forwarded as keyword arguments to sklearn.metrics.roc_curve.
    info = [
        {'name': 'pos_label',
         'dispname': 'Positive class label',
         'type': UnionType([
             NoneType(), IntType(), StringType()], default=None)},
        {'name': 'drop_intermediate',
         'dispname': 'Drop suboptimal thresholds',
         'type': BoolType(default=True)},
        {'name': 'header as label',
         'desc': 'Use header of Y-prob as the target label',
         'no-kw': True,
         'type': BoolType(default=True)},
    ]
    descriptor.set_info(info, doc_class=sklearn.metrics.roc_curve)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([Port.Table('Y-prob', name='Y-prob'),
                    Port.Table('Y-true', name='Y-true')])
    outputs = Ports([Port.Table('roc', name='roc')])
    # Append the auto-generated parameter/port documentation to the class
    # docstring above.
    __doc__ += SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes,
        inputs, outputs)

    def execute(self, node_context):
        """Compute the ROC curve via sklearn.metrics.roc_curve and write the
        false positive rate, true positive rate and threshold arrays as
        columns of the output table.
        """
        Y_prob_tbl = node_context.input['Y-prob']
        Y_true_tbl = node_context.input['Y-true']
        roc_tbl = node_context.output['roc']
        header_as_label = node_context.parameters['header as label'].value

        Y_prob = table_to_array(Y_prob_tbl)
        Y_true = table_to_array(Y_true_tbl)

        # Collect GUI-configured parameters as keyword arguments for
        # sklearn.metrics.roc_curve.
        kwargs = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        kwargs['y_true'] = Y_true
        kwargs['y_score'] = Y_prob
        if BoolType().from_string(header_as_label):
            # Use the first column header of Y-prob as the positive class
            # label; interpret it as an int when possible, otherwise keep
            # the string as-is.
            label = Y_prob_tbl.column_names()[0]
            try:
                label = int(label)
            except ValueError:
                pass
            kwargs['pos_label'] = label

        fpr, tpr, thresholds = sklearn.metrics.roc_curve(**kwargs)
        roc_tbl.set_column_from_array('false positive rate', fpr)
        roc_tbl.set_column_from_array('true positive rate', tpr)
        roc_tbl.set_column_from_array('threshold', thresholds)
        roc_tbl.set_name('ROC')
class R2Score(SyML_abstract, node.Node):
    """The r2_score function computes the coefficient of determination.

    It represents the proportion of variance (of y) that has been explained
    by the independent variables in the model. It provides an indication of
    goodness of fit and therefore a measure of how well unseen samples are
    likely to be predicted by the model, through the proportion of explained
    variance.
    """

    name = 'R² regression score (R2)'
    author = 'Mathias Broxvall'
    icon = 'roc_curve.svg'
    description = (
        'Computes the R² regression score.\n'
        'Best possible score is 1.0 and it can be negative (because the model '
        'can be arbitrarily bad). A constant model that always predicts the '
        'expected value of y, disregarding the input features, would get a '
        'R² score of 0.0) ')
    nodeid = 'org.sysess.sympathy.machinelearning.r2_score'
    tags = Tags(Tag.MachineLearning.Metrics)

    descriptor = Descriptor()
    descriptor.name = name
    # No extra GUI parameters beyond those derived from r2_score itself.
    descriptor.set_info([
    ], doc_class=sklearn.metrics.r2_score)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([Port.Table('Y-prob', name='Y-prob'),
                    Port.Table('Y-true', name='Y-true')])
    outputs = Ports([Port.Table('r2 score', name='r2 score')])
    # Extend the class docstring with generated parameter/port docs.
    __doc__ += SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes,
        inputs, outputs)

    def execute(self, node_context):
        """Evaluate sklearn.metrics.r2_score on the predicted and true
        Y-tables and store the result as a single output column.
        """
        pred_tbl = node_context.input['Y-prob']
        true_tbl = node_context.input['Y-true']
        out_tbl = node_context.output['r2 score']

        kwargs = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        kwargs['y_true'] = table_to_array(true_tbl)
        kwargs['y_pred'] = table_to_array(pred_tbl)

        score = sklearn.metrics.r2_score(**kwargs)
        # r2_score returns a scalar for single-output Y, or an ndarray when
        # multioutput scoring yields per-column values; normalise to an
        # array before writing the column.
        if not isinstance(score, np.ndarray):
            score = np.array([score])
        out_tbl.set_column_from_array('r² score', score)
        out_tbl.set_name('R² score')
class ConfusionFromPrediction(SyML_abstract, node.Node):
    name = 'Confusion Matrix'
    author = 'Mathias Broxvall'
    icon = 'confusion_matrix.svg'
    description = (
        'Computes the confusion matrix given predictions and true Y-values.')
    nodeid = 'org.sysess.sympathy.machinelearning.confusion'
    tags = Tags(Tag.MachineLearning.Metrics)

    descriptor = Descriptor()
    descriptor.name = name
    # Parameter declarations for the configuration GUI.  Entries with
    # 'no-kw': True are handled explicitly in execute() rather than being
    # forwarded to sklearn.metrics.confusion_matrix.
    info = [
        {'name': 'labels',
         'dispname': 'Labels',
         'type': UnionType([
             NoneType(), IntListType(), StringListType()], default="")},
        {'name': 'include heading',
         'dispname': 'Include heading',
         'desc': 'Adds a columns with used class names',
         'no-kw': True,
         'type': BoolType(default=True)},
    ]
    descriptor.set_info(info, doc_class=sklearn.metrics.confusion_matrix)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([Port.Table('Y-pred', name='Y-pred'),
                    Port.Table('Y-true', name='Y-true')])
    outputs = Ports([Port.Table('confusion-matrix', name='confusion-matrix')])
    __doc__ = SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes,
        inputs, outputs)

    def execute(self, node_context):
        """Compute the confusion matrix and write it as one table column per
        class label, optionally preceded by a 'label' heading column.
        """
        Y_pred_tbl = node_context.input['Y-pred']
        Y_true_tbl = node_context.input['Y-true']
        cm_tbl = node_context.output['confusion-matrix']
        heading = BoolType().from_string(
            node_context.parameters['include heading'].value)

        Y_pred = table_to_array(Y_pred_tbl)
        Y_true = table_to_array(Y_true_tbl)

        kwargs = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        kwargs['y_true'] = Y_true
        kwargs['y_pred'] = Y_pred
        # An empty label list from the GUI means "no explicit labels".
        if kwargs['labels'] == []:
            kwargs['labels'] = None

        cm = sklearn.metrics.confusion_matrix(**kwargs)
        if kwargs['labels'] is not None:
            cols = kwargs['labels']
        else:
            # Match sklearn's default axis ordering: with labels=None the
            # matrix covers the sorted union of labels occurring in y_true
            # and y_pred.  Using only y_true here (as the code previously
            # did) misaligns the output whenever a predicted label never
            # occurs among the true labels.
            cols = sorted(set(Y_true.ravel()) | set(Y_pred.ravel()))

        if heading:
            cm_tbl.set_column_from_array('label', np.array(cols))
        for i, col in enumerate(cols):
            # Column 'col' holds, per true class (row), the number of
            # samples predicted as 'col'.
            cm_tbl.set_column_from_array(str(col), cm[:, i])

        if Y_true_tbl.get_name() is None:
            cm_tbl.set_name("Confusion matrix")
        else:
            cm_tbl.set_name(Y_true_tbl.get_name() + " confusion matrix")
class LearningCurve(node.Node):
    """A learning curve shows the validation and training score of an
    estimator for varying numbers of training samples. It is a tool to find
    out how much we benefit from adding more training data and whether the
    estimator suffers more from a variance error or a bias error.

    A cross-validation generator splits the whole dataset k times in
    training and test data. Subsets of the training set with varying sizes
    will be used to train the estimator and a score for each training subset
    size and the test set will be computed. Afterwards, the scores will be
    averaged over all k runs for each training subset size.
    """

    name = 'Learning Curve'
    author = 'Mathias Broxvall'
    icon = 'learning_curve.svg'
    # Fixed: the original implicit string concatenation was missing a space
    # after 'multiple times', producing 'timeson' in the rendered text.
    description = (
        'Generates a learning curve by training model multiple times '
        'on incrementally larger subsets of the data and using '
        'cross validation for scoring. '
        'Plot performance of train-mean vs. test-mean for curve.')
    nodeid = 'org.sysess.sympathy.machinelearning.learningcurve'
    tags = Tags(Tag.MachineLearning.Metrics)

    parameters = node.parameters()
    parameters.set_boolean(
        'shuffle', value=True, label='Shuffle',
        description='Randomizes the input dataset before passed to '
                    'internal cross validation')
    parameters.set_float(
        'smallest', value=0.1, label='Smallest fraction',
        description='Size of the smallest dataset as fraction of total')
    parameters.set_integer(
        'steps', value=10, label='Steps',
        description='Number of different sizes of training/test data '
                    'measured')
    parameters.set_integer(
        'cv', value=3, label='Cross validation folds',
        description='Number of fold of cross-validation (minimum 2)')

    inputs = Ports([
        ModelPort('Model', 'model'),
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y')
    ])
    outputs = Ports([
        Port.Table('results', name='results'),
        Port.Table('statistics', name='statistics')
    ])

    def execute(self, node_context):
        """Run sklearn.model_selection.learning_curve on the input model and
        data; output per-fold scores ('results') and per-size summary
        statistics ('statistics').
        """
        X_tbl = node_context.input['X']
        Y_tbl = node_context.input['Y']
        results = node_context.output['results']
        statistics = node_context.output['statistics']
        shuffle = node_context.parameters['shuffle'].value
        smallest = node_context.parameters['smallest'].value
        steps = node_context.parameters['steps'].value
        cv = node_context.parameters['cv'].value

        # learning_curve requires train-size fractions within (0, 1]; clamp
        # to a small positive lower bound (a 0 fraction raises ValueError in
        # sklearn) and enforce the documented minimum of 2 CV folds.
        smallest = max(1e-6, min(smallest, 1.0))
        cv = max(2, cv)

        X = table_to_array(X_tbl)
        Y = table_to_array(Y_tbl)
        if shuffle:
            # Apply one shared permutation so X/Y rows stay aligned.
            perm = np.random.permutation(X.shape[0])
            X = X[perm]
            Y = Y[perm]

        in_model = node_context.input['model']
        in_model.load()
        skl = in_model.get_skl()

        # Only the first Y column is used as the target.
        sizes, train_scores, test_scores = (
            sklearn.model_selection.learning_curve(
                skl, X, Y[:, 0], cv=cv,
                train_sizes=np.linspace(smallest, 1.0, steps)))

        # 'results' holds one row per (size, fold) pair.
        N = train_scores.shape[1]
        sizes_all = np.repeat(sizes, N)
        results.set_column_from_array("sizes", sizes_all)
        results.set_column_from_array("train", train_scores.ravel())
        results.set_column_from_array("test", test_scores.ravel())

        # 'statistics' aggregates the folds per training-set size.
        statistics.set_column_from_array("sizes", sizes)
        statistics.set_column_from_array("train_mean",
                                         np.mean(train_scores, axis=1))
        statistics.set_column_from_array("test_mean",
                                         np.mean(test_scores, axis=1))
        statistics.set_column_from_array("train_median",
                                         np.median(train_scores, axis=1))
        statistics.set_column_from_array("test_median",
                                         np.median(test_scores, axis=1))
class ConditionalFromCategories(node.Node):
    # NOTE(review): 'Probabilty' is misspelled, but the display name is kept
    # unchanged in case saved workflows reference it — confirm before fixing.
    name = 'Conditional Probabilty from Categories'
    author = 'Mathias Broxvall'
    icon = 'cond_prob_cat.svg'
    # Fixed: the original implicit string concatenation was missing a space
    # after 'probabilities', producing 'probabilitiesfor' in the rendered
    # text.
    description = (
        'Creates groups of all (categorical) features and gives '
        'probabilities for Y to be true (1) for each category. '
        'All of X must be categorical and all Y binary')
    nodeid = 'org.sysess.sympathy.machinelearning.cond_prob_cat'
    tags = Tags(Tag.MachineLearning.Metrics)
    parameters = node.parameters()

    inputs = Ports([
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y')
    ])
    outputs = Ports([
        Port.Table('results', name='results'),
    ])

    def execute(self, node_context):
        """Group rows by all X columns and output, per group, the mean of
        each Y column — i.e. P(Y=1 | X-category) when Y is binary 0/1.
        """
        X_tbl = node_context.input['X']
        Y_tbl = node_context.input['Y']
        results = node_context.output['results']
        X_names = X_tbl.column_names()
        Y_names = Y_tbl.column_names()

        # Build one dataframe holding both X and Y columns; Y columns that
        # collide with X names are skipped (with a warning) rather than
        # silently overwriting the X data.
        dfX = X_tbl.to_dataframe()
        dfXY = dfX.copy()
        for col_y in Y_names:
            if col_y in X_names:
                sywarn('Column {} exists in both X, Y'.format(col_y))
            else:
                dfXY.loc[:, col_y] = Y_tbl.get_column_to_series(col_y)

        # Mean of each Y column per unique combination of X categories.
        means = dfXY.groupby(X_names).mean()
        xy_indices = np.array(list(means.index))
        y_means = np.array(means)
        # With a single X (or Y) column the arrays come out 1-D; promote to
        # 2-D so the column loops below work uniformly.
        if len(xy_indices.shape) == 1:
            xy_indices = xy_indices.reshape(xy_indices.shape + (1,))
        if len(y_means.shape) == 1:
            y_means = y_means.reshape(y_means.shape + (1,))

        for i, col_x in enumerate(X_names):
            results.set_column_from_array(col_x, xy_indices[:, i])
        for i, col_y in enumerate(Y_names):
            results.set_column_from_array(col_y, y_means[:, i])