# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import numpy as np
import sklearn.metrics
import sklearn.model_selection
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import sywarn
from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.abstract_nodes import SyML_abstract
from sylib.machinelearning.descriptors import Descriptor
from sylib.machinelearning.descriptors import BoolType
from sylib.machinelearning.descriptors import IntListType
from sylib.machinelearning.descriptors import IntType
from sylib.machinelearning.descriptors import NoneType
from sylib.machinelearning.descriptors import StringListType
from sylib.machinelearning.descriptors import StringType
from sylib.machinelearning.descriptors import UnionType
from sylib.machinelearning.utility import table_to_array
[docs]
class ROCFromProb(SyML_abstract, node.Node):
    """A ROC (receiver operating characteristic) curve is a graphical plot
    that illustrates how a binary classifier system performs as its
    discrimination threshold is varied. It is created by plotting the fraction
    of true positives out of the positives (TPR = true positive rate) against
    the fraction of false positives out of the negatives (FPR = false positive
    rate), at various threshold settings.

    The threshold will typically vary from 1 (or infinity), where only
    classifications with 100% confidence will be considered as correct, to 0
    where all classifications will be considered correct. Thus the curve will
    always go from (TPR=0.0, FPR=0.0) to (TPR=1.0, FPR=1.0). A good classifier
    will have a high TPR for all thresholds, while a bad (random) classifier
    will yield a linear roc curve from (0, 0) to (1, 1).
    """

    name = 'ROC from Probabilities'
    author = 'Mathias Broxvall'
    icon = 'roc_curve.svg'
    description = (
        'Computes Receiver operating characteristics (ROC) based on '
        'calculated Y-probabilities and from true Y.')
    nodeid = 'org.sysess.sympathy.machinelearning.roc_prob'
    tags = Tags(Tag.MachineLearning.Metrics)

    # Parameter descriptions; entries flagged 'no-kw' are node-only options
    # (presumably excluded from the keyword arguments built for sklearn --
    # confirm against Descriptor.get_parameters).
    descriptor = Descriptor()
    descriptor.name = name
    info = [
        {
            'name': 'pos_label',
            'dispname': 'Positive class label',
            'type': UnionType(
                [NoneType(), IntType(), StringType()], default=None),
        },
        {
            'name': 'drop_intermediate',
            'dispname': 'Drop suboptimal thresholds',
            'type': BoolType(default=True),
        },
        {
            'name': 'header as label',
            'desc': 'Use header of Y-prob as the target label',
            'no-kw': True,
            'type': BoolType(default=True),
        },
    ]
    descriptor.set_info(info, doc_class=sklearn.metrics.roc_curve)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([
        Port.Table('Y-prob', name='Y-prob'),
        Port.Table('Y-true', name='Y-true'),
    ])
    outputs = Ports([Port.Table('roc', name='roc')])

    # Append the generated parameter/port documentation to the class docstring.
    __doc__ += SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes, inputs, outputs)

    def execute(self, node_context):
        """Compute the ROC curve and write it to the 'roc' output table.

        Reads the 'Y-prob' and 'Y-true' input tables, calls
        ``sklearn.metrics.roc_curve`` with the configured parameters and
        stores the false positive rates, true positive rates and thresholds
        as three columns of the output table, which is named 'ROC'.
        """
        prob_table = node_context.input['Y-prob']
        truth_table = node_context.input['Y-true']
        out_table = node_context.output['roc']
        use_header = node_context.parameters['header as label'].value

        scores = table_to_array(prob_table)
        truth = table_to_array(truth_table)

        roc_args = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        roc_args['y_true'] = truth
        roc_args['y_score'] = scores

        if BoolType().from_string(use_header):
            # The first column header of Y-prob names the positive class;
            # use it as an integer when it parses as one, else as a string.
            header = prob_table.column_names()[0]
            try:
                roc_args['pos_label'] = int(header)
            except ValueError:
                roc_args['pos_label'] = header

        fpr, tpr, thresholds = sklearn.metrics.roc_curve(**roc_args)

        columns = (
            ('false positive rate', fpr),
            ('true positive rate', tpr),
            ('threshold', thresholds),
        )
        for column_name, values in columns:
            out_table.set_column_from_array(column_name, values)
        out_table.set_name('ROC')
