# This file is part of Sympathy for Data.
# Copyright (c) 2016, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
import os
from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api import qt2 as qt_compat
from sympathy.api import table
from sylib.table_importer_gui import (
TableImportWidget, TableImportController, offset_spinbox)
from sylib.table_sources import (
TableSourceModel, PreviewWorker, compute_end_row)
QtGui = qt_compat.import_module('QtGui')
QtCore = qt_compat.import_module('QtCore')
QtWidgets = qt_compat.import_module('QtWidgets')
# Upper bound compared against the ``iter_count`` argument of
# ImporterMAT._data_as_tables; exceeding it raises an exception there.
ITER_LIMIT = 50

# Labels offered by the 'read_selection' list parameter.
# DataImportMAT.import_data compares the selected label against these
# strings to decide how many rows to read and how many to cut at the end.
_read_to_end = 'Read to the end of file'
_read_selected = 'Read specified number of rows'
_read_from_end = 'Read to specified number of rows from the end'
_read_options = [_read_to_end, _read_selected, _read_from_end]

# Labels offered by the 'exceptions' list parameter.
# DataImportMAT.import_data compares the selected label against these
# strings to decide how a failed import is handled.
_exception_raise = 'Raise Exceptions'
_exception_partial = 'Partially read file'
_exception_no_delimiter = 'Read file without delimiters'
_exception_options = [
    _exception_raise, _exception_partial, _exception_no_delimiter]
def _vjoin_list(out_table, table_list):
    """
    Vertically join the tables in ``table_list`` into ``out_table``.

    Delegates to ``out_table.vjoin`` with ``fill=True`` and
    ``minimum_increment=0``; ``out_table`` is modified in place and nothing
    is returned.
    """
    out_table.vjoin(table_list, fill=True, minimum_increment=0)
class ImporterMAT:
    """
    Importer class for data in mat format.

    Wraps a source object exposing ``read(no_rows, offset, part)`` and
    assembles whatever it returns into a single output table.
    """

    def __init__(self, mat_source):
        """Store the source object that the read calls are delegated to."""
        self._source = mat_source
        self._partial_read = False
        # Initialised here so that _data_as_table does not fail with an
        # AttributeError when it runs before import_mat has been called.
        self._discard = False

    def set_partial_read(self, value):
        """Store the partial-read flag (not consulted by this class itself)."""
        self._partial_read = value

    def import_mat(self, out_table, nr_data_rows, data_foot_rows,
                   data_row_offset, units_row_offset=-1,
                   require_num=True,
                   read_mat_full_rows=False):
        """
        Read data from the source and merge it into ``out_table``.

        Parameters
        ----------
        out_table
            Table that is updated in place with the imported data.
        nr_data_rows
            Number of rows to read; a negative value selects the
            read-everything branch of the chosen reader.
        data_foot_rows
            Number of rows cut from the end of the result when positive.
        data_row_offset
            Row offset forwarded to the source.
        units_row_offset, require_num
            Accepted for interface compatibility; not used here.
        read_mat_full_rows
            When true, the full-rows reader is used instead of the
            default one.
        """
        self._discard = False
        data_table = self._data_as_table(nr_data_rows, data_foot_rows,
                                         data_row_offset, read_mat_full_rows)
        out_table.update(data_table)

    def _data_as_table(self, no_rows, foot_rows, offset_rows, full_rows):
        """
        Merges the imported data, stored in one or many Tables, into a single
        Table.

        Returns the merged table, with the last ``foot_rows`` rows sliced
        off when ``foot_rows`` is positive and ``self._discard`` is false.
        """
        out_table = table.File()
        if full_rows:
            table_list = self._data_as_tables_full_rows(no_rows, offset_rows)
        else:
            table_list = self._data_as_tables(no_rows, foot_rows, offset_rows)

        if len(table_list) > 1:
            _vjoin_list(out_table, table_list)
        elif len(table_list) == 1:
            out_table = table_list[0]

        if foot_rows > 0 and not self._discard:
            return out_table[:-foot_rows]
        return out_table

    def _data_as_tables(self, no_rows, foot_rows, offset_rows, iter_count=0):
        """
        Import data from mat-file in chunks. Each chunk is represented
        by a Table in the list, table_list.

        ``foot_rows`` is accepted but not used here. Raises ``Exception``
        when ``iter_count`` (after being incremented) exceeds ITER_LIMIT and
        ``ValueError`` when ``no_rows`` cannot be ordered against zero.
        """
        table_list = []
        iter_count += 1
        if iter_count > ITER_LIMIT:
            message = ('Process has ended because the number of calls '
                       'of the method "_data_as_tables" have passed the '
                       'allowed limit, {0}'.format(ITER_LIMIT))
            raise Exception(message)

        if no_rows < 0:
            table_list.append(self._source.read(0, offset_rows, False))
        elif no_rows >= 0:
            table_list.append(self._source.read(no_rows, offset_rows, True))
        else:
            # Only reachable for values that compare false both ways
            # (for example NaN).
            raise ValueError('Not valid number of rows to read.')
        return table_list

    def _data_as_tables_full_rows(self, no_rows, offset_rows):
        """
        Import data from mat-file as whole rows.

        Always returns a list of tables so that _data_as_table can treat
        the result uniformly. Raises ``ValueError`` for any ``no_rows``
        other than -1 or a positive number.
        """
        if no_rows == -1:
            # Wrapped in a list: the caller takes len() of the result and
            # indexes into it.
            return [self._source.read(-1, offset_rows, True)]
        elif no_rows > 0:
            return [self._source.read(no_rows, offset_rows, True)]
        else:
            raise ValueError('Not valid number of rows to read.')
