# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import warnings
import sklearn.exceptions
import dask.dataframe as dd
import numpy as np
from sympathy.api import node
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags
from sympathy.api.exceptions import SyDataError, sywarn
from sympathy.utils.pip_util import import_optional
from sylib.machinelearning.model import ModelPort
from sylib_aml.dataset import DatasetPort, prefix_rename
from sylib.machinelearning.utility import table_to_array
import inspect
import functools
from ast import literal_eval
from sympathy.api import qt2 as qt
QtWidgets = qt.QtWidgets
def _torch():
    """Lazily import and return the optional ``torch`` package."""
    torch_module = import_optional("torch", group="torch")
    return torch_module
@functools.lru_cache()
def mem_rows(ds, model):
    """Read the dataset described by *ds* into a persisted dask dataframe.

    Parameters
    ----------
    ds : str
        String representation of the dataset description dict (stringified
        by callers so the argument is hashable for ``lru_cache``); parsed
        back with ``ast.literal_eval``.
    model : object
        The model object; only its ``__dict__`` is inspected (for an
        ``iterator_train`` entry) to decide whether repartitioning helps.

    Returns
    -------
    tuple
        ``(df, npartitions, rows_in_first_partition, n_unique_labels)``
        where the last element is a lazy dask scalar (callers call
        ``.compute()`` on it).

    NOTE(review): ``lru_cache`` keys on ``model`` identity and keeps the
    persisted dataframe alive for the cache's lifetime — confirm this
    memory retention is intended.
    """
    ds = literal_eval(ds)
    reader_config = dict(ds["reader_config"])
    prefix = reader_config.pop('prefix', None)
    column_config = ds["column_config"]
    # With a header row, columns are selected by name; otherwise by
    # position.  (Original built the positional list with repeated
    # list(...)[i] lookups — quadratic; enumerate is equivalent.)
    if reader_config["header"]:
        usecols = [col for col in column_config if column_config[col]["orig"]]
    else:
        usecols = [i for i, col in enumerate(column_config)
                   if column_config[col]["orig"]]
    df = dd.read_csv(
        ds["paths"],
        **reader_config,
        include_path_column=True,
        assume_missing=True,
        blocksize=1000e6,
        usecols=usecols,
    )
    prefix_rename(df, prefix)
    # Repartition step: only for iterator-based (torch) models and only
    # when the frame is split into many partitions.
    if "iterator_train" in model.__dict__ and df.npartitions >= 100:
        df = df.repartition(partition_size="1000MB")
    # Persist changes to speed up future computations
    df = df.persist()
    return (df, df.npartitions, len(df.get_partition(0)),
            df[ds["output_col"]].nunique())
class FitDataSet(node.Node):
    """
    Creates a fitted model object from model input and dataset representation
    """

    name = "Fit Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.fitdataset"
    author = "Jannes Germishuys"
    description = "Trains a model by fitting to dataset."
    icon = "fit_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)

    inputs = Ports(
        [
            ModelPort("Input model", "in-model"),
            DatasetPort("Input dataset", "X"),
            Port.Custom("table", "sample_weights", name="sample_weights",
                        n=(0, 1)),
        ]
    )
    outputs = Ports([ModelPort("Output model", "out-model")])

    @staticmethod
    def _module_input_dim(ds):
        # Number of input features: all configured columns minus the
        # excluded ones and minus the label column.
        column_config = ds["column_config"]
        excluded = sum(column_config[col]["exclude"] for col in column_config)
        return len(column_config) - excluded - 1

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        data = node_context.input["X"]
        in_model = node_context.input["in-model"]
        out_model = node_context.output["out-model"]
        sample_weights = node_context.input.group("sample_weights")
        if len(sample_weights) > 0:
            sample_weight = table_to_array(sample_weights[0], unitary=True)
        else:
            sample_weight = None

        out_model.source(in_model)
        out_model.load()
        model = out_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            # ds is stringified so mem_rows can be memoized (lru_cache
            # requires hashable arguments).
            df, npartitions, nrows, ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
            cds = SyConcatDataset(datasets)
            X = cds
            Y = None
            if not hasattr(model, 'train_split'):
                model.train_split = None
            if model.train_split is not None:
                # Cross-validation split: the sampler drives the CV
                # iterator instead of the training iterator.
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=True)
                model.train_split.cv = sampler
            else:
                sampler = SubsetRandomSampler(
                    indices=tuple(ds["indices"]), block_size=nrows, cv=False)
                model.iterator_train__sampler = sampler
            model.module__output_dim = ncategories.compute()
            model.module__virtual_batch_size = nrows
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                # Torch models need tensor input; append the conversion.
                ds["transforms"].extend(["To Tensor"])
                ds["transforms_values"].extend([{}])
            cds = ImgDataSet(
                ds["paths"], ds["transforms"], ds["transforms_values"],
                ds["labels"],
                preview=False
            )
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
            Y = None
            npartitions = 0
        else:
            raise TypeError("This data type is not supported.")

        # Check if we can fit in a progress_update function.
        # BUG FIX: kwargs was built but never passed to fit(); it is now
        # forwarded (safe — only populated when the signature has it).
        kwargs = {}
        args = inspect.getfullargspec(model.fit).args
        if "progress_fn" in args:
            kwargs["progress_fn"] = lambda i: self.set_progress(i)

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if Y is None:
                if sample_weight is None:
                    if "iterator_train" in model.__dict__:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                self._module_input_dim(ds))
                            if npartitions > 1:
                                model.fit(X, Y, **kwargs)
                            else:
                                # Single partition: try to load everything
                                # into memory and fit with plain batching.
                                # BUG FIX: was ``except MemoryError("...")``
                                # — an instance is not a valid handler and
                                # raised TypeError instead of catching.
                                try:
                                    X_train, Y = cds[tuple(ds["indices"])]
                                    model.iterator_train__sampler = None
                                    model.batch_size = 1024
                                    model.fit(X_train, Y, **kwargs)
                                except MemoryError:
                                    # Deliberate best-effort: data too
                                    # large to be loaded into memory.
                                    pass
                        else:
                            model.fit(X, Y, **kwargs)
                    else:
                        if ds["dstype"] == "table":
                            model.module__input_dim = (
                                self._module_input_dim(ds))
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[tuple(ds["indices"])]
                                model.iterator_train__sampler = None
                                model.batch_size = 1024
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError:
                                pass
                        else:
                            # Add Y if possible
                            sywarn(
                                "This model requires an explicit Y, "
                                "attempting to load into memory...")
                            try:
                                X_train, Y = cds[ds["indices"]]
                                model.fit(X_train, Y, **kwargs)
                            except MemoryError:
                                pass
                else:
                    model.fit(X, sample_weight=sample_weight, **kwargs)
            else:
                if sample_weight is None:
                    model.fit(X, Y, **kwargs)
                else:
                    model.fit(X, Y, sample_weight=sample_weight, **kwargs)

        desc = out_model.get_desc()
        desc.post_fit(model)
        out_model.save()
