# Source code for sylib.library.plugins.data.table.importers.plugin_parquet_importer

# This file is part of Sympathy for Data.
# Copyright (c) 2022, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
from sylib.table_importer_gui import (PreviewGroupBoxWidget, TableImportWidget,
                                      TableImportController)
from sylib.table_sources import TableSourceModel, PreviewWorker
import sylib.table_sources as table_sources

from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api import table
from sympathy.api import qt2 as qt_compat

QtGui = qt_compat.import_module('QtGui')
QtCore = qt_compat.QtCore
QtWidgets = qt_compat.import_module('QtWidgets')


class ImporterParquet:
    """
    Importer class for data in parquet format.

    Reads rows from a table source (any object exposing
    ``read(no_rows, part)``) and copies the result into an output table.
    """

    def __init__(self, pq_source):
        """Store the table source that all data is read from."""
        self._source = pq_source
        self._partial_read = False
        # Declared here so the attribute exists from construction on. It is
        # reset on every import and is not read anywhere in this class.
        self._discard = False

    def set_partial_read(self, value):
        """
        Store the partial-read flag.

        The flag is only recorded; no method of this class reads it.
        """
        self._partial_read = value

    def import_parquet(self, out_table, nr_data_rows):
        """
        Read rows from the source and merge them into ``out_table``.

        ``nr_data_rows`` must be a positive row count or the sentinel -1;
        either value is forwarded unchanged to the source. The table that
        the source returns is handed to ``out_table.update``.

        Raises ValueError for any other number of rows.
        """
        self._discard = False
        data_table = self._data_as_table(nr_data_rows)
        out_table.update(data_table)

    def _data_as_table(self, no_rows):
        """Return the table read from the source for ``no_rows`` rows."""
        return self._data_as_table_full_rows(no_rows)

    def _data_as_table_full_rows(self, no_rows):
        """
        Import data from the parquet source as whole rows.

        Raises ValueError unless ``no_rows`` is -1 or greater than zero.
        """
        if no_rows == -1 or no_rows > 0:
            # The source is always asked with part=True, independently of
            # the flag stored by set_partial_read().
            return self._source.read(no_rows, True)
        # ValueError is a subclass of Exception, so callers that catch
        # Exception keep working.
        raise ValueError('Not valid number of rows to read.')


class TableSourceParquet:
    """
    Access layer between the parquet file on disk and the import routines.
    """

    def __init__(self, fq_infilename):
        """Remember the path of the parquet file to read."""
        self._fq_infilename = fq_infilename

    def read(self, no_rows, part):
        """
        Load the parquet file and return its leading rows as a table.

        ``no_rows`` and ``part`` are passed to
        ``table_sources.compute_end_row`` together with start row 0 and the
        number of rows in the file. An end row of -1 keeps every row;
        any other value truncates each column to ``[:end_row]``.
        Attributes and name of the loaded table are copied to the result.
        """
        from sylib.parquet import parquet

        result = table.File()
        # NOTE(review): the file appears to be loaded in full before any
        # truncation takes place -- confirm against sylib.parquet.
        loaded = parquet.read_pqfile_to_table(self._fq_infilename)
        column_names = loaded.column_names()
        total_rows = loaded.number_of_rows()

        last_row = table_sources.compute_end_row(
            0, no_rows, total_rows, part)

        # One slice object serves every column: everything when the end row
        # is -1, otherwise the leading ``last_row`` entries.
        if last_row == -1:
            keep = slice(None)
        else:
            keep = slice(None, last_row)

        for col_name in column_names:
            values = loaded.get_column_to_array(col_name)
            result.set_column_from_array(col_name, values[keep])

        result.set_attributes(loaded.get_attributes())
        result.set_name(loaded.get_name())
        return result


class TableSourceModelParquet(TableSourceModel):
    """
    Model layer between GUI and parquet importer.

    Owns the parquet table source, the importer and a worker thread that
    produces the preview table shown in the GUI.
    """

    # Emitted with the number of preview rows to request a new preview.
    get_preview = qt_compat.Signal(int)

    def __init__(self, parameters, fq_infilename, mode, valid):
        """
        Create source, importer and preview worker for ``fq_infilename``.

        ``valid`` tells whether the file was recognised as parquet; when it
        is false no preview is requested and an empty table is used instead.
        """
        super().__init__(
            parameters, fq_infilename, mode)
        self.data_table = None
        self._valid = valid
        self._pq_source = TableSourceParquet(fq_infilename)
        self._importer = ImporterParquet(self._pq_source)
        self._importer.set_partial_read(True)

        self._init_model_specific_parameters()
        self._init_preview_worker()

    def _init_model_common_parameters(self):
        """
        Init the parameters common to the xlsx, csv, mat and parquet
        importers.

        Not called from within this class; presumably invoked by the base
        class -- verify against TableSourceModel.
        """
        self.preview_start_row = self._parameters['preview_start_row']
        self.no_preview_rows = self._parameters['no_preview_rows']

    def _init_model_specific_parameters(self):
        """Init special parameters for parquet importer."""
        self.preview_start_row = self._parameters['preview_start_row']
        self.no_preview_rows = self._parameters['no_preview_rows']
        # A function object is used as a minimal attribute holder so that
        # ``data_offset.value`` can be read like the parameters above.
        self.data_offset = lambda: None
        self.data_offset.value = 1

    def _init_preview_worker(self):
        """
        Start the preview thread and request the first preview.

        The worker is moved to its own QThread; ``get_preview`` triggers
        the worker and its ready/failed signals are routed back to this
        model.
        """
        self._preview_thread = QtCore.QThread()
        self._preview_worker = PreviewWorker(self._importer.import_parquet)
        self._preview_worker.moveToThread(self._preview_thread)
        # Dispose of the worker once the thread has finished.
        self._preview_thread.finished.connect(self._preview_worker.deleteLater)
        self.get_preview.connect(self._preview_worker.create_preview_table)
        self._preview_worker.preview_ready.connect(self.set_preview_table)
        self._preview_worker.preview_failed.connect(self.set_preview_failed)
        self._preview_thread.start()
        self.collect_preview_values()

    @qt_compat.Slot()
    def collect_preview_values(self):
        """
        Collect preview data from parquet file.

        For a valid file a preview of ``no_preview_rows`` rows is requested
        from the worker; otherwise an empty table is installed.
        ``update_table`` is emitted in both cases.
        """
        no_rows = self.no_preview_rows.value
        if self._valid:
            self.get_preview.emit(no_rows)
        else:
            self.data_table = table.File()
        self.update_table.emit()

    @qt_compat.Slot(table.File)
    def set_preview_table(self, data_table):
        """Install the preview table from the worker and notify views."""
        self.data_table = data_table
        self.update_table.emit()

    def cleanup(self):
        """Ask the preview thread to quit and block until it has stopped."""
        self._preview_thread.quit()
        self._preview_thread.wait()