class TableSourceModelMAT(TableSourceModel):
    """Model layer between GUI and mat importer."""

    # Emitted by collect_preview_values with six ints and connected to the
    # preview worker's create_preview_table in _init_preview_worker.
    get_preview = qt_compat.Signal(int, int, int, int, int, int)

    def __init__(self, parameters, fq_infilename, mode, valid):
        """
        Build the MAT source and importer, then start the preview worker.

        ``valid`` decides whether collect_preview_values requests a real
        preview or just publishes an empty table.
        """
        super().__init__(parameters, fq_infilename, mode)
        self.data_table = None
        self._valid = valid
        self._mat_source = TableSourceMAT(fq_infilename)
        self._importer = ImporterMAT(self._mat_source)
        self._importer.set_partial_read(True)
        self._init_model_specific_parameters()
        self._init_preview_worker()

    def _init_model_specific_parameters(self):
        """Init special parameters for mat importer."""
        pass

    def _init_preview_worker(self):
        """
        Create the preview worker, move it to its own thread, wire the
        signals and request a first preview.
        """
        # NOTE(review): import_function is defined but never used; the
        # worker below receives self._importer.import_mat directly.
        # Confirm whether this fallback wrapper was meant to be passed to
        # PreviewWorker, or whether it can be removed.
        def import_function(*args):
            try:
                self._importer.import_mat(*args, require_num=False)
            except Exception:
                self._importer.import_mat(*args, require_num=False,
                                          read_mat_full_rows=True)
        self._preview_thread = QtCore.QThread()
        self._preview_worker = PreviewWorker(self._importer.import_mat)
        self._preview_worker.moveToThread(self._preview_thread)
        # Schedule the worker for deletion once the thread has finished.
        self._preview_thread.finished.connect(self._preview_worker.deleteLater)
        self.get_preview.connect(self._preview_worker.create_preview_table)
        self._preview_worker.preview_ready.connect(self.set_preview_table)
        # set_preview_failed is not defined in this class; presumably it is
        # inherited from TableSourceModel - verify.
        self._preview_worker.preview_failed.connect(self.set_preview_failed)
        self._preview_thread.start()
        self.collect_preview_values()

    @qt_compat.Slot()
    def collect_preview_values(self):
        """
        Request a preview for the current settings.

        For a valid file, get_preview is emitted with the number of preview
        rows, 0, the data offset minus one, and three -1 values. Otherwise
        an empty table is stored and update_table is emitted directly.
        """
        # no_preview_rows, data_offset and update_table are not defined in
        # this class; presumably they come from TableSourceModel - verify.
        no_rows = self.no_preview_rows.value
        row_offset = self.data_offset.value - 1
        if self._valid:
            self.get_preview.emit(no_rows, 0, row_offset, -1, -1, -1)
        else:
            self.data_table = table.File()
            self.update_table.emit()

    @qt_compat.Slot(table.File)
    def set_preview_table(self, data_table):
        """Store the finished preview table and emit update_table."""
        self.data_table = data_table
        self.update_table.emit()

    def cleanup(self):
        """Ask the preview thread to quit and block until it has stopped."""
        self._preview_thread.quit()
        self._preview_thread.wait()