class PredictDataset(node.Node):
    """Uses a model to predict Y given X."""

    name = "Predict Dataset (Experimental)"
    nodeid = "com.sympathyfordata.advancedmachinelearning.predict_dataset"
    author = "Jannes Germishuys"
    icon = "predict_ds.svg"
    tags = Tags(Tag.MachineLearning.Apply)
    description = "Uses a model to predict Y given X"
    parameters = node.parameters()

    inputs = Ports(
        [ModelPort("Input model", "in-model"),
         DatasetPort("Dataset", name="dataset")]
    )
    outputs = Ports(
        [
            Port.Table("Table Containing Predictions", "predictions"),
            Port.Table("Table Containing True Values", "ground_truth"),
        ]
    )

    def execute(self, node_context):
        from sylib_aml.amlnets import (
            ImgDataSet, TabDataSet, SyConcatDataset, SubsetRandomSampler)

        in_model = node_context.input["in-model"]
        pred_table = node_context.output["predictions"]
        true_table = node_context.output["ground_truth"]
        data = node_context.input["dataset"]
        in_model.load()
        model = in_model.get_skl()
        data.load()
        ds = data.get_ds()
        if ds is None or model is None:
            raise SyDataError("Empty dataset and/or model")

        if ds["dstype"] == "table":
            # npartitions/ncategories are unused for prediction.
            df, _npartitions, nrows, _ncategories = mem_rows(str(ds), model)
            datasets = [
                TabDataSet(
                    path,
                    df,
                    ds["reader_config"],
                    ds["column_config"],
                    label_col=ds["output_col"],
                )
                for path in ds["paths"]
            ]
            cds = SyConcatDataset(datasets)
            X = cds
            sampler = SubsetRandomSampler(
                indices=tuple(ds["indices"]), block_size=nrows, train=False
            )
            model.iterator_valid__sampler = sampler
        elif ds["dstype"] == "image":
            if "To Tensor" not in ds["transforms"]:
                ds["transforms"].extend(["To Tensor"])
                ds["transforms_values"].extend([{}])
            cds = ImgDataSet(
                ds["paths"], ds["transforms"], ds["transforms_values"],
                ds["labels"], preview=False
            )
            # Original branched on ``len(ds["indices"]) > 0`` with two
            # byte-identical branches; collapsed to a single call.
            X = _torch().utils.data.Subset(cds, indices=ds["indices"])
        else:
            raise TypeError("This data type is not supported.")

        with warnings.catch_warnings():
            warnings.simplefilter(
                "ignore", sklearn.exceptions.ConvergenceWarning)
            # Filter user warning related to named tensors
            warnings.filterwarnings("ignore", category=UserWarning)
            if "iterator_valid" in model.__dict__:
                # batch_size=None lets the validation iterator decide.
                model.batch_size = None
                Y_pred = model.predict(X)
                if ds["dstype"] == "table":
                    _, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                else:
                    Y_test = np.array([item[1] for item in iter(X)])
            else:
                # Add Y if possible
                sywarn(
                    "This model requires an explicit Y, attempting to load "
                    "into memory...")
                try:
                    model.iterator_valid__sampler = None
                    model.batch_size = 1024
                    X, Y_test = cds[tuple(ds["indices"])]
                    Y_test = Y_test.numpy()
                    Y_pred = model.predict(X)
                except TypeError as exc:
                    # (Removed an unreachable ``return`` that followed
                    # this raise in the original.)
                    raise SyDataError(
                        "Model does not implement the 'predict' function"
                    ) from exc

        # Ensure 2-D (n_samples, n_outputs) shape for column export.
        if len(Y_pred.shape) < 2:
            Y_pred = Y_pred.reshape(Y_pred.shape + (1,))
        if len(Y_test.shape) < 2:
            Y_test = Y_test.reshape(Y_test.shape + (1,))

        # One output column, named after the label source.  (Removed a
        # dead ``if y_names is None`` fallback — both branches here always
        # assign a one-element list.)
        if "labels" in ds:
            y_names = [str(ds["labels"])]
        else:
            y_names = [str(ds["output_col"])]

        for i, name in enumerate(y_names):
            pred_table.set_column_from_array(name, Y_pred[:, i])
            true_table.set_column_from_array(name, Y_test[:, i])