Source code for sylib.library.plugins.data.table.importers.plugin_mat_importer

# This file is part of Sympathy for Data.
# Copyright (c) 2016, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import os

from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api import qt2 as qt_compat
from sympathy.api import table
from sylib.table_importer_gui import (
    TableImportWidget, TableImportController, offset_spinbox)
from sylib.table_sources import (
    TableSourceModel, PreviewWorker, compute_end_row)

QtGui = qt_compat.import_module('QtGui')
QtCore = qt_compat.import_module('QtCore')
QtWidgets = qt_compat.import_module('QtWidgets')

ITER_LIMIT = 50

_read_to_end = 'Read to the end of file'
_read_selected = 'Read specified number of rows'
_read_from_end = 'Read to specified number of rows from the end'
_read_options = [_read_to_end, _read_selected, _read_from_end]

_exception_raise = 'Raise Exceptions'
_exception_partial = 'Partially read file'
_exception_no_delimiter = 'Read file without delimiters'
_exception_options = [
    _exception_raise, _exception_partial, _exception_no_delimiter]


def _vjoin_list(out_table, table_list):
    out_table.vjoin(table_list, fill=True, minimum_increment=0)


class ImporterMAT:
    """Importer class for data in mat format."""

    def __init__(self, mat_source):
        self._source = mat_source
        self._partial_read = False

    def set_partial_read(self, value):
        self._partial_read = value

    def import_mat(self, out_table, nr_data_rows, data_foot_rows,
                   data_row_offset, units_row_offset=-1,
                   require_num=True,
                   read_mat_full_rows=False):

        self._discard = False

        data_table = self._data_as_table(nr_data_rows, data_foot_rows,
                                         data_row_offset, read_mat_full_rows)

        out_table.update(data_table)

    def _data_as_table(self, no_rows, foot_rows, offset_rows, full_rows):
        """
        Merges the imported data, stored in one or many Tables, into a single
        Table.
        """
        out_table = table.File()

        if full_rows:
            table_list = self._data_as_tables_full_rows(no_rows, offset_rows)
        else:
            table_list = self._data_as_tables(no_rows, foot_rows, offset_rows)

        if len(table_list) > 1:
            _vjoin_list(out_table, table_list)
        elif len(table_list) == 1:
            out_table = table_list[0]

        if foot_rows > 0 and not self._discard:
            return out_table[:-foot_rows]
        else:
            return out_table

    def _data_as_tables(self, no_rows, foot_rows, offset_rows, iter_count=0):
        """
        Import data from mat-file in chunks. Each chunk is represented
        by a Table in the list, table_list.
        """
        table_list = []
        iter_count += 1

        if iter_count > ITER_LIMIT:
            message = ('Process has ended because the number of calls '
                       'of the method "_data_as_tables" have passed the '
                       'allowed limit, {0}'.format(ITER_LIMIT))
            raise Exception(message)

        if no_rows < 0:
            table_list.append(self._source.read(0, offset_rows, False))
        elif no_rows >= 0:
            table_list.append(self._source.read(no_rows, offset_rows, True))
        else:
            raise Exception('Not valid number of rows to read.')

        return table_list

    def _data_as_tables_full_rows(self, no_rows, offset_rows):
        """Import data from mat-file as whole rows."""
        if no_rows == -1:
            return self._source.read(-1, offset_rows, True)
        elif no_rows > 0:
            return [(self._source.read(no_rows, offset_rows, True))]
        else:
            raise Exception('Not valid number of rows to read.')