class TableSourceMAT:
    """
    Reader sitting between the mat-file on disk and the import routines.
    """

    def __init__(self, fq_infilename):
        """Remember the path of the mat-file to read from."""
        self._fq_infilename = fq_infilename

    def read(self, no_rows, offset, part):
        """
        Load the mat-file and return a row slice of it as a new table.

        The file is read on every call. The slice starts at ``offset`` and
        stops at the row returned by ``compute_end_row(offset, no_rows,
        <row count>, part)``. Attributes and name of the loaded table are
        copied onto the result.
        """
        from sylib.matlab import matlab

        loaded = matlab.read_matfile_to_table(self._fq_infilename)
        column_names = loaded.column_names()
        stop = compute_end_row(
            offset, no_rows, loaded.number_of_rows(), part)

        result = table.File()
        for name in column_names:
            values = loaded.get_column_to_array(name)
            result.set_column_from_array(name, values[offset:stop])

        result.set_attributes(loaded.get_attributes())
        result.set_name(loaded.get_name())
        return result
class ImportParametersWidgetMAT(QtWidgets.QWidget):
    """
    Parameter panel for the MAT importer.

    Shows a single grid row holding a label, the data start row spinbox,
    the read-selection widget and the data rows spinbox. ``get_preview`` is
    emitted whenever one of the two spinboxes changes value.
    """

    get_preview = QtCore.Signal()

    def __init__(self, model, parent=None):
        super().__init__(parent)
        self._model = model
        self._init_gui(model)
        self._init_preview_signals()

    def _init_gui(self, model):
        """Create the child widgets and lay them out."""
        outer = QtWidgets.QVBoxLayout()
        outer.setAlignment(QtCore.Qt.AlignmentFlag.AlignLeft)
        grid = QtWidgets.QGridLayout()
        grid.setHorizontalSpacing(20)

        # Data
        self._data_offset_spinbox = offset_spinbox(model.data_offset, -1)
        self._data_read_selection = model.data_read_selection.gui()
        self._data_rows_spinbox = offset_spinbox(model.data_rows, -1)

        # Everything goes on grid row 1, in columns 1 to 4.
        row_widgets = (
            QtWidgets.QLabel('Data start at row'),
            self._data_offset_spinbox,
            self._data_read_selection,
            self._data_rows_spinbox,
        )
        for column, widget in enumerate(row_widgets, start=1):
            grid.addWidget(widget, 1, column)

        outer.addLayout(grid)
        self.setLayout(outer)

    def _init_preview_signals(self):
        """Forward value changes of both spinboxes to ``get_preview``."""
        for spinbox in (self._data_offset_spinbox, self._data_rows_spinbox):
            spinbox.valueChanged.connect(self.get_preview)
class TableImportWidgetMAT(TableImportWidget):
    """Table import widget specialised for the MAT importer."""

    MODE = 'MAT'

    def __init__(self, parameters, fq_infilename, valid=True):
        # The model is assigned before the base initialiser runs;
        # presumably the base class reads self.model - verify.
        mode = self.MODE
        self.model = TableSourceModelMAT(
            parameters, fq_infilename, mode, valid)
        super().__init__(parameters, fq_infilename, mode, valid)

    def _collect_import_parameters_widget(self, model):
        """Return the MAT-specific parameter panel for ``model``."""
        return ImportParametersWidgetMAT(model)

    def _collect_table_source_widget(self, model):
        """Return None; no table source widget is created for MAT."""
        return None

    def _collect_controller(self, **kwargs):
        """Return a TableImportController built from ``kwargs``."""
        return TableImportController(**kwargs)