[docs]
class R2Score(SyML_abstract, node.Node):
    """The r2_score function computes the coefficient of determination. It
    represents the proportion of variance (of y) that has been explained by
    the independent variables in the model. It provides an indication of
    goodness of fit and therefore a measure of how well unseen samples are
    likely to be predicted by the model, through the proportion of explained
    variance.
    """

    name = 'R² regression score (R2)'
    author = 'Mathias Broxvall'
    icon = 'roc_curve.svg'
    description = (
        'Computes the R² regression score.\n'
        'Best possible score is 1.0 and it can be negative (because the model '
        'can be arbitrarily bad). A constant model that always predicts the '
        'expected value of y, disregarding the input features, would get a '
        'R² score of 0.0) ')
    nodeid = 'org.sysess.sympathy.machinelearning.r2_score'
    tags = Tags(Tag.MachineLearning.Metrics)

    # This node exposes no configurable parameters of its own.
    descriptor = Descriptor()
    descriptor.name = name
    descriptor.set_info([], doc_class=sklearn.metrics.r2_score)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([
        Port.Table('Y-prob', name='Y-prob'),
        Port.Table('Y-true', name='Y-true'),
    ])
    outputs = Ports([Port.Table('r2 score', name='r2 score')])

    # Append the generated parameter/port documentation to the class docstring.
    __doc__ += SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes, inputs, outputs)

    def execute(self, node_context):
        """Compute the R² score and write it to the 'r2 score' output table.

        The 'Y-prob' table is passed to ``sklearn.metrics.r2_score`` as
        ``y_pred`` and the 'Y-true' table as ``y_true``. The result is stored
        in a single column named 'r² score' and the table is named 'R² score'.
        """
        predicted_table = node_context.input['Y-prob']
        truth_table = node_context.input['Y-true']
        score_table = node_context.output['r2 score']

        predicted = table_to_array(predicted_table)
        truth = table_to_array(truth_table)

        score_args = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        score_args['y_true'] = truth
        score_args['y_pred'] = predicted

        score = sklearn.metrics.r2_score(**score_args)

        # A scalar result is wrapped so it can be stored as a one-row column.
        column = score if isinstance(score, np.ndarray) else np.array([score])
        score_table.set_column_from_array('r² score', column)
        score_table.set_name('R² score')
[docs]
class ConfusionFromPrediction(SyML_abstract, node.Node):
    # Node computing a confusion matrix from predicted and true Y-values.
    # The class docstring is generated below from the description, the
    # descriptor and the port definitions.

    name = 'Confusion Matrix'
    author = 'Mathias Broxvall'
    icon = 'confusion_matrix.svg'
    description = (
        'Computes the confusion matrix given predictions and true Y-values.')
    nodeid = 'org.sysess.sympathy.machinelearning.confusion'
    tags = Tags(Tag.MachineLearning.Metrics)

    descriptor = Descriptor()
    descriptor.name = name
    info = [
        {'name': 'labels',
         'dispname': 'Labels',
         'type': UnionType([
             NoneType(), IntListType(), StringListType()], default="")},
        {'name': 'include heading',
         'dispname': 'Include heading',
         'desc': 'Adds a columns with used class names',
         'no-kw': True,
         'type': BoolType(default=True)},
    ]
    descriptor.set_info(info, doc_class=sklearn.metrics.confusion_matrix)

    parameters = node.parameters()
    SyML_abstract.generate_parameters(parameters, descriptor)

    inputs = Ports([Port.Table('Y-pred', name='Y-pred'),
                    Port.Table('Y-true', name='Y-true')])
    outputs = Ports([Port.Table('confusion-matrix', name='confusion-matrix')])

    __doc__ = SyML_abstract.generate_docstring(
        description, descriptor.info, descriptor.attributes, inputs, outputs)

    def execute(self, node_context):
        """Compute the confusion matrix and write it to the output table.

        Reads the 'Y-pred' and 'Y-true' input tables and calls
        ``sklearn.metrics.confusion_matrix``. The output table gets one
        column per class, named ``str(label)`` and holding the matching
        matrix column. When 'include heading' is enabled, a leading 'label'
        column lists the class labels in row order. The output table name is
        derived from the name of the 'Y-true' table when it has one.
        """
        Y_pred_tbl = node_context.input['Y-pred']
        Y_true_tbl = node_context.input['Y-true']
        cm_tbl = node_context.output['confusion-matrix']
        heading = BoolType().from_string(
            node_context.parameters['include heading'].value)

        Y_pred = table_to_array(Y_pred_tbl)
        Y_true = table_to_array(Y_true_tbl)

        kwargs = self.__class__.descriptor.get_parameters(
            node_context.parameters)
        kwargs['y_true'] = Y_true
        kwargs['y_pred'] = Y_pred
        # An empty label list means "let sklearn infer the labels".
        if kwargs['labels'] == []:
            kwargs['labels'] = None

        cm = sklearn.metrics.confusion_matrix(**kwargs)

        if kwargs['labels'] is not None:
            cols = kwargs['labels']
        else:
            # Without explicit labels sklearn uses every label that occurs in
            # y_true OR y_pred, in sorted order. Mirror that here so that the
            # names line up with the matrix even when a class is predicted
            # that never occurs in Y-true.
            cols = np.union1d(Y_true.ravel(), Y_pred.ravel())

        if heading:
            cm_tbl.set_column_from_array('label', np.array(cols))
        for i, col in enumerate(cols):
            cm_tbl.set_column_from_array(str(col), cm[:, i])

        true_name = Y_true_tbl.get_name()
        if true_name is None:
            cm_tbl.set_name("Confusion matrix")
        else:
            cm_tbl.set_name(true_name + " confusion matrix")