class TableSourceModelMAT(TableSourceModel):
    """Model layer between GUI and mat importer."""

    get_preview = qt_compat.Signal(int, int, int, int, int, int)

    def __init__(self, parameters, fq_infilename, mode, valid):
        super().__init__(parameters, fq_infilename, mode)
        self.data_table = None
        self._valid = valid
        self._mat_source = TableSourceMAT(fq_infilename)
        self._importer = ImporterMAT(self._mat_source)
        self._importer.set_partial_read(True)

        self._init_model_specific_parameters()
        self._init_preview_worker()

    def _init_model_specific_parameters(self):
        """Init special parameters for mat importer."""
        pass

    def _init_preview_worker(self):
        """Collect preview data from mat file."""
        def import_function(*args):
            try:
                self._importer.import_mat(*args, require_num=False)
            except Exception:
                self._importer.import_mat(*args, require_num=False,
                                          read_mat_full_rows=True)

        self._preview_thread = QtCore.QThread()
        self._preview_worker = PreviewWorker(self._importer.import_mat)
        self._preview_worker.moveToThread(self._preview_thread)
        self._preview_thread.finished.connect(self._preview_worker.deleteLater)
        self.get_preview.connect(self._preview_worker.create_preview_table)
        self._preview_worker.preview_ready.connect(self.set_preview_table)
        self._preview_worker.preview_failed.connect(self.set_preview_failed)
        self._preview_thread.start()
        self.collect_preview_values()

    @qt_compat.Slot()
    def collect_preview_values(self):
        """Collect preview data from mat file."""
        no_rows = self.no_preview_rows.value
        row_offset = self.data_offset.value - 1
        if self._valid:
            self.get_preview.emit(no_rows, 0, row_offset, -1, -1, -1)
        else:
            self.data_table = table.File()
        self.update_table.emit()

    @qt_compat.Slot(table.File)
    def set_preview_table(self, data_table):
        self.data_table = data_table
        self.update_table.emit()

    def cleanup(self):
        self._preview_thread.quit()
        self._preview_thread.wait()


class TableSourceMAT:
    """
    This class is the layer between the physical mat-file and the import
    routines.
    """

    def __init__(self, fq_infilename):
        self._fq_infilename = fq_infilename

    def read(self, no_rows, offset, part):
        from sylib.matlab import matlab
        out_table = table.File()
        mat_table = matlab.read_matfile_to_table(self._fq_infilename)
        names = mat_table.column_names()
        rows = mat_table.number_of_rows()

        end_row = compute_end_row(offset, no_rows, rows, part)

        for column in names:
            data = mat_table.get_column_to_array(column)
            out_table.set_column_from_array(column, data[offset:end_row])

        out_table.set_attributes(mat_table.get_attributes())
        out_table.set_name(mat_table.get_name())
        return out_table


class ImportParametersWidgetMAT(QtWidgets.QWidget):
    """
    The control group box includes the widgets for determination of
    data start and end row/column and transpose condition.
    """

    get_preview = QtCore.Signal()

    def __init__(self, model, parent=None):
        super().__init__(parent)
        self._model = model
        self._init_gui(model)
        self._init_preview_signals()

    def _init_gui(self, model):
        layout = QtWidgets.QVBoxLayout()
        layout.setAlignment(QtCore.Qt.AlignmentFlag.AlignLeft)

        grid_layout = QtWidgets.QGridLayout()
        grid_layout.setHorizontalSpacing(20)
        grid_row = 0

        # Data
        self._data_offset_spinbox = offset_spinbox(model.data_offset, -1)

        self._data_read_selection = model.data_read_selection.gui()
        self._data_rows_spinbox = offset_spinbox(model.data_rows, -1)

        grid_row += 1
        data_start_label = QtWidgets.QLabel('Data start at row')

        grid_layout.addWidget(data_start_label, grid_row, 1)
        grid_layout.addWidget(self._data_offset_spinbox, grid_row, 2)
        grid_layout.addWidget(self._data_read_selection, grid_row, 3)
        grid_layout.addWidget(self._data_rows_spinbox, grid_row, 4)

        layout.addLayout(grid_layout)

        self.setLayout(layout)

    def _init_preview_signals(self):
        signals = [
            self._data_offset_spinbox.valueChanged,
            self._data_rows_spinbox.valueChanged]
        for signal in signals:
            signal.connect(self.get_preview)