class ImportParametersWidgetParquet(QtWidgets.QWidget):
    """
    Import-parameter panel of the parquet importer.

    The panel holds no controls at present: both set-up hooks below are
    empty.
    """

    get_preview = QtCore.Signal()

    def __init__(self, model, parent=None):
        """Keep a reference to ``model`` and run the two set-up hooks."""
        super().__init__(parent)
        self._model = model
        self._init_gui(model)
        self._init_preview_signals()

    def _init_gui(self, model):
        """Hook for building child widgets; does nothing."""
        return None

    def _init_preview_signals(self):
        """Hook for wiring preview signals; does nothing."""
        return None


class TableImportWidgetParquet(TableImportWidget):
    """Configuration widget of the parquet importer, showing a preview."""

    MODE = 'Parquet'

    def __init__(self, parameters, fq_infilename, valid=True):
        """
        Create the parquet model, then initialise the base widget.

        The model is assigned before the base initialiser runs; presumably
        the base class reads ``self.model`` during set-up -- verify against
        TableImportWidget.
        """
        self.model = TableSourceModelParquet(
            parameters, fq_infilename, self.MODE, valid)
        super().__init__(parameters, fq_infilename, self.MODE, valid)

    def _init_gui(self, model, mode, valid):
        """
        Initialize GUI structure.

        Only the preview group box is placed in the layout; the import
        parameter widget and the table source are handed to the controller
        without being added to it.
        """
        params_widget = self._collect_import_parameters_widget(model)
        source_widget = self._collect_table_source_widget(model)
        preview_box = PreviewGroupBoxWidget(model)

        vbox = QtWidgets.QVBoxLayout()
        vbox.addWidget(preview_box)

        controller_kwargs = {
            'model': model,
            'table_source': source_widget,
            'import_param': params_widget,
            'preview_table': preview_box,
            'valid': valid,
        }
        self.controller = self._collect_controller(**controller_kwargs)

        self.setLayout(vbox)
        self.adjustSize()

    def _collect_import_parameters_widget(self, model):
        """Return the import-parameter widget for ``model``."""
        return ImportParametersWidgetParquet(model)

    def _collect_table_source_widget(self, model):
        """Return None: no table-source widget is created here."""
        return None

    def _collect_controller(self, **kwargs):
        """Return a TableImportController built from ``kwargs``."""
        return TableImportController(**kwargs)


class DataImportParquet(importers.TableDataImporterBase):
    """Importer for Parquet files."""

    IMPORTER_NAME = "Parquet"

    def __init__(self, fq_infilename, parameters):
        """
        Register the preview parameters unless they already exist.

        ``parameters`` may be None, in which case nothing is registered.
        """
        super().__init__(fq_infilename, parameters)
        if parameters is not None:
            if 'preview_start_row' not in parameters:
                parameters.set_integer(
                    'preview_start_row', value=1, label='Preview start row',
                    description='The first row where data will review from.',
                    editor=synode.editors.bounded_spinbox_editor(
                        1, 500, 1))
            if 'no_preview_rows' not in parameters:
                parameters.set_integer(
                    'no_preview_rows', value=20,
                    label='Number of preview rows',
                    description='The number of preview rows to show.',
                    editor=synode.editors.bounded_spinbox_editor(1, 200, 1))

    def name(self):
        """Return the importer name."""
        return self.IMPORTER_NAME

    def valid_for_file(self):
        """Return True if input file is a valid Parquet file."""
        # Imported lazily, inside the method, as in the original code.
        from sylib.parquet import parquet
        return parquet.is_parquet_file(self._fq_infilename)

    def parameter_view(self, parameters):
        """Return the configuration widget for this importer."""
        valid_for_file = self.valid_for_file()
        return TableImportWidgetParquet(parameters, self._fq_infilename,
                                        valid_for_file)

    def import_data(self, out_datafile, parameters=None, progress=None):
        """
        Import Parquet data from a file.

        The whole table is requested (row count -1) and merged into
        ``out_datafile``. ``parameters`` and ``progress`` are accepted but
        not used. Any exception raised during the import is passed to
        ``self.import_failed``.
        """
        # Establish connection to Parquet datasource
        table_source = TableSourceParquet(self._fq_infilename)

        importer = ImporterParquet(table_source)

        try:
            importer.import_parquet(out_datafile, -1)
        except Exception as e:
            raise self.import_failed(e)