Source code for node_ml_application

# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import warnings

import sklearn.exceptions
import sklearn.model_selection
import dask.dataframe as dd
import numpy as np

from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyDataError, sywarn
from sympathy.utils.pip_util import import_optional

from sylib.machinelearning.model import ModelPort
from sylib_aml.dataset import DatasetPort, prefix_rename
from sylib.machinelearning.utility import table_to_array

import inspect
import functools
from ast import literal_eval
from sympathy.api import qt2 as qt

QtWidgets = qt.QtWidgets


def _torch():
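    """Import torch lazily; ``import_optional`` resolves the optional
    torch dependency only when a node actually needs it."""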
    return import_optional("torch", group="torch")


@functools.lru_cache()
def mem_rows(ds, model):
    ds = literal_eval(ds)
    reader_config = dict(ds["reader_config"])
    prefix = reader_config.pop('prefix', None)
    if reader_config["header"]:
        # Header present: select the original (non-derived) columns by name.
        usecols = [
            col for col in ds["column_config"]
            if ds["column_config"][col]["orig"]
        ]
    else:
        # No header row: select the original columns by positional index.
        usecols = [
            i for i, col in enumerate(ds["column_config"])
            if ds["column_config"][col]["orig"]
        ]
    df = dd.read_csv(
        ds["paths"],
        **reader_config,
        include_path_column=True,
        assume_missing=True,
        blocksize=1000e6,
        usecols=usecols,
    )
    prefix_rename(df, prefix)
    # Repartition step
    if "iterator_train" in model.__dict__ and df.npartitions >= 100:
        df = df.repartition(partition_size="1000MB")
    # Persist changes to speed up future computations
    df = df.persist()
    return (df, df.npartitions, len(df.get_partition(0)),
            df[ds["output_col"]].nunique())


class FitDataSet(node.Node):
    """
    Creates a fitted model object from model input and dataset representation
    """

    name = "Fit Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.fitdataset"
    author = "Jannes Germishuys"
    description = "Trains a model by fitting it to a dataset."
    icon = "fit_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports(
        [
            ModelPort("Input model", "in-model"),
            DatasetPort("Input dataset", "X"),
            Port.Custom("table", "sample_weights",
                        name="sample_weights", n=(0, 1)),
        ]
    )
    outputs = Ports([ModelPort("Output model", "out-model")])

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        data = node_context.input["X"]
        in_model = node_context.input["in-model"]
        out_model = node_context.output["out-model"]
        sample_weights = node_context.input.group("sample_weights")
        if len(sample_weights) > 0:
            sample_weight = table_to_array(sample_weights[0], unitary=True)
        else:
            sample_weight = None

        out_model.source(in_model)
        out_model.load()
        model = out_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = list(
                map(
                    lambda x: TabDataSet(
                        x,
                        df,
                        ds["reader_config"],
                        ds["column_config"],
                        label_col=ds["output_col"],
                    ),
                    ds["paths"],
                )
            )
            cds = SyConcatDataset(datasets)
            X = cds
            Y = None
            if not hasattr(model, 'train_split'):
                model.train_split = None
            if model.train_split is not None:
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=True)
                model.train_split.cv = sampler
            else:
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=False)
                model.iterator_train__sampler = sampler
            model.module__output_dim = ncategories.compute()
            model.module__virtual_batch_size = nrows
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                ds["transforms"].extend(["To Tensor"])
                ds["transforms_values"].extend([{}])
            cds = ImgDataSet(
                ds["paths"],
                ds["transforms"],
                ds["transforms_values"],
                ds["labels"],
                preview=False
            )
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
            Y = None
            npartitions = 0
        else:
            raise TypeError("This data type is not supported.")

        # Check if we can fit in a progress_update function
        kwargs = {}
        args = inspect.getfullargspec(model.fit).args
        if "progress_fn" in args:
            kwargs["progress_fn"] = lambda i: self.set_progress(i)

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if Y is None:
                if sample_weight is None:
                    if "iterator_train" in model.__dict__:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"].keys())
                                - sum(
                                    ds["column_config"][col]["exclude"]
                                    for col in ds["column_config"].keys()
                                )
                                - 1
                            )
                            if npartitions > 1:
                                model.fit(X, Y)
                            else:
                                # Small dataset: load it into memory and fit
                                # without the custom sampler.
                                try:
                                    X_train, Y = cds[tuple(ds["indices"])]
                                    model.iterator_train__sampler = None
                                    model.batch_size = 1024
                                    model.fit(X_train, Y)
                                except MemoryError as exc:
                                    raise SyDataError(
                                        "Data too large to be loaded into "
                                        "memory.") from exc
                        else:
                            model.fit(X, Y)
                    else:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"].keys())
                                - sum(
                                    ds["column_config"][col]["exclude"]
                                    for col in ds["column_config"].keys()
                                )
                                - 1
                            )
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[tuple(ds["indices"])]
                                model.iterator_train__sampler = None
                                model.batch_size = 1024
                                model.fit(X_train, Y)
                            except MemoryError as exc:
                                raise SyDataError(
                                    "Data too large to be loaded into "
                                    "memory.") from exc
                        else:
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[ds["indices"]]
                                model.fit(X_train, Y)
                            except MemoryError as exc:
                                raise SyDataError(
                                    "Data too large to be loaded into "
                                    "memory.") from exc
                else:
                    model.fit(X, sample_weight=sample_weight)
            else:
                if sample_weight is None:
                    model.fit(X, Y)
                else:
                    model.fit(X, Y, sample_weight=sample_weight)

        desc = out_model.get_desc()
        # desc.set_x_names(X_tbl.column_names())
        # if Y is not None:
        #     desc.set_y_names(Y_tbl.column_names())
        desc.post_fit(model)
        out_model.save()
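
# ``module__input_dim`` in FitDataSet is derived from the column
# configuration: all configured columns, minus those flagged ``exclude``,
# minus one for the label column. A small illustration with a hypothetical
# ``column_config`` (names and flags are made up for the example):
#
#     column_config = {
#         "a": {"exclude": 0}, "b": {"exclude": 1},
#         "c": {"exclude": 0}, "label": {"exclude": 0},
#     }
#     input_dim = (len(column_config)
#                  - sum(column_config[c]["exclude"] for c in column_config)
#                  - 1)
#     # 4 columns - 1 excluded - 1 label column -> input_dim == 2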


class PredictDataset(node.Node):
    name = "Predict Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.predict_dataset"
    author = "Jannes Germishuys"
    icon = "predict_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)
    description = "Uses a model to predict Y given X"
    parameters = node.parameters()

    inputs = Ports(
        [ModelPort("Input model", "in-model"),
         DatasetPort("Dataset", name="dataset")]
    )
    outputs = Ports(
        [
            Port.Table("Table Containing Predictions", "predictions"),
            Port.Table("Table Containing True Values", "ground_truth"),
        ]
    )

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        in_model = node_context.input["in-model"]
        pred_table = node_context.output["predictions"]
        true_table = node_context.output["ground_truth"]
        data = node_context.input["dataset"]

        in_model.load()
        model = in_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = list(
                map(
                    lambda x: TabDataSet(
                        x,
                        df,
                        ds["reader_config"],
                        ds["column_config"],
                        label_col=ds["output_col"],
                    ),
                    ds["paths"],
                )
            )
            cds = SyConcatDataset(datasets)
            X = cds
            sampler = SubsetRandomSampler(
                indices=tuple(ds["indices"]), block_size=nrows, train=False
            )
            model.iterator_valid__sampler = sampler
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                ds["transforms"].extend(["To Tensor"])
                ds["transforms_values"].extend([{}])
            cds = ImgDataSet(
                ds["paths"],
                ds["transforms"],
                ds["transforms_values"],
                ds["labels"],
                preview=False
            )
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
        else:
            raise TypeError("This data type is not supported.")

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if "iterator_valid" in model.__dict__:
                model.batch_size = None
                Y_pred = model.predict(X)
                if ds["dstype"] == "table":
                    _, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                else:
                    Y_test = np.array([item[1] for item in iter(X)])
            else:
                # Add Y if possible
                sywarn(
                    "This model requires an explicit Y, attempting to load "
                    "into memory...")
                try:
                    model.iterator_valid__sampler = None
                    model.batch_size = 1024
                    X, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                    Y_pred = model.predict(X)
                except TypeError as exc:
                    raise SyDataError(
                        "Model does not implement the 'predict' function"
                    ) from exc

        # Ensure two-dimensional arrays so predictions and ground truth can
        # be written column by column.
        if len(Y_pred.shape) < 2:
            Y_pred = Y_pred.reshape(Y_pred.shape + (1,))
        if len(Y_test.shape) < 2:
            Y_test = Y_test.reshape(Y_test.shape + (1,))

        if "labels" in ds:
            y_names = [str(ds["labels"])]
        else:
            y_names = [str(ds["output_col"])]
        if y_names is None:
            y_names = ["y{0}".format(i) for i in range(Y_pred.shape[1])]

        for i, name in enumerate(y_names):
            pred_table.set_column_from_array(name, Y_pred[:, i])
            true_table.set_column_from_array(name, Y_test[:, i])

        # json = node_context.output["dataset"]
        # json.set_ds(ds)
        # json.save()
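
# The reshape step in PredictDataset ensures that one-dimensional prediction
# arrays are written as a single output column. A minimal numpy illustration:
#
#     import numpy as np
#     y = np.array([0, 1, 1])          # shape (3,)
#     y = y.reshape(y.shape + (1,))    # shape (3, 1)
#     y[:, 0]                          # the column written to the table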