Source code for node_paramsearch

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.

import numpy as np
import numpy.ma as ma
import sklearn.model_selection
import sklearn.exceptions
import copy
import math
import random
import warnings


from sympathy.api import node
from sympathy.api.nodeconfig import Port
from sympathy.api.nodeconfig import Ports
from sympathy.api.nodeconfig import Tag
from sympathy.api.nodeconfig import Tags
from sympathy.api.exceptions import SyNodeError
from sympathy.api.exceptions import SyDataError
from sympathy.api.exceptions import sywarn
from sympathy.types.sylist import is_read_through

from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.descriptors import NumericType
from sylib.machinelearning.utility import table_to_array


class ParameterSearch_SuperNode(node.Node):
    author = 'Mathias Broxvall'
    version = '0.1'
    tags = Tags(Tag.MachineLearning.HyperParameters)

    inputs = Ports([
        ModelPort('in-model', name='in-model'),
        Port.Tables('param-space', name='parameter space'),
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y'),
        Port.Custom('[(table,table)]', 'cross-validation',
                    name='cross-validation', n=(0, 1)),
    ])
    outputs = Ports([
        Port.Table('results', name='results'),
        Port.Table('parameters', name='parameters'),
        ModelPort('out-model', name='out-model'),
    ])
    parameters = node.parameters()
    parameters.set_integer(
        'cv', value=3, label='Cross validation splits',
        description=(
            'Number of folds in the default K-Fold cross-validation. '
            'Ignored when the cross-validation port is connected'))

    def setup(self, node_context):
        """Utility function for loading data, model and parameters"""
        X_tbl = node_context.input['X']
        Y_tbl = node_context.input['Y']
        in_model = node_context.input['in-model']

        X = table_to_array(X_tbl)
        Y = table_to_array(Y_tbl)
        in_model.load()
        model = in_model.get_skl()

        out_model = node_context.output['out-model']
        out_model.source(in_model)
        out_model.load()
        desc = out_model.get_desc()
        desc.set_x_names(X_tbl.column_names())
        desc.set_y_names(Y_tbl.column_names())
        out_model.save()

        params = self.param_dict(desc, node_context)

        if len(node_context.input.group('cross-validation')) > 0:
            cv = [(tpl[0].get_column_to_array(tpl[0].column_names()[0]),
                   tpl[1].get_column_to_array(tpl[1].column_names()[0]))
                  for tpl in node_context.input.group('cross-validation')[0]]
        else:
            cv = None

        return X, Y, model, desc, params, cv
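
    # Editorial note (not part of the original module): each element of the
    # ``cv`` list built in setup above is a (train_indices, test_indices)
    # pair taken from the first column of each table in the tuple.
    # scikit-learn accepts such an iterable of explicit splits directly as
    # its ``cv`` argument, which is how user-supplied splits bypass the
    # default K-Fold, e.g. (hypothetical values):
    #
    #     cv = [(np.array([0, 1, 2, 3]), np.array([4, 5]))]
    #     sklearn.model_selection.cross_val_score(model, X, Y, cv=cv)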

    def param_dict(self, desc, node_context):
        """List of parameter definitions"""
        par_tables = node_context.input['parameter space']
        if not is_read_through(par_tables):
            par_tables = [par_tables]

        pdicts = []
        for tbl in par_tables:
            pdict = {}
            for name in tbl.column_names():
                if name not in desc.types:
                    raise SyDataError('Invalid parameter {} for model {}.'
                                      .format(name, desc.name))
                col = tbl.get_column_to_array(name)
                values = [v for v in col if v is not ma.masked]
                values = [desc.types[name].from_string(str(val))
                          for val in values]
                pdict[name] = values
            pdicts.append(pdict)
        return pdicts
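
    # Editorial note (not part of the original module): param_dict above
    # maps each 'parameter space' table to a dict of candidate-value lists,
    # one key per column, dropping masked cells so that columns of different
    # lengths can share one table. A hypothetical two-column table
    #
    #     alpha    fit_intercept
    #     0.1      True
    #     1.0      False
    #     10.0     --  (masked)
    #
    # becomes {'alpha': [0.1, 1.0, 10.0], 'fit_intercept': [True, False]}
    # once each cell string has passed desc.types[name].from_string.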

    def gen_results(self, node_context, search):
        meta_tbl = node_context.output['results']
        par_tbl = node_context.output['parameters']

        for key in sorted(search.cv_results_.keys()):
            if key != 'params':
                # sklearn sometimes gives references to strings as
                # np.arrays; coerce back to normal np.arrays
                if isinstance(search.cv_results_[key], ma.core.MaskedArray):
                    try:
                        col = ma.array(list(search.cv_results_[key].data),
                                       mask=search.cv_results_[key].mask)
                    except Exception:
                        # Ragged array creation raises ValueError since
                        # NumPy 1.24.
                        col = ma.array(list(search.cv_results_[key].data),
                                       mask=search.cv_results_[key].mask,
                                       dtype='O')
                else:
                    col = np.array(list(search.cv_results_[key]))
                # Finally, if the array contains references to objects,
                # cast to strings as a last resort
                if col.dtype == np.dtype('O'):
                    col = np.array([str(o) for o in col])
                if len(col.shape) > 1:
                    col = col[:, 0]
                meta_tbl.set_column_from_array(key, col)
        params = search.cv_results_['params']
        all_keys, all_cols, all_masks = self.concatenate_parameters(params)
        for key, col, mask in zip(all_keys, all_cols, all_masks):
            par_tbl.set_column_from_array(key, ma.array(col, mask=mask))

        out_model = node_context.output['out-model']
        out_model.set_skl(search.best_estimator_)
        out_model.save()

    def concatenate_parameters(self, params):
        all_keys = set()
        for d in params:
            all_keys.update(d.keys())
        all_keys = sorted(all_keys)
        all_cols = []
        all_masks = []
        for key in all_keys:
            cols = []
            masks = []
            for d in params:
                if key not in d:
                    cols.append('')
                    masks.append(1)
                else:
                    cols.append(str(d[key]))
                    masks.append(0)
            all_cols.append(cols)
            all_masks.append(masks)
        return all_keys, all_cols, all_masks
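

# A minimal, runnable sketch (editorial, not part of the original module) of
# what concatenate_parameters above produces. Parameter dicts from different
# hypercubes need not share all keys, so missing entries become masked cells
# in the output columns. The parameter dicts below are hypothetical.
def _example_concatenate_parameters():
    params = [{'alpha': 0.1}, {'alpha': 1.0, 'fit_intercept': False}]
    # self is unused by concatenate_parameters, so call it unbound
    keys, cols, masks = ParameterSearch_SuperNode.concatenate_parameters(
        None, params)
    # keys  == ['alpha', 'fit_intercept']
    # cols  == [['0.1', '1.0'], ['', 'False']]
    # masks == [[0, 0], [1, 0]]
    return keys, cols, masks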


class ParameterSearch_Randomized(ParameterSearch_SuperNode):
    name = 'Randomized Parameter Search'
    icon = 'random_hyperparam.svg'
    description = (
        'Performs a randomized parameter search returning scores '
        'and the best model found. Uses a table as a "hypercube" of '
        'parameters to sample from, i.e. all combinations of parameters '
        'from all columns have an equal chance of being sampled. '
        'Use masked values for parameters/columns with different lengths.')
    nodeid = 'org.sysess.sympathy.machinelearning.randomized_parsearch'

    inputs = Ports([
        ModelPort('in-model', name='in-model'),
        Port.Table('param-space', name='parameter space'),
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y'),
        Port.Custom('[(table,table)]', 'cross-validation',
                    name='cross-validation', n=(0, 1)),
    ])

    parameters = copy.deepcopy(ParameterSearch_SuperNode.parameters)
    parameters.set_integer(
        'n_iter', value=10, label='iterations',
        description='Number of randomized searches done')

    def execute(self, node_context):
        X, Y, model, desc, params, cv = self.setup(node_context)

        if len(params) > 1:
            sywarn('Randomized parameter search uses only the '
                   'first hypercube of the given parameters')

        if cv is None:
            cv = node_context.parameters['cv'].value
            n_cv = cv
        else:
            n_cv = len(cv)
        n_iter = node_context.parameters['n_iter'].value

        self.progress_i = 0

        def score_and_progress(estimator, X, y):
            self.progress_i += 1
            self.set_progress((100.0 * self.progress_i)
                              / (n_iter * n_cv * 2))
            return estimator.score(X, y)

        search = sklearn.model_selection.RandomizedSearchCV(
            model, params[0], cv=cv, n_iter=n_iter,
            scoring=score_and_progress, return_train_score=True)

        if len(Y.shape) > 1:
            Y = Y[:, 0].ravel()
        with warnings.catch_warnings():
            warnings.simplefilter(
                'ignore', sklearn.exceptions.ConvergenceWarning)
            search.fit(X, Y)

        self.gen_results(node_context, search)
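

# A minimal, runnable sketch (editorial, not part of the original module) of
# the scikit-learn call the node above wraps; the estimator and hypercube
# here are hypothetical, while the real node takes the model and parameter
# table from its input ports.
def _example_randomized_search():
    from sklearn.linear_model import Ridge

    hypercube = {'alpha': [0.1, 1.0, 10.0], 'fit_intercept': [True, False]}
    search = sklearn.model_selection.RandomizedSearchCV(
        Ridge(), hypercube, n_iter=4, cv=3, return_train_score=True)
    X = np.random.rand(30, 2)
    y = np.random.rand(30)
    search.fit(X, y)
    # Only n_iter of the 6 combinations were evaluated, each equally likely
    return search.best_estimator_, search.cv_results_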


class ParameterSearch_Grid(ParameterSearch_SuperNode):
    name = 'Grid Parameter Search'
    icon = 'hyperparam.svg'
    description = (
        'Evaluates each parameter combination in the set of parameter '
        'hypercubes given by each table in the list of parameter tables. '
        'Each parameter table specifies a hypercube of dimension N, where '
        'N is the number of columns in the table. Every combination of '
        'parameter values in each such cube will be sampled.')
    nodeid = 'org.sysess.sympathy.machinelearning.grid_parsearch'

    def count_cases(self, params, cv):
        cases = 0
        for hypercube in params:
            cases += np.prod(np.array(
                [len(pars) for pars in hypercube.values()]))
        if isinstance(cv, list):
            cases *= len(cv)
        else:
            cases *= cv
        cases *= 2
        return cases

    def execute(self, node_context):
        X, Y, model, desc, params, cv = self.setup(node_context)

        if cv is None:
            cv = node_context.parameters['cv'].value
        cases = self.count_cases(params, cv)

        self.progress_i = 0

        def score_and_progress(estimator, X, y):
            self.progress_i += 1
            self.set_progress((100.0 * self.progress_i) / cases)
            return estimator.score(X, y)

        Y = Y[:, 0]
        search = sklearn.model_selection.GridSearchCV(
            model, params, cv=cv, scoring=score_and_progress,
            return_train_score=True)

        with warnings.catch_warnings():
            warnings.simplefilter(
                'ignore', sklearn.exceptions.ConvergenceWarning)
            search.fit(X, Y)

        self.gen_results(node_context, search)
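

# A minimal, runnable sketch (editorial, not part of the original module):
# GridSearchCV accepts a list of parameter dicts, which is how the list of
# hypercube tables maps onto scikit-learn; every value combination within
# each dict is evaluated. The estimator and grids here are hypothetical.
def _example_grid_search():
    from sklearn.linear_model import Ridge

    hypercubes = [{'alpha': [0.1, 1.0]},                        # 2 cases
                  {'alpha': [10.0], 'fit_intercept': [False]}]  # 1 case
    search = sklearn.model_selection.GridSearchCV(
        Ridge(), hypercubes, cv=3, return_train_score=True)
    X = np.random.rand(30, 2)
    y = np.random.rand(30)
    search.fit(X, y)
    return search.cv_results_['params']  # the 3 evaluated combinations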


class _SimAnnealing_Results:
    def __init__(self, types, model, params, temperature, scores):
        self.types = types
        self.best_estimator_ = copy.deepcopy(model)
        self.cv_results_ = {'params': []}
        self.cv_results_['temperature'] = [temperature]
        self.cv_results_['mean_test_scores'] = [np.mean(scores)]
        self.cv_results_['best_test_scores'] = [np.mean(scores)]
        self.cv_results_['state_change'] = [True]
        self.cv_results_['std_test_scores'] = [np.std(scores)]
        for i, score in enumerate(scores):
            self.cv_results_[
                'split{}_test_score'.format(i)] = [score]
        self.cv_results_['params'] = [params]
        for name, value in params.items():
            self.cv_results_[name] = [value]

    def update(self, params, temperature, scores):
        self.cv_results_['params'].append(params)
        self.cv_results_['temperature'].append(temperature)
        self.cv_results_['mean_test_scores'].append(np.mean(scores))
        self.cv_results_['std_test_scores'].append(np.std(scores))
        for i, score in enumerate(scores):
            self.cv_results_[
                'split{}_test_score'.format(i)].append(score)
        for name, value in params.items():
            self.cv_results_[name].append(value)

    def update_best(self, params, best_score, changed):
        self.cv_results_['best_test_scores'].append(best_score)
        self.cv_results_['state_change'].append(changed)
        # for name, value in params.items():
        #     self.cv_results_['best_{}'.format(name)].append(value)

    def cast_to_arrays(self):
        for key, value in self.cv_results_.items():
            if key == 'params':
                continue
            elif (key in self.types.keys()
                    and not isinstance(self.types[key], NumericType)):
                strs = [self.types[key].to_string(val)
                        for val in self.cv_results_[key]]
                self.cv_results_[key] = np.array(strs)
            else:
                self.cv_results_[key] = np.array(self.cv_results_[key])
        rank = np.zeros(len(self.cv_results_['mean_test_scores']), dtype=int)
        sortpos = np.argsort(self.cv_results_['mean_test_scores'])
        rank[sortpos] = len(rank) - np.arange(len(rank))
        self.cv_results_['rank'] = rank


class ParameterSearch_SimulatedAnnealing(ParameterSearch_SuperNode):
    name = 'Simulated Annealing Parameter Search'
    icon = 'annealing_hyperparam.svg'
    description = (
        'Uses simulated annealing to find the optimal parameters by '
        'considering a hypercube of all possible indices into the given '
        'parameter table. Each column of the parameter table corresponds to '
        'one axis of this cube, with a range corresponding to the '
        'non-masked rows of the parameter table. '
        'The radius for the annealing process assumes that all axes have '
        'unit length regardless of the number of non-masked rows. '
        'This node should be considered _experimental_ and may change in '
        'the future.')
    nodeid = 'org.sysess.sympathy.machinelearning.sim_anneal_parsearch'

    inputs = Ports([
        ModelPort('in-model', name='in-model'),
        Port.Table('param-space', name='parameter space'),
        Port.Table('X', name='X'),
        Port.Table('Y', name='Y'),
        Port.Custom('[(table,table)]', 'cross-validation',
                    name='cross-validation', n=(0, 1)),
    ])

    parameters = copy.deepcopy(ParameterSearch_SuperNode.parameters)
    parameters.set_integer(
        'n_iter', value=50, label='iterations',
        description='Number of annealing iterations done')
    parameters.set_string(
        'cooling', value='Exponential', label='Cooling method',
        description='Method for lowering temperature',
        editor=node.editors.combo_editor(
            options=['Exponential', 'Linear', 'Logarithmic']))
    parameters.set_float(
        'cooling_arg', value=0.9, label='Cooling argument',
        description=(
            'Argument A to cooling method. '
            'Exponential: T=A^t '
            'Linear ignores A '
            'Logarithmic: T=A/log(1+t)'))
    # parameters.set_boolean(
    #     'decradius', value=False, label='Decreasing radius',
    #     description='Reduces the search radius as temperature drops')

    def execute(self, node_context):
        cooling = node_context.parameters['cooling'].value
        coolarg = node_context.parameters['cooling_arg'].value
        # Temporarily disabled due to unclear references to the method's
        # viability
        # decradius = node_context.parameters['decradius'].value
        decradius = False

        param_table = node_context.input['parameter space']
        X, Y, model, desc, _, cv = self.setup(node_context)

        params = [ma.compressed(param_table.get_column_to_array(name))
                  for name in param_table.column_names()]
        param_names = param_table.column_names()
        param_types = {name: desc.parameters[name]['type']
                       for name in param_names}
        n_params = np.array([len(arr) for arr in params])

        if cv is None:
            cv = node_context.parameters['cv'].value
            n_cv = cv
        else:
            n_cv = len(cv)
        n_iter = node_context.parameters['n_iter'].value

        def param_dict(indices):
            return {name: param_types[name].from_string(
                params[i][indices[i]])
                for i, name in enumerate(param_names)}

        def eval_model(step):
            self.progress_i = 0

            def score_and_progress(estimator, X, y):
                self.progress_i += 1
                self.set_progress(step*100.0/n_iter
                                  + self.progress_i*100.0
                                  / (n_iter*n_cv*2))
                return estimator.score(X, y)

            return sklearn.model_selection.cross_val_score(
                model, X, Y, cv=cv, scoring=score_and_progress)

        self.set_progress(0.0)
        best_par = (np.random.random(len(params)) * n_params).astype(int)
        dct = param_dict(best_par)
        model.set_params(**dct)
        scores = eval_model(0)
        score = np.mean(scores)
        best_score = score
        results = _SimAnnealing_Results(param_types, model, dct, 1.0, scores)

        for n in range(1, n_iter):
            if cooling == 'Exponential':
                temperature = pow(coolarg, n)
            elif cooling == 'Logarithmic':
                temperature = coolarg / math.log(1+n)
            elif cooling == 'Linear':
                temperature = 1 - float(n)/(n_iter+1)
            else:
                raise SyNodeError(
                    'Invalid cooling option "{}" in node'.format(cooling))

            new_idx = np.array(best_par)
            axis = random.randrange(len(params))
            # Non-categorical, use gaussian neighbourhood function
            offset = np.random.randn() / 3
            if decradius:
                offset = offset * np.sqrt(temperature)
            r = int(best_par[axis] + 0.5 + offset*n_params[axis])
            r = max(0, min(n_params[axis]-1, r))
            new_idx[axis] = r

            dct = param_dict(new_idx)
            model.set_params(**dct)
            scores = eval_model(n)
            score = np.mean(scores)

            if score > best_score:
                prob = 1.0
            else:
                prob = np.exp((score - best_score)/temperature)
            changed = False
            if random.random() <= prob:
                best_score = score
                best_par = new_idx
                results.best_estimator_ = copy.deepcopy(model)
                changed = True

            results.update(dct, temperature, scores)
            dct = param_dict(best_par)
            results.update_best(dct, best_score, changed)

        results.cast_to_arrays()
        self.gen_results(node_context, results)
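

# A minimal, runnable sketch (editorial, not part of the original module) of
# the Metropolis acceptance rule used in execute above: a better score is
# always accepted, and a worse one is accepted with probability
# exp((score - best_score) / temperature), letting the search escape local
# optima while the temperature is still high.
def _example_annealing_acceptance(score, best_score, temperature):
    if score > best_score:
        prob = 1.0
    else:
        prob = math.exp((score - best_score) / temperature)
    return random.random() <= prob  # True: accept the new parameter point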