[docs]
class LearningCurve(node.Node):
    """A learning curve shows the validation and training score of an
    estimator for varying numbers of training samples. It is a tool to find
    out how much we benefit from adding more training data and whether the
    estimator suffers more from a variance error or a bias error.

    A cross-validation generator splits the whole dataset k times in training
    and test data. Subsets of the training set with varying sizes will be used
    to train the estimator and a score for each training subset size and the
    test set will be computed. Afterwards, the scores will be averaged over
    all k runs for each training subset size.
    """

    name = 'Learning Curve'
    author = 'Mathias Broxvall'
    icon = 'learning_curve.svg'
    # The first fragment ends with a space so the concatenated text does not
    # run "times" and "on" together.
    description = (
        'Generates a learning curve by training model multiple times '
        'on incrementally larger subsets of the data and using '
        'cross validation for scoring. '
        'Plot performance of train-mean vs. test-mean for curve.')
    nodeid = 'org.sysess.sympathy.machinelearning.learningcurve'
    tags = Tags(Tag.MachineLearning.Metrics)

    parameters = node.parameters()
    parameters.set_boolean(
        'shuffle', value=True, label='Shuffle',
        description='Randomizes the input dataset before passed to '
                    'internal cross validation')
    parameters.set_float(
        'smallest', value=0.1, label='Smallest fraction',
        description='Size of the smallest dataset as fraction of total')
    parameters.set_integer(
        'steps', value=10, label='Steps',
        description='Number of different sizes of training/test data measured')
    parameters.set_integer(
        'cv', value=3, label='Cross validation folds',
        description='Number of fold of cross-validation (minimum 2)')

    inputs = Ports([
        ModelPort('Model', 'model'),
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y')
    ])
    outputs = Ports([
        Port.Table('results', name='results'),
        Port.Table('statistics', name='statistics')
    ])

    def execute(self, node_context):
        """Run ``sklearn.model_selection.learning_curve`` and store the scores.

        The 'results' output holds one row per (training size, fold) pair
        with the columns 'sizes', 'train' and 'test'. The 'statistics' output
        holds one row per training size with the mean and median of the train
        and test scores across folds.
        """
        X_tbl = node_context.input['X']
        Y_tbl = node_context.input['Y']
        results = node_context.output['results']
        statistics = node_context.output['statistics']

        shuffle = node_context.parameters['shuffle'].value
        smallest = node_context.parameters['smallest'].value
        steps = node_context.parameters['steps'].value
        cv = node_context.parameters['cv'].value

        # Clamp the configured values to the ranges this node accepts.
        # NOTE(review): sklearn documents fractional train_sizes as having to
        # lie within (0, 1], so a 'smallest' of 0 (or a negative value clamped
        # to 0) is expected to be rejected by learning_curve -- confirm
        # whether a positive lower bound should be enforced here.
        smallest = max(0, min(smallest, 1.0))
        cv = max(2, cv)

        X = table_to_array(X_tbl)
        Y = table_to_array(Y_tbl)

        if shuffle:
            # Apply the same random row permutation to X and Y.
            perm = np.random.permutation(X.shape[0])
            X = X[perm]
            Y = Y[perm]

        in_model = node_context.input['model']
        in_model.load()
        skl = in_model.get_skl()

        # Only the first column of Y is used as the target.
        sizes, train_scores, test_scores = (
            sklearn.model_selection.learning_curve(
                skl, X, Y[:, 0], cv=cv,
                train_sizes=np.linspace(smallest, 1.0, steps)))

        # Repeat each size once per score column so that it lines up with the
        # row-major flattening of the score matrices below.
        n_folds = train_scores.shape[1]
        sizes_all = np.repeat(sizes, n_folds)
        results.set_column_from_array("sizes", sizes_all)
        results.set_column_from_array("train", train_scores.ravel())
        results.set_column_from_array("test", test_scores.ravel())

        statistics.set_column_from_array("sizes", sizes)
        statistics.set_column_from_array(
            "train_mean", np.mean(train_scores, axis=1))
        statistics.set_column_from_array(
            "test_mean", np.mean(test_scores, axis=1))
        statistics.set_column_from_array(
            "train_median", np.median(train_scores, axis=1))
        statistics.set_column_from_array(
            "test_median", np.median(test_scores, axis=1))
