# Source code for node_pipeline
# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import sklearn
import sklearn.base
import sklearn.exceptions
import sklearn.pipeline
from sympathy.api import node
from sympathy.api.nodeconfig import Ports, Tag, Tags
from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.pipeline import PipelineDescriptor
# [docs]
class Pipeline(node.Node):
    """Chain the connected input models into one scikit-learn pipeline.

    Every model on the ``models`` input group becomes one step of a
    ``sklearn.pipeline.Pipeline``, in the order the group yields them.
    Step names are taken from the ``names`` parameter; models without a
    user supplied name fall back to the name stored in their descriptor.

    When ``flatten`` is enabled, an input whose descriptor is itself a
    ``PipelineDescriptor`` is expanded in place: its inner steps (and the
    names they already carry) are inserted instead of nesting the pipeline.
    """

    name = 'Pipeline'
    author = 'Mathias Broxvall'
    icon = 'pipeline.svg'
    description = 'Applies one model on the output of another'
    nodeid = 'org.sysess.sympathy.machinelearning.pipeline'
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports([ModelPort('models', 'models', n=(2, ))])
    outputs = Ports([ModelPort('Output model', 'out-model')])

    # Template descriptor; ``execute`` derives a fresh one per run via
    # ``descriptor.new(skl)``.
    descriptor = PipelineDescriptor()
    descriptor.name = name
    descriptor.set_info([])

    parameters = node.parameters()
    parameters.set_string(
        'names', value='', label='Model names',
        description=(
            'Comma separated list of model names, eg. Rescale, SVC. '
            'If fewer names are given than models then default names '
            'will be used.'))
    parameters.set_boolean(
        'flatten', value=True, label='Flatten',
        description=(
            'Flattens multiple pipeline objects into a single pipeline '
            'containing all models'))

    def execute(self, node_context):
        """Build the pipeline from the input models and save it.

        Reads the ``names`` and ``flatten`` parameters, collects one
        (name, descriptor, sklearn object) triple per resulting step and
        writes the combined pipeline to the ``out-model`` output port.
        """
        out_model = node_context.output['out-model']
        models = node_context.input.group('models')
        names_raw = node_context.parameters['names'].value
        flatten = node_context.parameters['flatten'].value

        # Split on the bare comma and strip afterwards, so both
        # "Rescale, SVC" and "Rescale,SVC" produce two names. Empty
        # entries (e.g. from an empty parameter or a trailing comma) are
        # dropped.
        name_list = [part.strip() for part in names_raw.split(',')]
        name_list = [part for part in name_list if part]

        names = []
        descs = []
        skls = []
        for i, model in enumerate(models):
            model.load()
            desc = model.get_desc()
            if flatten and isinstance(desc, PipelineDescriptor):
                # Each entry of desc.models is (name, descriptor, skl);
                # the inner step names are kept as they are.
                for step_name, step_desc, step_skl in desc.models:
                    names.append(step_name)
                    descs.append(step_desc)
                    skls.append(step_skl)
            else:
                # Use the i:th user supplied name when there is one,
                # otherwise the model's own descriptor name. Surplus
                # user supplied names are simply never consulted.
                if i < len(name_list):
                    names.append(name_list[i])
                else:
                    names.append(desc.name)
                descs.append(desc)
                skls.append(model.get_skl())

        skl = sklearn.pipeline.Pipeline(list(zip(names, skls)))
        desc = self.__class__.descriptor.new(skl)
        out_model.set_desc(desc)
        out_model.set_skl(skl)
        desc.set_steps(names, descs)
        out_model.save()
# [docs]
class SplitPipeline(node.Node):
    """Extract a single model from a pipeline model.

    The ``name`` parameter selects the model either by position (when it
    parses as an integer) or by its step name.
    """

    name = 'Pipeline decomposition'
    author = 'Mathias Broxvall'
    icon = 'pipeline_split.svg'
    description = 'Pick out given model from a fitted pipeline'
    nodeid = 'org.sysess.sympathy.machinelearning.pipeline_split'
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports([ModelPort('model', 'model')])
    outputs = Ports([ModelPort('Output model', 'out-model')])

    parameters = node.parameters()
    parameters.set_string(
        'name', value='A', label='Model name or index',
        description=(
            'Index (0 to N) or name of model to pick out from pipeline'))

    def execute(self, node_context):
        """Write the selected pipeline step to the ``out-model`` port.

        If the ``name`` parameter is an integer it is used as an index
        into the pipeline's models; an out-of-range index raises
        ``IndexError``. Otherwise the first model whose step name equals
        the parameter is selected. When no model matches, the output is
        saved without a descriptor or sklearn object being set.
        """
        out_model = node_context.output['out-model']
        model = node_context.input['model']
        name = node_context.parameters['name'].value

        model.load()
        desc = model.get_desc()

        # Both must start out as None: the guard below relies on
        # out_desc being None when nothing matched, and out_skl must be
        # bound in every path.
        out_desc, out_skl = None, None
        try:
            index = int(name)
        except ValueError:
            # Not a number: look the model up by its step name.
            for step_name, step_desc, step_skl in desc.get_models():
                if step_name == name:
                    out_desc, out_skl = step_desc, step_skl
                    break
        else:
            _, out_desc, out_skl = list(desc.get_models())[index]

        if out_desc is not None:
            out_model.set_desc(out_desc)
            out_model.set_skl(out_skl)
        out_model.save()