# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
import numpy as np
import numpy.ma as ma
import sklearn.model_selection
import sklearn.exceptions
import copy
import math
import random
import warnings
from packaging import version as pversion
from sympathy.api import node
from sympathy.api.nodeconfig import Port
from sympathy.api.nodeconfig import Ports
from sympathy.api.nodeconfig import Tag
from sympathy.api.nodeconfig import Tags
from sympathy.api.exceptions import SyNodeError
from sympathy.api.exceptions import SyDataError
from sympathy.api.exceptions import sywarn
from sympathy.types.sylist import sylistReadThrough
from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.descriptors import NumericType
from sylib.machinelearning.utility import table_to_array


class ParameterSearch_SuperNode(node.Node):
    """Shared ports, parameters and helpers for the parameter search nodes"""

author = 'Mathias Broxvall'
version = '0.1'
tags = Tags(Tag.MachineLearning.HyperParameters)
inputs = Ports([
ModelPort('in-model', name='in-model'),
Port.Tables('param-space', name='parameter space'),
Port.Table('X', name='X'),
Port.Table('Y', name='Y'),
Port.Custom('[(table,table)]', 'cross-validation',
name='cross-validation', n=(0, 1)),
])
outputs = Ports([
Port.Table('results', name='results'),
Port.Table('parameters', name='parameters'),
ModelPort('out-model', name='out-model'),
])
parameters = node.parameters()
parameters.set_integer(
'cv', value=3, label='Cross validation splits',
description=(
            'Number of folds in the default K-Fold cross validation. '
            'Ignored when the cross-validation port is connected.'))

def setup(self, node_context):
"""Utility function for loading data, model and parameters"""
X_tbl = node_context.input['X']
Y_tbl = node_context.input['Y']
in_model = node_context.input['in-model']
X = table_to_array(X_tbl)
Y = table_to_array(Y_tbl)
in_model.load()
model = in_model.get_skl()
out_model = node_context.output['out-model']
out_model.source(in_model)
out_model.load()
desc = out_model.get_desc()
desc.set_x_names(X_tbl.column_names())
desc.set_y_names(Y_tbl.column_names())
out_model.save()
params = self.param_dict(desc, node_context)
if len(node_context.input.group('cross-validation')) > 0:
cv = [(tpl[0].get_column_to_array(tpl[0].column_names()[0]),
tpl[1].get_column_to_array(tpl[1].column_names()[0]))
for tpl in node_context.input.group('cross-validation')[0]]
else:
cv = None
return X, Y, model, desc, params, cv

    def param_dict(self, desc, node_context):
        """Return a list of parameter dicts, one per parameter-space table"""
par_tables = node_context.input['parameter space']
if not isinstance(par_tables, sylistReadThrough):
par_tables = [par_tables]
pdicts = []
for tbl in par_tables:
pdict = {}
for name in tbl.column_names():
if name not in desc.types:
raise SyDataError('Invalid parameter {} for model {}.'
.format(name, desc.name))
col = tbl.get_column_to_array(name)
values = list(filter(
lambda v: v is not ma.masked,
list(col)))
values = [desc.types[name].from_string(str(val))
for val in values]
pdict[name] = values
pdicts.append(pdict)
return pdicts

    def gen_results(self, node_context, search):
        """Write search results, parameters and the best model to outputs"""
meta_tbl = node_context.output['results']
par_tbl = node_context.output['parameters']
for key in sorted(search.cv_results_.keys()):
if key != 'params':
                # sklearn sometimes gives references to strings as
                # np.arrays; coerce these back to normal np.arrays
if isinstance(search.cv_results_[key], ma.core.MaskedArray):
col = ma.array(list(search.cv_results_[key].data),
mask=search.cv_results_[key].mask)
else:
col = np.array(list(search.cv_results_[key]))
                # Finally, if the array contains references to objects,
                # cast them to strings as a last resort
if col.dtype == np.dtype('O'):
col = np.array([str(o) for o in col])
if len(col.shape) > 1:
col = col[:, 0]
meta_tbl.set_column_from_array(key, col)
params = search.cv_results_['params']
all_keys, all_cols, all_masks = self.concatenate_parameters(params)
for key, col, mask in zip(all_keys, all_cols, all_masks):
par_tbl.set_column_from_array(key, ma.array(col, mask=mask))
out_model = node_context.output['out-model']
out_model.set_skl(search.best_estimator_)
out_model.save()

    def concatenate_parameters(self, params):
        """Merge a list of parameter dicts into string columns with masks"""
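        # Example (hypothetical): params = [{'a': 1}, {'a': 2, 'b': 3}]
        # yields all_keys = ['a', 'b'],
        #        all_cols = [['1', '2'], ['', '3']] and
        #        all_masks = [[0, 0], [1, 0]],
        # i.e. 'b' is masked out for the first parameter dict.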
all_keys = set([])
for d in params:
all_keys = all_keys.union(set(d.keys()))
all_keys = sorted(list(all_keys))
all_cols = []
all_masks = []
for key in all_keys:
cols = []
masks = []
for d in params:
if key not in d:
cols.append('')
masks.append(1)
else:
cols.append(str(d[key]))
masks.append(0)
all_cols.append(cols)
all_masks.append(masks)
return all_keys, all_cols, all_masks


class ParameterSearch_Randomized(ParameterSearch_SuperNode):
name = 'Randomized Parameter Search'
icon = 'random_hyperparam.svg'
    description = (
        'Performs a randomized parameter search, returning scores and the '
        'best model found. Uses the table as a "hypercube" of parameters to '
        'sample from, i.e. all combinations of parameters from all columns '
        'have an equal chance of being sampled. Use masked values for '
        'parameter columns of different lengths.')
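    # Example (hypothetical values): a parameter table with columns
    #   alpha  = [0.1, 1.0, 10.0]
    #   solver = ['svd', 'sag', <masked>]
    # defines a 3x2 hypercube; each iteration samples one alpha and one
    # solver value uniformly at random.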
nodeid = 'org.sysess.sympathy.machinelearning.randomized_parsearch'
inputs = Ports([
ModelPort('in-model', name='in-model'),
Port.Table('param-space', name='parameter space'),
Port.Table('X', name='X'),
Port.Table('Y', name='Y'),
Port.Custom('[(table,table)]', 'cross-validation',
name='cross-validation', n=(0, 1)),
])
parameters = copy.deepcopy(ParameterSearch_SuperNode.parameters)
    parameters.set_integer(
        'n_iter', value=10, label='iterations',
        description='Number of parameter settings sampled and evaluated')

def execute(self, node_context):
X, Y, model, desc, params, cv = self.setup(node_context)
        if len(params) > 1:
            sywarn('Randomized parameter search uses only the '
                   'first hypercube of the given parameters')
if cv is None:
cv = node_context.parameters['cv'].value
n_cv = cv
else:
n_cv = len(cv)
n_iter = node_context.parameters['n_iter'].value
self.progress_i = 0
def score_and_progress(estimator, X, y):
self.progress_i += 1
self.set_progress((100.0 * self.progress_i) / (n_iter * n_cv * 2))
return estimator.score(X, y)
        # Handle API differences between scikit-learn versions: the iid
        # argument is deprecated in 0.22 and removed in later versions
        if pversion.parse(sklearn.__version__) < pversion.Version('0.22'):
search = sklearn.model_selection.RandomizedSearchCV(
model, params[0],
cv=cv,
n_iter=n_iter,
scoring=score_and_progress,
iid=False,
return_train_score=True
)
else:
search = sklearn.model_selection.RandomizedSearchCV(
model, params[0],
cv=cv,
n_iter=n_iter,
scoring=score_and_progress,
return_train_score=True
)
if len(Y.shape) > 1:
Y = Y[:, 0].ravel()
with warnings.catch_warnings():
warnings.simplefilter(
'ignore', sklearn.exceptions.ConvergenceWarning)
search.fit(X, Y)
self.gen_results(node_context, search)


class ParameterSearch_Grid(ParameterSearch_SuperNode):
name = 'Grid Parameter Search'
icon = 'hyperparam.svg'
    description = (
        'Evaluates each parameter combination in the set of parameter '
        'hypercubes given by the tables in the list of parameter tables. '
        'Each parameter table specifies a hypercube of dimension N, where N '
        'is the number of columns in the table. Every combination of '
        'parameter values in each such cube is evaluated.')
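    # Example (hypothetical values): a list of two parameter tables, one
    # with columns alpha = [0.1, 1.0] and tol = [1e-3, 1e-4] and one with
    # the single column solver = ['svd', 'lsqr', 'sag'], evaluates
    # 2 * 2 + 3 = 7 parameter combinations in total.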
nodeid = 'org.sysess.sympathy.machinelearning.grid_parsearch'

    def count_cases(self, params, cv):
        """Total number of scorer calls, used for progress reporting"""
cases = 0
for hypercube in params:
cases += np.prod(np.array(
[len(pars) for pars in hypercube.values()]))
if isinstance(cv, list):
cases *= len(cv)
else:
cases *= cv
cases *= 2
return cases
def execute(self, node_context):
X, Y, model, desc, params, cv = self.setup(node_context)
if cv is None:
cv = node_context.parameters['cv'].value
cases = self.count_cases(params, cv)
self.progress_i = 0
def score_and_progress(estimator, X, y):
self.progress_i += 1
self.set_progress((100.0 * self.progress_i) / cases)
return estimator.score(X, y)
        if len(Y.shape) > 1:
            Y = Y[:, 0].ravel()
        # Handle API differences between scikit-learn versions: the iid
        # argument is deprecated in 0.22 and removed in later versions
        if pversion.parse(sklearn.__version__) < pversion.Version('0.22'):
search = sklearn.model_selection.GridSearchCV(
model, params,
cv=cv,
scoring=score_and_progress,
iid=False,
return_train_score=True
)
else:
search = sklearn.model_selection.GridSearchCV(
model, params,
cv=cv,
scoring=score_and_progress,
return_train_score=True
)
with warnings.catch_warnings():
warnings.simplefilter(
'ignore', sklearn.exceptions.ConvergenceWarning)
search.fit(X, Y)
self.gen_results(node_context, search)


class _SimAnnealing_Results(object):
    """Mimics the cv_results_ and best_estimator_ attributes that
    gen_results expects from the scikit-learn search objects"""

    def __init__(self, types, model, params, temperature, scores):
self.types = types
self.best_estimator_ = copy.deepcopy(model)
self.cv_results_ = {
'params': []
}
self.cv_results_['temperature'] = [temperature]
self.cv_results_['mean_test_scores'] = [np.mean(scores)]
self.cv_results_['best_test_scores'] = [np.mean(scores)]
self.cv_results_['state_change'] = [True]
self.cv_results_['std_test_scores'] = [np.std(scores)]
for i, score in enumerate(scores):
self.cv_results_[
'split{}_test_score'.format(i)] = [score]
self.cv_results_['params'] = [params]
for name, value in params.items():
self.cv_results_[name] = [value]
def update(self, params, temperature, scores):
self.cv_results_['params'].append(params)
self.cv_results_['temperature'].append(temperature)
self.cv_results_['mean_test_scores'].append(np.mean(scores))
self.cv_results_['std_test_scores'].append(np.std(scores))
for i, score in enumerate(scores):
self.cv_results_[
'split{}_test_score'.format(i)].append(score)
for name, value in params.items():
self.cv_results_[name].append(value)
def update_best(self, params, best_score, changed):
self.cv_results_['best_test_scores'].append(best_score)
self.cv_results_['state_change'].append(changed)
# for name, value in params.items():
# self.cv_results_['best_{}'.format(name)].append(value)

    def cast_to_arrays(self):
        """Convert the accumulated result lists to arrays and add ranks"""
for key, value in self.cv_results_.items():
if key == 'params':
continue
elif (key in self.types.keys()
and not isinstance(self.types[key], NumericType)):
strs = [self.types[key].to_string(val)
for val in self.cv_results_[key]]
self.cv_results_[key] = np.array(strs)
else:
self.cv_results_[key] = np.array(self.cv_results_[key])
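        # Rank candidates by mean test score, with the best score ranked 1.
        # E.g. (hypothetical) mean scores [0.2, 0.9, 0.5] -> ranks [3, 1, 2].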
rank = np.zeros(len(self.cv_results_['mean_test_scores']), dtype=int)
sortpos = np.argsort(self.cv_results_['mean_test_scores'])
rank[sortpos] = len(rank) - np.arange(len(rank))
self.cv_results_['rank'] = rank


class ParameterSearch_SimulatedAnnealing(ParameterSearch_SuperNode):
name = 'Simulated Annealing Parameter Search'
icon = 'annealing_hyperparam.svg'
    description = (
        'Uses simulated annealing to find the optimal parameters by '
        'considering a hypercube of all possible indices into the given '
        'parameter table. Each column of the parameter table corresponds to '
        'one axis of this cube, with a range corresponding to the '
        'non-masked rows of the parameter table. '
        'The radius of the annealing process assumes that all axes have '
        'unit length, regardless of the number of non-masked rows. '
        'This node should be considered _experimental_ and may change in '
        'the future.')
nodeid = 'org.sysess.sympathy.machinelearning.sim_anneal_parsearch'
inputs = Ports([
ModelPort('in-model', name='in-model'),
Port.Table('param-space', name='parameter space'),
Port.Table('X', name='X'),
Port.Table('Y', name='Y'),
Port.Custom('[(table,table)]', 'cross-validation',
name='cross-validation', n=(0, 1)),
])
parameters = copy.deepcopy(ParameterSearch_SuperNode.parameters)
    parameters.set_integer(
        'n_iter', value=50, label='iterations',
        description='Number of annealing iterations performed')
parameters.set_string(
'cooling', value='Exponential', label='Cooling method',
description='Method for lowering temperature',
editor=node.Util.combo_editor(
options=['Exponential', 'Linear', 'Logarithmic']))
    parameters.set_float(
        'cooling_arg', value=0.9, label='Cooling argument',
        description=(
            'Argument A to the cooling method. '
            'Exponential: T = A^t. '
            'Linear: T = 1 - t/(n_iter+1), ignores A. '
            'Logarithmic: T = A/log(1+t).'))
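    # Sketch of the resulting schedules for an assumed A = 0.9 at steps
    # t = 1, 2, 3:
    #   Exponential: T = 0.9**t        -> 0.90, 0.81, 0.73, ...
    #   Logarithmic: T = 0.9/log(1+t)  -> 1.30, 0.82, 0.65, ...
    #   Linear:      T = 1 - t/(n_iter+1), independent of A.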
# parameters.set_boolean(
# 'decradius', value=False, label='Decreasing radius',
# description='Reduces the search radius as temperature drops')
def execute(self, node_context):
cooling = node_context.parameters['cooling'].value
coolarg = node_context.parameters['cooling_arg'].value
        # Temporarily disabled due to unclear viability of the method
        # decradius = node_context.parameters['decradius'].value
        decradius = False
param_table = node_context.input['parameter space']
X, Y, model, desc, _, cv = self.setup(node_context)
params = [ma.compressed(param_table.get_column_to_array(name))
for name in param_table.column_names()]
param_names = param_table.column_names()
param_types = {name: desc.parameters[name]['type']
for name in param_names}
n_params = np.array([len(arr) for arr in params])
if cv is None:
cv = node_context.parameters['cv'].value
n_cv = cv
else:
n_cv = len(cv)
n_iter = node_context.parameters['n_iter'].value
        def param_dict(indices):
            return {name: param_types[name].from_string(
                        str(params[i][indices[i]]))
                    for i, name in enumerate(param_names)}
def eval_model(step):
self.progress_i = 0
def score_and_progress(estimator, X, y):
self.progress_i += 1
self.set_progress(step*100.0/n_iter +
self.progress_i*100.0 / (n_iter*n_cv*2))
return estimator.score(X, y)
return sklearn.model_selection.cross_val_score(
model, X, Y, cv=cv, scoring=score_and_progress)
self.set_progress(0.0)
best_par = (np.random.random(len(params)) * n_params).astype(int)
dct = param_dict(best_par)
model.set_params(**dct)
scores = eval_model(0)
score = np.mean(scores)
best_score = score
results = _SimAnnealing_Results(param_types, model, dct, 1.0, scores)
for n in range(1, n_iter):
if cooling == 'Exponential':
temperature = pow(coolarg, n)
elif cooling == 'Logarithmic':
temperature = coolarg / math.log(1+n)
elif cooling == 'Linear':
temperature = 1 - float(n)/(n_iter+1)
else:
raise SyNodeError(
'Invalid cooling option "{}" in node'.format(cooling))
new_idx = np.array(best_par)
axis = random.randrange(len(params))
            # Non-categorical axis: use a Gaussian neighbourhood function,
            # with the offset relative to the unit-length axis
            offset = np.random.randn() / 3
if decradius:
offset = offset * np.sqrt(temperature)
r = int(best_par[axis] + 0.5 + offset*n_params[axis])
r = max(0, min(n_params[axis]-1, r))
new_idx[axis] = r
dct = param_dict(new_idx)
model.set_params(**dct)
scores = eval_model(n)
score = np.mean(scores)
if score > best_score:
prob = 1.0
else:
prob = np.exp((score - best_score)/temperature)
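            # E.g. (hypothetical) score - best_score = -0.05 at T = 0.5
            # gives acceptance probability exp(-0.1) ~ 0.90, while at
            # T = 0.05 the same loss gives only exp(-1) ~ 0.37.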
changed = False
if random.random() <= prob:
best_score = score
best_par = new_idx
results.best_estimator_ = copy.deepcopy(model)
changed = True
results.update(dct, temperature, scores)
dct = param_dict(best_par)
results.update_best(dct, best_score, changed)
results.cast_to_arrays()
self.gen_results(node_context, results)