Source code for node_ml_application

# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import warnings

import sklearn.exceptions
import dask.dataframe as dd
import numpy as np

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyDataError, sywarn
from sympathy.utils.pip_util import import_optional

from sylib.machinelearning.model import ModelPort
from sylib_aml.dataset import DatasetPort, prefix_rename
from sylib.machinelearning.utility import table_to_array

import inspect
import functools
from ast import literal_eval
from sympathy.api import qt2 as qt

QtWidgets = qt.QtWidgets


def _torch():
    return import_optional("torch", group="torch")
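
# Note: torch is imported lazily through import_optional so that this module
# can be imported even when the optional "torch" dependency group is not
# installed; _torch() performs the import only when it is actually called.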


@functools.lru_cache()
def mem_rows(ds, model):
    # ds arrives as str(ds) so that functools.lru_cache can hash it.
    ds = literal_eval(ds)
    reader_config = dict(ds["reader_config"])
    prefix = reader_config.pop('prefix', None)
    column_config = ds["column_config"]
    # Select the original columns: by name when the files have a header row,
    # by position otherwise.
    if reader_config["header"]:
        usecols = [
            col for col in column_config if column_config[col]["orig"]]
    else:
        usecols = [
            i for i, col in enumerate(column_config)
            if column_config[col]["orig"]]
    df = dd.read_csv(
        ds["paths"],
        **reader_config,
        include_path_column=True,
        assume_missing=True,
        blocksize=1000e6,
        usecols=usecols,
    )
    prefix_rename(df, prefix)
    # Repartition into ~1000MB partitions when the model trains through an
    # iterator and the partition count is large.
    if "iterator_train" in model.__dict__ and df.npartitions >= 100:
        df = df.repartition(partition_size="1000MB")
    # Persist changes to speed up future computations.
    df = df.persist()
    return (df, df.npartitions, len(df.get_partition(0)),
            df[ds["output_col"]].nunique())


class FitDataSet(node.Node):
    """
    Creates a fitted model object from model input and dataset
    representation.
    """

    name = "Fit Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.fitdataset"
    author = "Jannes Germishuys"
    description = "Trains a model by fitting to dataset."
    icon = "fit_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports([
        ModelPort("Input model", "in-model"),
        DatasetPort("Input dataset", "X"),
        Port.Custom(
            "table", "sample_weights", name="sample_weights", n=(0, 1)),
    ])
    outputs = Ports([ModelPort("Output model", "out-model")])

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        data = node_context.input["X"]
        in_model = node_context.input["in-model"]
        out_model = node_context.output["out-model"]
        sample_weights = node_context.input.group("sample_weights")
        if len(sample_weights) > 0:
            sample_weight = table_to_array(sample_weights[0], unitary=True)
        else:
            sample_weight = None

        out_model.source(in_model)
        out_model.load()
        model = out_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
            cds = SyConcatDataset(datasets)
            X = cds
            Y = None
            if not hasattr(model, 'train_split'):
                model.train_split = None
            if model.train_split is not None:
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=True)
                model.train_split.cv = sampler
            else:
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=False)
            model.iterator_train__sampler = sampler
            model.module__output_dim = ncategories.compute()
            model.module__virtual_batch_size = nrows
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                ds["transforms"].append("To Tensor")
                ds["transforms_values"].append({})
            cds = ImgDataSet(
                ds["paths"], ds["transforms"], ds["transforms_values"],
                ds["labels"], preview=False)
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
            Y = None
            npartitions = 0
        else:
            raise TypeError("This data type is not supported.")

        # Pass a progress callback to fit when the model supports one.
        kwargs = {}
        args = inspect.getfullargspec(model.fit).args
        if "progress_fn" in args:
            kwargs["progress_fn"] = lambda i: self.set_progress(i)

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if Y is None:
                if sample_weight is None:
                    if "iterator_train" in model.__dict__:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"])
                                - sum(
                                    ds["column_config"][col]["exclude"]
                                    for col in ds["column_config"])
                                - 1)
                            if npartitions > 1:
                                model.fit(X, Y, **kwargs)
                            else:
                                try:
                                    X_train, Y = cds[tuple(ds["indices"])]
                                    model.iterator_train__sampler = None
                                    model.batch_size = 1024
                                    model.fit(X_train, Y, **kwargs)
                                except MemoryError:
                                    sywarn(
                                        "Data too large to be loaded into "
                                        "memory.")
                        else:
                            model.fit(X, Y, **kwargs)
                    else:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"])
                                - sum(
                                    ds["column_config"][col]["exclude"]
                                    for col in ds["column_config"])
                                - 1)
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[tuple(ds["indices"])]
                                model.iterator_train__sampler = None
                                model.batch_size = 1024
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError:
                                sywarn(
                                    "Data too large to be loaded into "
                                    "memory.")
                        else:
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[ds["indices"]]
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError:
                                sywarn(
                                    "Data too large to be loaded into "
                                    "memory.")
                else:
                    model.fit(X, sample_weight=sample_weight, **kwargs)
            else:
                if sample_weight is None:
                    model.fit(X, Y, **kwargs)
                else:
                    model.fit(X, Y, sample_weight=sample_weight, **kwargs)

        desc = out_model.get_desc()
        # desc.set_x_names(X_tbl.column_names())
        # if Y is not None:
        #     desc.set_y_names(Y_tbl.column_names())
        desc.post_fit(model)
        out_model.save()
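
# Note on module__input_dim in FitDataSet.execute: for tabular data the
# network input width is the number of configured columns, minus those
# flagged "exclude", minus one for the label column. For example, 10
# configured columns with 2 excluded yield module__input_dim == 7.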
class PredictDataset(node.Node):
    name = "Predict Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.predict_dataset"
    author = "Jannes Germishuys"
    icon = "predict_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)
    description = "Uses a model to predict Y given X"
    parameters = node.parameters()

    inputs = Ports([
        ModelPort("Input model", "in-model"),
        DatasetPort("Dataset", name="dataset"),
    ])
    outputs = Ports([
        Port.Table("Table Containing Predictions", "predictions"),
        Port.Table("Table Containing True Values", "ground_truth"),
    ])

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        in_model = node_context.input["in-model"]
        pred_table = node_context.output["predictions"]
        true_table = node_context.output["ground_truth"]
        data = node_context.input["dataset"]
        in_model.load()
        model = in_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
            cds = SyConcatDataset(datasets)
            X = cds
            sampler = SubsetRandomSampler(
                indices=tuple(ds["indices"]), block_size=nrows, train=False)
            model.iterator_valid__sampler = sampler
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                ds["transforms"].append("To Tensor")
                ds["transforms_values"].append({})
            cds = ImgDataSet(
                ds["paths"], ds["transforms"], ds["transforms_values"],
                ds["labels"], preview=False)
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
        else:
            raise TypeError("This data type is not supported.")

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if "iterator_valid" in model.__dict__:
                model.batch_size = None
                Y_pred = model.predict(X)
                if ds["dstype"] == "table":
                    _, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                else:
                    Y_test = np.array([item[1] for item in iter(X)])
            else:
                # Add Y if possible
                sywarn(
                    "This model requires an explicit Y, attempting to load "
                    "into memory...")
                try:
                    model.iterator_valid__sampler = None
                    model.batch_size = 1024
                    X, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                    Y_pred = model.predict(X)
                except TypeError as exc:
                    raise SyDataError(
                        "Model does not implement the 'predict' function"
                    ) from exc

        if len(Y_pred.shape) < 2:
            Y_pred = Y_pred.reshape(Y_pred.shape + (1,))
        if len(Y_test.shape) < 2:
            Y_test = Y_test.reshape(Y_test.shape + (1,))

        if "labels" in ds:
            y_names = [str(ds["labels"])]
        else:
            y_names = [str(ds["output_col"])]

        for i, name in enumerate(y_names):
            pred_table.set_column_from_array(name, Y_pred[:, i])
            true_table.set_column_from_array(name, Y_test[:, i])
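
# Y_pred and Y_test are promoted to 2-D in PredictDataset.execute so that
# single-output models (shape (n,)) and multi-output models (shape (n, k))
# share the same column-wise write loop; e.g. shape (100,) becomes (100, 1).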