class DataImportMAT(importers.TableDataImporterBase):
    """Importer for MAT files."""

    IMPORTER_NAME = "MAT"

    def __init__(self, fq_infilename, parameters):
        """Initialise the base importer and, if given, the parameters."""
        super().__init__(fq_infilename, parameters)
        if parameters is not None:
            self._init_parameters()

    def name(self):
        """Return the importer name."""
        return self.IMPORTER_NAME

    def _init_parameters(self):
        """Add every parameter this importer needs that is still missing."""
        parameters = self._parameters
        nbr_of_rows = 99999
        nbr_of_end_rows = 9999999

        # Init data start row spinbox
        if 'data_start_row' not in parameters:
            parameters.set_integer(
                'data_start_row', value=1,
                description='The first row where data is stored.',
                editor=synode.editors.bounded_spinbox_editor(
                    1, nbr_of_rows, 1))

        # Init data end row spinbox
        if 'data_end_row' not in parameters:
            parameters.set_integer(
                'data_end_row', value=0,
                description='The data rows.',
                editor=synode.editors.bounded_spinbox_editor(
                    0, nbr_of_end_rows, 1))

        if 'end_of_file' not in parameters:
            parameters.set_boolean(
                'end_of_file', value=True,
                description='Select all rows to the end of the file.')

        if 'read_selection' not in parameters:
            parameters.set_list(
                'read_selection', value=[0],
                list=_read_options,
                description='Select how to read the data',
                editor=synode.editors.combo_editor())
            # Move the value of the old parameter to the new format. This
            # is done only when 'read_selection' has just been created, so
            # an existing selection is never overwritten.
            if not parameters['end_of_file'].value:
                parameters['read_selection'].value = [2]

        if 'preview_start_row' not in parameters:
            parameters.set_integer(
                'preview_start_row', value=1, label='Preview start row',
                description='The first row where data will review from.',
                editor=synode.editors.bounded_spinbox_editor(
                    1, 500, 1))

        if 'no_preview_rows' not in parameters:
            parameters.set_integer(
                'no_preview_rows', value=20, label='Number of preview rows',
                description='The number of preview rows to show.',
                editor=synode.editors.bounded_spinbox_editor(1, 200, 1))

        if 'exceptions' not in parameters:
            parameters.set_list(
                'exceptions', label='How to handle failed import:',
                description='Select method to handle eventual errors',
                list=_exception_options,
                value=[0], editor=synode.editors.combo_editor())

    def valid_for_file(self):
        """Return True if input file is a valid MAT file.

        The file must have the extension ``MAT`` or ``mat`` and must be
        readable by ``matlab.read_matfile_to_table``.
        """
        from sylib.matlab import matlab

        if self._fq_infilename is None:
            return False

        # Cheap extension test first, so that files which are not
        # candidates at all are never parsed.
        allowed_extensions = ['MAT', 'mat']
        extension = os.path.splitext(self._fq_infilename)[1][1:]
        if extension not in allowed_extensions:
            return False

        try:
            matlab.read_matfile_to_table(self._fq_infilename)
        except Exception:
            return False
        return True

    def parameter_view(self, parameters):
        """Return the configuration widget for this importer."""
        valid_for_file = self.valid_for_file()
        return TableImportWidgetMAT(parameters, self._fq_infilename,
                                    valid_for_file)

    def import_data(self, out_datafile, parameters=None, progress=None):
        """Import MAT data from a file.

        Parameters
        ----------
        out_datafile
            Table that receives the imported data.
        parameters
            Parameter collection to read the settings from. When None, the
            parameters stored on the importer are used.
        progress
            Accepted for interface compatibility; not used here.

        Raises
        ------
        ValueError
            If the read selection is not one of the known options.
        Exception
            Any failure during the import is re-raised as whatever
            ``self.import_failed`` returns for it.
        """
        if parameters is None:
            # The signature allows None; fall back to the stored parameters
            # instead of failing on the first lookup below.
            parameters = self._parameters

        data_row_offset = parameters['data_start_row'].value - 1
        read_selection = parameters['read_selection'].selected
        data_rows = parameters['data_end_row'].value
        exceptions = parameters['exceptions'].selected

        # Establish connection to mat datasource
        table_source = TableSourceMAT(self._fq_infilename)

        if read_selection == _read_to_end:
            nr_data_rows = -1
            data_end_rows = 0
        elif read_selection == _read_selected:
            nr_data_rows = data_rows
            data_end_rows = 0
        elif read_selection == _read_from_end:
            nr_data_rows = -1
            data_end_rows = data_rows
        else:
            raise ValueError('Unknown Read Selection.')

        importer = ImporterMAT(table_source)

        try:
            try:
                if exceptions == _exception_partial:
                    importer.set_partial_read(True)
                importer.import_mat(out_datafile, nr_data_rows, data_end_rows,
                                    data_row_offset)
            except Exception:
                # Retry in full-rows mode only when that fallback has been
                # selected; otherwise let the error reach the outer handler.
                if exceptions == _exception_no_delimiter:
                    importer.import_mat(
                        out_datafile, nr_data_rows, data_end_rows,
                        data_row_offset, read_mat_full_rows=True)
                else:
                    raise
        except Exception as e:
            raise self.import_failed(e)