# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import functools
import inspect
import warnings
from ast import literal_eval

import dask.dataframe as dd
import numpy as np
import sklearn.exceptions
import sklearn.model_selection

from sympathy.api import node
from sympathy.api import qt2 as qt
from sympathy.api.exceptions import SyDataError, sywarn
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.utils.pip_util import import_optional
from sylib.machinelearning.model import ModelPort
from sylib.machinelearning.utility import table_to_array
from sylib_aml.dataset import DatasetPort, prefix_rename

QtWidgets = qt.QtWidgets

def _torch():
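    """Import torch lazily as an optional dependency."""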
return import_optional("torch", group="torch")


@functools.lru_cache()
def mem_rows(ds, model):
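    """Read the dataset described by ``ds`` into a persisted dask DataFrame.

    ``ds`` is passed as a string so it stays hashable for ``lru_cache``
    and is parsed back with ``literal_eval``. Returns the DataFrame, its
    partition count, the row count of the first partition and a lazy
    count of unique values in the output column.
    """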
ds = literal_eval(ds)
reader_config = dict(ds["reader_config"])
prefix = reader_config.pop('prefix', None)
df = dd.read_csv(
ds["paths"],
**reader_config,
include_path_column=True,
assume_missing=True,
blocksize=1000e6,
        usecols=[
            col
            for col in ds["column_config"]
            if ds["column_config"][col]["orig"]
        ]
        if reader_config.get("header")
        else [
            i
            for i, col in enumerate(ds["column_config"])
            if ds["column_config"][col]["orig"]
        ],
)
prefix_rename(df, prefix)
    # Repartition heavily fragmented inputs when the model trains
    # through a batch iterator.
if "iterator_train" in model.__dict__ and df.npartitions >= 100:
df = df.repartition(partition_size="1000MB")
    # Persist changes to speed up future computations
df = df.persist()
return (df, df.npartitions, len(df.get_partition(0)),
df[ds["output_col"]].nunique())


class FitDataSet(node.Node):
"""
Creates a fitted model object from model input and dataset representation
"""
name = "Fit Dataset (Experimental)"
nodeid = "com.sympathyfordata.advancedmachinelearning.fitdataset"
author = "Jannes Germishuys"
description = "Trains a model by fitting to dataset."
icon = "fit_ds.svg"
tags = Tags(Tag.MachineLearning.Apply)
inputs = Ports(
[
ModelPort("Input model", "in-model"),
DatasetPort("Input dataset", "X"),
Port.Custom("table", "sample_weights", name="sample_weights",
n=(0, 1)),
]
)
outputs = Ports([ModelPort("Output model", "out-model")])

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)
data = node_context.input["X"]
in_model = node_context.input["in-model"]
out_model = node_context.output["out-model"]
sample_weights = node_context.input.group("sample_weights")
if len(sample_weights) > 0:
sample_weight = table_to_array(sample_weights[0], unitary=True)
else:
sample_weight = None
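        # Copy the input model into the output port and fit that copy.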
out_model.source(in_model)
out_model.load()
model = out_model.get_skl()
data.load()
ds = data.get_ds()
if ds is None or model is None:
raise SyDataError("Empty dataset and/or model")
if ds["dstype"] == "table":
df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
cds = SyConcatDataset(datasets)
X = cds
Y = None
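            # The dataset provides the labels itself, so Y stays None;
            # the subset sampler below limits training to the configured
            # row indices.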
if not hasattr(model, 'train_split'):
model.train_split = None
if model.train_split is not None:
sampler = SubsetRandomSampler(
indices=tuple(ds["indices"]), block_size=nrows, cv=True)
model.train_split.cv = sampler
else:
sampler = SubsetRandomSampler(
indices=tuple(ds["indices"]), block_size=nrows, cv=False)
model.iterator_train__sampler = sampler
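            # Size the network from the data: number of unique label
            # values for the output layer, rows per partition as the
            # virtual batch size.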
model.module__output_dim = ncategories.compute()
model.module__virtual_batch_size = nrows
elif ds["dstype"] == "image":
if "To Tensor" not in ds["transforms"]:
ds["transforms"].extend(["To Tensor"])
ds["transforms_values"].extend([{}])
cds = ImgDataSet(
ds["paths"], ds["transforms"], ds["transforms_values"],
ds["labels"],
preview=False
)
X = _torch().utils.data.Subset(cds, indices=ds["indices"])
Y = None
npartitions = 0
else:
raise TypeError("This data type is not supported.")
        # Pass a progress callback to fit() if the model accepts one.
        kwargs = {}
        args = inspect.getfullargspec(model.fit).args
        if "progress_fn" in args:
            kwargs["progress_fn"] = self.set_progress
with warnings.catch_warnings():
warnings.simplefilter(
"ignore", sklearn.exceptions.ConvergenceWarning)
# Filter user warning related to named tensors
warnings.filterwarnings("ignore", category=UserWarning)
if Y is None:
if sample_weight is None:
if "iterator_train" in model.__dict__:
if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"])
                                - sum(
                                    cfg["exclude"]
                                    for cfg in ds["column_config"].values()
                                )
                                - 1
                            )
                            if npartitions > 1:
                                model.fit(X, Y, **kwargs)
                            else:
                                try:
                                    X_train, Y = cds[tuple(ds["indices"])]
                                    model.iterator_train__sampler = None
                                    model.batch_size = 1024
                                    model.fit(X_train, Y, **kwargs)
                                except MemoryError as exc:
                                    raise SyDataError(
                                        "Data too large to be loaded "
                                        "into memory.") from exc
else:
                            model.fit(X, Y, **kwargs)
else:
if ds["dstype"] == "table":
                            model.module__input_dim = (
                                len(ds["column_config"])
                                - sum(
                                    cfg["exclude"]
                                    for cfg in ds["column_config"].values()
                                )
                                - 1
                            )
                            # The model needs an explicit Y; try to load
                            # the data into memory.
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[tuple(ds["indices"])]
                                model.iterator_train__sampler = None
                                model.batch_size = 1024
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError as exc:
                                raise SyDataError(
                                    "Data too large to be loaded "
                                    "into memory.") from exc
else:
                            # The model needs an explicit Y; try to load
                            # the data into memory.
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[ds["indices"]]
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError as exc:
                                raise SyDataError(
                                    "Data too large to be loaded "
                                    "into memory.") from exc
else:
                    model.fit(X, sample_weight=sample_weight, **kwargs)
else:
                if sample_weight is None:
                    model.fit(X, Y, **kwargs)
                else:
                    model.fit(X, Y, sample_weight=sample_weight, **kwargs)
desc = out_model.get_desc()
# desc.set_x_names(X_tbl.column_names())
# if Y is not None:
# desc.set_y_names(Y_tbl.column_names())
desc.post_fit(model)
out_model.save()


class PredictDataset(node.Node):
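    """
    Uses a fitted model to predict Y for a dataset and outputs the
    predictions alongside the true values.
    """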
name = "Predict Dataset (Experimental)"
nodeid = "com.sympathyfordata.advancedmachinelearning.predict_dataset"
author = "Jannes Germishuys"
icon = "predict_ds.svg"
tags = Tags(Tag.MachineLearning.Apply)
description = "Uses a model to predict Y given X"
parameters = node.parameters()
inputs = Ports(
[ModelPort("Input model", "in-model"),
DatasetPort("Dataset", name="dataset")]
)
outputs = Ports(
[
Port.Table("Table Containing Predictions", "predictions"),
Port.Table("Table Containing True Values", "ground_truth"),
]
)

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)
in_model = node_context.input["in-model"]
pred_table = node_context.output["predictions"]
true_table = node_context.output["ground_truth"]
data = node_context.input["dataset"]
in_model.load()
model = in_model.get_skl()
data.load()
ds = data.get_ds()
if ds is None or model is None:
raise SyDataError("Empty dataset and/or model")
if ds["dstype"] == "table":
df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
cds = SyConcatDataset(datasets)
X = cds
# Y = None
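            # Restrict prediction to the configured row indices through
            # the validation iterator's sampler.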
sampler = SubsetRandomSampler(
indices=tuple(ds["indices"]), block_size=nrows, train=False
)
model.iterator_valid__sampler = sampler
elif ds["dstype"] == "image":
if "To Tensor" not in ds["transforms"]:
ds["transforms"].extend(["To Tensor"])
ds["transforms_values"].extend([{}])
cds = ImgDataSet(
ds["paths"], ds["transforms"], ds["transforms_values"],
ds["labels"], preview=False
)
if len(ds["indices"]) > 0:
test_ds = _torch().utils.data.Subset(cds, indices=ds["indices"])
else:
test_ds = _torch().utils.data.Subset(cds, indices=ds["indices"])
X = test_ds
# Y = None
else:
raise TypeError("This data type is not supported.")
with warnings.catch_warnings():
warnings.simplefilter(
"ignore", sklearn.exceptions.ConvergenceWarning)
# Filter user warning related to named tensors
warnings.filterwarnings("ignore", category=UserWarning)
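            # Models with a batch iterator predict directly on the lazy
            # dataset; others need the data loaded into memory first.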
if "iterator_valid" in model.__dict__:
model.batch_size = None
Y_pred = model.predict(X)
if ds["dstype"] == "table":
_, Y_test = cds[tuple(ds["indices"])]
Y_test = Y_test.numpy()
else:
Y_test = np.array([item[1] for item in iter(X)])
else:
                # The model needs an explicit Y; try to load the data
                # into memory.
                sywarn(
                    "This model requires an explicit Y, attempting to load "
                    "into memory...")
                try:
                    model.iterator_valid__sampler = None
                    model.batch_size = 1024
                    X, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                    Y_pred = model.predict(X)
                except TypeError as exc:
                    raise SyDataError(
                        "Model does not implement the 'predict' function"
                    ) from exc
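        # Ensure 2D (rows, targets) arrays before writing one table
        # column per target.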
if len(Y_pred.shape) < 2:
Y_pred = Y_pred.reshape(Y_pred.shape + (1,))
if len(Y_test.shape) < 2:
Y_test = Y_test.reshape(Y_test.shape + (1,))
if "labels" in ds:
y_names = [str(ds["labels"])]
else:
y_names = [str(ds["output_col"])]
if y_names is None:
y_names = ["y{0}".format(i) for i in range(Y_pred.shape[1])]
for i, name in enumerate(y_names):
pred_table.set_column_from_array(name, Y_pred[:, i])
true_table.set_column_from_array(name, Y_test[:, i])
# json = node_context.output["dataset"]
# json.set_ds(ds)
# json.save()