[docs]
class ConditionalFromCategories(node.Node):
    """Group the rows by every unique combination of the X columns and
    compute the mean of each Y column within each group. With binary (0/1) Y
    values this mean is the conditional probability of Y being true (1) for
    that combination of categories.
    """

    # NOTE(review): 'Probabilty' is misspelled; left unchanged here because
    # the display name may be referenced elsewhere -- confirm before renaming.
    name = 'Conditional Probabilty from Categories'
    author = 'Mathias Broxvall'
    icon = 'cond_prob_cat.svg'
    # The first fragment ends with a space so the concatenated text does not
    # run "probabilities" and "for" together.
    description = (
        'Creates groups of all (categorical) features and gives probabilities '
        'for Y to be true (1) for each category. All of X must be categorical and all Y binary')
    nodeid = 'org.sysess.sympathy.machinelearning.cond_prob_cat'
    tags = Tags(Tag.MachineLearning.Metrics)

    parameters = node.parameters()
    inputs = Ports([
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y')
    ])
    outputs = Ports([
        Port.Table('results', name='results'),
    ])

    def execute(self, node_context):
        """Write per-group means of the Y columns to the 'results' table.

        The output has one row per unique combination of X values, with the
        X columns holding the group keys and one column per Y column holding
        the group mean. A Y column whose name also exists in X is reported
        with a warning and left out of the output.
        """
        X_tbl = node_context.input['X']
        Y_tbl = node_context.input['Y']
        results = node_context.output['results']

        X_names = X_tbl.column_names()
        Y_names = Y_tbl.column_names()

        dfXY = X_tbl.to_dataframe().copy()

        # Remember which Y columns were actually added. Columns that clash
        # with X are skipped, so the group means only exist for these names
        # and must not be looked up by position in Y_names.
        used_y_names = []
        for col_y in Y_names:
            if col_y in X_names:
                sywarn('Column {} exists in both X, Y'.format(col_y))
            else:
                dfXY.loc[:, col_y] = Y_tbl.get_column_to_series(col_y)
                used_y_names.append(col_y)

        means = dfXY.groupby(X_names).mean()

        # With a single X column the index entries are scalars, which gives a
        # 1-D array; add a trailing axis so that columns can be sliced below.
        xy_indices = np.array(list(means.index))
        if xy_indices.ndim == 1:
            xy_indices = xy_indices[:, np.newaxis]

        for i, col_x in enumerate(X_names):
            results.set_column_from_array(col_x, xy_indices[:, i])
        for col_y in used_y_names:
            results.set_column_from_array(col_y, np.asarray(means[col_y]))