# This file is part of Sympathy for Data.
# Copyright (c) 2021 Combine Control Systems
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
import pandas as pd
import numpy as np
from sympathy.api import node, exceptions
from sympathy.api.nodeconfig import Ports, Tag, Tags
from sympathy.api import qt2 as qt_compat
from sympathy.utils import port
from sympathy.platform.parameter_helper import ParameterBoolean
from sympathy.platform.parameter_helper_gui import WidgetBuildingVisitor
from sylib.table.table_importer_csv_gui import TableImportWidgetCSV
from sylib_aml.dataset import DatasetPort, prefix_rename
QtWidgets = qt_compat.import_module('QtWidgets')


class ImageDataset(node.Node):
"""
Create and initialise a dataset from paths to image datasources.
"""
name = "Image Dataset (Experimental)"
nodeid = "com.sympathyfordata.advancedmachinelearning.imgdataset"
author = "Jannes Germishuys"
icon = "image_ds.svg"
tags = Tags(Tag.MachineLearning.IO)
parameters = node.parameters()
inputs = Ports(
[
port.CustomPort(
"[datasource]",
"Sources of image data. Must be files on disk",
name="source",
)
]
)
outputs = Ports([DatasetPort("Dataset", "dataset")])
    def execute(self, node_context):
        try:
            paths = [s.decode_path() for s in node_context.input["source"]]
            if len(paths) == 0:
                raise exceptions.SyDataError("Empty dataset")
        except exceptions.NoDataError:
            # No input is connected, so there is no dataset to build.
            return
ds = {
"dstype": "image",
"paths": paths,
"transforms": [],
"transforms_values": [],
"labels": [],
}
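        # Illustration only, with hypothetical paths: the structure above
        # would serialize to JSON along these lines:
        # {"dstype": "image",
        #  "paths": ["/data/img_001.png", "/data/img_002.png"],
        #  "transforms": [], "transforms_values": [], "labels": []}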
json = node_context.output["dataset"]
json.set_ds(ds)
json.save()


class TableDataset(node.Node):
"""
Create and initialise a dataset from paths to tabular datasources.
"""
name = "Table Dataset (Experimental)"
nodeid = "com.sympathyfordata.advancedmachinelearning.tabdataset"
author = "Jannes Germishuys"
icon = "table_ds.svg"
tags = Tags(Tag.MachineLearning.IO)
nbr_of_rows = 99999
nbr_of_end_rows = 9999999
parameters = node.parameters()
# Init header row spinbox
parameters.set_integer(
"header_row",
value=1,
description="The row where the headers are located.",
editor=node.editors.bounded_spinbox_editor(1, nbr_of_rows, 1),
)
# Init unit row spinbox
parameters.set_integer(
"unit_row",
value=1,
description="The row where the units are located.",
editor=node.editors.bounded_spinbox_editor(1, nbr_of_rows, 1),
)
# Init description row spinbox
parameters.set_integer(
"description_row",
value=1,
description="The row where the descriptions are located.",
editor=node.editors.bounded_spinbox_editor(1, nbr_of_rows, 1),
)
# Init data start row spinbox
parameters.set_integer(
"data_start_row",
value=1,
description="The first row where data is stored.",
editor=node.editors.bounded_spinbox_editor(1, nbr_of_rows, 1),
)
# Init data end row spinbox
parameters.set_integer(
"data_end_row",
value=0,
description="The data rows.",
editor=node.editors.bounded_spinbox_editor(0, nbr_of_end_rows, 1),
)
# Init headers checkbox
parameters.set_boolean(
"headers", value=None, description="File has headers."
)
# Init units checkbox
parameters.set_boolean(
"units", value=False, description="File has headers."
)
# Init descriptions checkbox
parameters.set_boolean(
"descriptions", value=False, description="File has headers."
)
# Init transposed checkbox
parameters.set_boolean(
"transposed",
value=False,
label="Transpose input data",
description="Transpose the data.",
)
parameters.set_boolean(
"end_of_file",
value=True,
description="Select all rows to the end of the file.",
)
parameters.set_string(
"read_selection",
value="Read to the end of file",
description="Select how to read the data",
editor=node.editors.combo_editor(options=[
"Read to the end of file",
]),
)
# if not parameters["end_of_file"].value:
# parameters["read_selection"].value = "Read to the end of file"
parameters.set_string("delimiter", value=None)
parameters.set_string(
"other_delimiter",
value=None,
description="Enter other delimiter than the standard ones.",
)
parameters.set_integer(
"preview_start_row",
value=1,
label="Preview start row",
description="The first row where data will review from.",
editor=node.editors.bounded_spinbox_editor(1, 500, 1),
)
parameters.set_integer(
"no_preview_rows",
value=20,
label="Number of preview rows",
description="The number of preview rows to show.",
editor=node.editors.bounded_spinbox_editor(1, 200, 1),
)
parameters.set_string("source_coding", value=None)
parameters.set_boolean(
"double_quotations",
value=False,
label="Remove double quotations",
description="Remove double quotations when importing.",
)
parameters.set_string(
"exceptions",
label="How to handle failed import:",
description="Select method to handle eventual errors",
value="Raise Exceptions",
editor=node.editors.combo_editor(options=[
"Raise Exceptions",
"Partially read file",
"Read file without delimiters",
]),
)
def exec_parameter_view(self, node_context):
dummy_label = '_special-dummy-parameter'
def dummy_parameters(parameters):
            # Convert the parameter structure to a dict and add a dummy
            # parameter required by TableImportWidgetCSV but missing in
            # parameters.
res = {p: parameters[p] for p in parameters}
gui_visitor = WidgetBuildingVisitor()
skip_lines_with_all_na = ParameterBoolean(
label=dummy_label,
value=True, gui_visitor=gui_visitor)
res['skip_lines_with_all_na'] = skip_lines_with_all_na
return res
dspath = None
valid = True
try:
try:
datasource = node_context.input.first[0]
except IndexError:
datasource = None
dspath = datasource.decode_path()
except Exception:
            # No input is connected.
valid = False
# FIXME: dummy parameters for compat with TableImportWidgetCSV.
# This widget should not be based on the widget for a plugin.
dict_parameters = dummy_parameters(node_context.parameters)
widget = TableImportWidgetCSV(
dict_parameters, dspath, valid=valid)
# Hide the dummy widget.
for checkbox in widget.findChildren(QtWidgets.QCheckBox):
if checkbox.text() == dummy_label:
checkbox.setVisible(False)
return widget
inputs = Ports(
[
port.CustomPort(
"[datasource]",
"Sources of table data. Must be files on disk",
name="source",
)
]
)
outputs = Ports([DatasetPort("Dataset", "dataset")])
    def execute(self, node_context):
        try:
            paths = [s.decode_path() for s in node_context.input["source"]]
            if len(paths) == 0:
                raise exceptions.SyDataError("Empty dataset")
        except exceptions.NoDataError:
            # No input is connected, so there is no dataset to build.
            return
parameters = node_context.parameters
# Reader configuration parameters
headers_bool = parameters["headers"].value
headers_row_offset = parameters["header_row"].value - 1
data_row_offset = parameters["data_start_row"].value - 1
read_selection = parameters["read_selection"].value
delimiter = parameters["delimiter"].value
encoding = parameters["source_coding"].value
data_end_rows = 0 # read entire file by default
# exceptions = parameters["exceptions"].value[0]
# if headers_bool:
# data_row_offset += 1
if encoding:
if not headers_bool:
headers_row_offset = -1
if read_selection == "Read to the end of file":
data_end_rows = 0
reader_config = {
"delimiter": delimiter,
"header": headers_row_offset if headers_bool else None,
"encoding": encoding,
"skip_blank_lines": False,
"skiprows": list(range(1, data_row_offset))
if data_row_offset > 1
else None,
"skipfooter": data_end_rows,
"nrows": 100,
"engine": "c",
"low_memory": True,
}
reader_config["on_bad_lines"] = "skip"
prefix = None if headers_bool else "X"
try:
df = pd.read_csv(paths[0], **reader_config)
prefix_rename(df, prefix)
except UnicodeDecodeError as exc:
raise exceptions.SyDataError(
"Invalid character encoding, please try another encoding "
"from the node configuration."
) from exc
        reader_config["prefix"] = prefix
column_config = {
col_name: {
"orig": True,
"exclude": 0,
"dtype": str(np.array(df[col_name].tolist()).dtype),
"transforms": [],
"transforms_values": [],
}
for col_name in df.columns
}
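        # Illustration only: a float column named "speed" (hypothetical)
        # would get an entry such as
        # {"orig": True, "exclude": 0, "dtype": "float64",
        #  "transforms": [], "transforms_values": []}.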
        # Remove the preview row limit so the full file is read after the
        # preview.
# TODO: Custom number of rows
del reader_config["nrows"]
ds = {
"dstype": "table",
"paths": paths,
"reader_config": reader_config,
"column_config": column_config,
}
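        # Illustration only, assuming a single hypothetical CSV source: the
        # serialized dataset would look roughly like
        # {"dstype": "table", "paths": ["/data/measurements.csv"],
        #  "reader_config": {"delimiter": ",", "header": 0, ...},
        #  "column_config": {"speed": {...}, ...}}.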
json = node_context.output["dataset"]
json.set_ds(ds)
json.save()