Source code for node_pipeline

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.

import sklearn
import sklearn.base
import sklearn.exceptions
import sklearn.pipeline

from sympathy.api import node
from sympathy.api.nodeconfig import Ports, Tag, Tags

from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.pipeline import PipelineDescriptor


class Pipeline(node.Node):
    name = 'Pipeline'
    author = 'Mathias Broxvall'
    version = '0.1'
    icon = 'pipeline.svg'
    description = 'Applies one model on the output of another'
    nodeid = 'org.sysess.sympathy.machinelearning.pipeline'
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports([ModelPort('models', 'models', n=(2, ))])
    outputs = Ports([ModelPort('Output model', 'out-model')])

    descriptor = PipelineDescriptor()
    descriptor.name = name
    descriptor.set_info([])

    parameters = node.parameters()
    parameters.set_string(
        'names', value='', label='Model names',
        description=(
            'Comma separated list of model names, eg. Rescale, SVC. '
            'If fewer names are given than models then default names '
            'will be used.'))
    parameters.set_boolean(
        'flatten', value=True, label='Flatten',
        description=(
            'Flattens multiple pipeline objects into a single pipeline '
            'containing all models'))

    def execute(self, node_context):
        out_model = node_context.output['out-model']
        models = node_context.input.group('models')
        names_raw = node_context.parameters['names'].value
        flatten = node_context.parameters['flatten'].value

        name_list = [x.strip() for x in names_raw.split(', ')]
        name_list = list(filter(lambda x: x != "", name_list))
        if len(name_list) < len(models):
            for i in range(len(name_list), len(models)):
                model = models[i]
                model.load()
                desc = model.get_desc()
                name_list.append(desc.name)
        else:
            name_list = name_list[:len(models)]

        descs = []
        skls = []
        names = []
        for i, model in enumerate(models):
            model.load()
            desc = model.get_desc()
            if flatten and isinstance(desc, PipelineDescriptor):
                names += [tpl[0] for tpl in desc.models]
                descs += [tpl[1] for tpl in desc.models]
                skls += [tpl[2] for tpl in desc.models]
            else:
                names.append(name_list[i])
                descs.append(model.get_desc())
                skls.append(model.get_skl())

        skl = sklearn.pipeline.Pipeline(list(zip(names, skls)))
        desc = self.__class__.descriptor.new(skl)
        out_model.set_desc(desc)
        out_model.set_skl(skl)
        desc.set_steps(names, descs)
        out_model.save()
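
# Illustrative sketch, not part of the shipped node: the composition done in
# Pipeline.execute above amounts to zipping step names with scikit-learn
# estimators and passing the pairs to sklearn.pipeline.Pipeline. The
# estimators below are arbitrary placeholders chosen for the example.
def _example_compose_pipeline():
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import SVC

    names = ['Rescale', 'SVC']
    skls = [StandardScaler(), SVC()]
    # Same call as at the end of Pipeline.execute: (name, estimator) pairs.
    return sklearn.pipeline.Pipeline(list(zip(names, skls)))
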
class SplitPipeline(node.Node):
    name = 'Pipeline decomposition'
    author = 'Mathias Broxvall'
    version = '0.1'
    icon = 'pipeline_split.svg'
    description = 'Pick out given model from a fitted pipeline'
    nodeid = 'org.sysess.sympathy.machinelearning.pipeline_split'
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports([ModelPort('model', 'model')])
    outputs = Ports([ModelPort('Output model', 'out-model')])

    parameters = node.parameters()
    parameters.set_string(
        'name', value='A', label='Model name or index',
        description=(
            'Index (0 to N) or name of model to pick out from pipeline'))

    def execute(self, node_context):
        out_model = node_context.output['out-model']
        model = node_context.input['model']
        name = node_context.parameters['name'].value

        model.load()
        desc = model.get_desc()

        out_desc, out_skl = None, None
        try:
            index = int(name)
            _, out_desc, out_skl = list(desc.get_models())[index]
        except ValueError:
            for n, d, s in desc.get_models():
                if n == name:
                    out_desc, out_skl = d, s
                    break

        if out_desc is not None:
            out_model.set_desc(out_desc)
            out_model.set_skl(out_skl)
            out_model.save()
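
# Illustrative sketch, not part of the shipped node: SplitPipeline.execute
# above selects one step from a fitted pipeline either by integer index or by
# step name. On a plain scikit-learn pipeline the equivalent lookups go
# through pipe.steps (by position) and pipe.named_steps (by name); the
# pipeline below is a placeholder built only for the example.
def _example_pick_step():
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import SVC

    pipe = sklearn.pipeline.Pipeline(
        [('Rescale', StandardScaler()), ('SVC', SVC())])
    by_index = pipe.steps[1][1]          # estimator at position 1
    by_name = pipe.named_steps['SVC']    # same estimator, looked up by name
    return by_index is by_name           # True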