class TableImportWidgetMAT(TableImportWidget):
    MODE = 'MAT'

    def __init__(self, parameters, fq_infilename, valid=True):
        self.model = TableSourceModelMAT(
            parameters, fq_infilename, self.MODE, valid)
        super().__init__(
            parameters, fq_infilename, self.MODE, valid)

    def _collect_import_parameters_widget(self, model):
        return ImportParametersWidgetMAT(model)

    def _collect_table_source_widget(self, model):
        pass

    def _collect_controller(self, **kwargs):
        return TableImportController(**kwargs)


[docs]class DataImportMAT(importers.TableDataImporterBase): """Importer for MAT files.""" IMPORTER_NAME = "MAT" def __init__(self, fq_infilename, parameters): super().__init__(fq_infilename, parameters) if parameters is not None: self._init_parameters() def name(self): return self.IMPORTER_NAME def _init_parameters(self): parameters = self._parameters nbr_of_rows = 99999 nbr_of_end_rows = 9999999 # Init data start row spinbox if 'data_start_row' not in parameters: parameters.set_integer( 'data_start_row', value=1, description='The first row where data is stored.', editor=synode.editors.bounded_spinbox_editor( 1, nbr_of_rows, 1)) # Init data end row spinbox if 'data_end_row' not in parameters: parameters.set_integer( 'data_end_row', value=0, description='The data rows.', editor=synode.editors.bounded_spinbox_editor( 0, nbr_of_end_rows, 1)) if 'end_of_file' not in parameters: parameters.set_boolean( 'end_of_file', value=True, description='Select all rows to the end of the file.') if 'read_selection' not in parameters: parameters.set_list( 'read_selection', value=[0], list=_read_options, description='Select how to read the data', editor=synode.editors.combo_editor()) # Move value of old parameter to new the format. if not parameters['end_of_file'].value: parameters['read_selection'].value = [2] if 'preview_start_row' not in parameters: parameters.set_integer( 'preview_start_row', value=1, label='Preview start row', description='The first row where data will review from.', editor=synode.editors.bounded_spinbox_editor( 1, 500, 1)) if 'no_preview_rows' not in parameters: parameters.set_integer( 'no_preview_rows', value=20, label='Number of preview rows', description='The number of preview rows to show.', editor=synode.editors.bounded_spinbox_editor(1, 200, 1)) if 'exceptions' not in parameters: parameters.set_list( 'exceptions', label='How to handle failed import:', description='Select method to handle eventual errors', list=_exception_options, value=[0], editor=synode.editors.combo_editor()) def valid_for_file(self): """Return True if input file is a valid MAT file.""" from sylib.matlab import matlab if self._fq_infilename is None: return False try: matlab.read_matfile_to_table(self._fq_infilename) except Exception: return False allowed_extensions = ['MAT', 'mat'] extension = os.path.splitext(self._fq_infilename)[1][1:] return extension in allowed_extensions def parameter_view(self, parameters): valid_for_file = self.valid_for_file() return TableImportWidgetMAT(parameters, self._fq_infilename, valid_for_file) def import_data(self, out_datafile, parameters=None, progress=None): """Import MAT data from a file""" parameters = parameters data_row_offset = parameters['data_start_row'].value - 1 read_selection = parameters['read_selection'].selected data_rows = parameters['data_end_row'].value exceptions = parameters['exceptions'].selected # Establish connection to mat datasource table_source = TableSourceMAT(self._fq_infilename) if read_selection == _read_to_end: nr_data_rows = -1 data_end_rows = 0 elif read_selection == _read_selected: nr_data_rows = data_rows data_end_rows = 0 elif read_selection == _read_from_end: nr_data_rows = -1 data_end_rows = data_rows else: raise ValueError('Unknown Read Selection.') importer = ImporterMAT(table_source) try: try: if exceptions == _exception_partial: importer.set_partial_read(True) importer.import_mat(out_datafile, nr_data_rows, data_end_rows, data_row_offset) except Exception: if exceptions == _exception_no_delimiter: importer.import_mat( out_datafile, nr_data_rows, data_end_rows, data_row_offset, read_mat_full_rows=True) else: raise except Exception as e: raise self.import_failed(e)