Source code for sylib.library.plugins.data.table.importers.plugin_csv_importer

# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import os

from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api.exceptions import SyDataError
from sylib.table.table_source_csv import TableSourceCSV, ImporterCSV
from sylib.table.table_importer_csv_gui import TableImportWidgetCSV

_read_to_end = 'Read to the end of file'
_read_selected = 'Read specified number of rows'
_read_from_end = 'Read to specified number of rows from the end'
_read_options = [_read_to_end, _read_selected, _read_from_end]

_exception_raise = 'Raise Exceptions'
_exception_partial = 'Partially read file'
_exception_no_delimiter = 'Read file without delimiters'
_exception_options = [
    _exception_raise, _exception_partial, _exception_no_delimiter]


[docs]class DataImportCSV(importers.TableDataImporterBase): """Importer for CSV files.""" IMPORTER_NAME = "CSV" def __init__(self, fq_infilename, parameters): super().__init__(fq_infilename, parameters) if parameters is not None: self._init_parameters() def name(self): return self.IMPORTER_NAME def auto_priority(self): return 0 def _init_parameters(self): parameters = self._parameters nbr_of_rows = 99999 nbr_of_end_rows = 9999999 # Init header row spinbox if 'header_row' not in parameters: parameters.set_integer( 'header_row', value=1, description='The row where the headers are located.', editor=synode.editors.bounded_spinbox_editor( 1, nbr_of_rows, 1)) # Init unit row spinbox if 'unit_row' not in parameters: parameters.set_integer( 'unit_row', value=1, description='The row where the units are located.', editor=synode.editors.bounded_spinbox_editor( 1, nbr_of_rows, 1)) # Init description row spinbox if 'description_row' not in parameters: parameters.set_integer( 'description_row', value=1, description='The row where the descriptions are located.', editor=synode.editors.bounded_spinbox_editor( 1, nbr_of_rows, 1)) # Init data start row spinbox if 'data_start_row' not in parameters: parameters.set_integer( 'data_start_row', value=1, description='The first row where data is stored.', editor=synode.editors.bounded_spinbox_editor( 1, nbr_of_rows, 1)) # Init data end row spinbox if 'data_end_row' not in parameters: parameters.set_integer( 'data_end_row', value=0, description='The data rows.', editor=synode.editors.bounded_spinbox_editor( 0, nbr_of_end_rows, 1)) # Init headers checkbox if 'headers' not in parameters: parameters.set_boolean( 'headers', value=None, description='File has headers.') # Init units checkbox if 'units' not in parameters: parameters.set_boolean( 'units', value=False, description='File has headers.') # Init descriptions checkbox if 'descriptions' not in parameters: parameters.set_boolean( 'descriptions', value=False, description='File has headers.') # Init transposed checkbox if 'transposed' not in parameters: parameters.set_boolean( 'transposed', value=False, label='Transpose input data', description='Transpose the data.') if 'end_of_file' not in parameters: parameters.set_boolean( 'end_of_file', value=True, description='Select all rows to the end of the file.') if 'read_selection' not in parameters: parameters.set_list( 'read_selection', value=[0], list=_read_options, description='Select how to read the data', editor=synode.editors.combo_editor()) # Move value of old parameter to new the format. if not parameters['end_of_file'].value: parameters['read_selection'].value_names = [_read_from_end] if 'delimiter' not in parameters: parameters.set_string( 'delimiter', value=None) if 'other_delimiter' not in parameters: parameters.set_string( 'other_delimiter', value=None, description='Enter other delimiter than the standard ones.') if 'preview_start_row' not in parameters: parameters.set_integer( 'preview_start_row', value=1, label='Preview start row', description='The first row where data will review from.', editor=synode.editors.bounded_spinbox_editor( 1, 500, 1)) if 'no_preview_rows' not in parameters: parameters.set_integer( 'no_preview_rows', value=20, label='Number of preview rows', description='The number of preview rows to show.', editor=synode.editors.bounded_spinbox_editor(1, 200, 1)) if 'source_coding' not in parameters: parameters.set_string( 'source_coding', value=None) if 'double_quotations' not in parameters: parameters.set_boolean( 'double_quotations', value=False, label='Remove double quotations', description='Remove double quotations when importing.') if 'exceptions' not in parameters: parameters.set_list( 'exceptions', label='How to handle failed import:', description='Select method to handle eventual errors', list=_exception_options, value=[0], editor=synode.editors.combo_editor()) if 'skip_lines_with_all_na' not in parameters: parameters.set_boolean( 'skip_lines_with_all_na', value=True, label='Skip empty rows', description=( 'Skip rows with only empty or missing cells.' '\n\n' 'When checked (default), rows with only empty and missing ' 'cells are skipped (excluded from the output). ' '\n\n' 'When unchecked, rows with only empty or missing values ' 'are included in the output. The representation for ' 'missing values depends on the column type, for example, ' 'empty string, NaN or masked.')) def valid_for_file(self): """Return True if input file is a valid CSV file.""" if self._fq_infilename is None or not os.path.isfile( self._fq_infilename): return False return True def parameter_view(self, parameters): valid_for_file = self.valid_for_file() return TableImportWidgetCSV(parameters, self._fq_infilename, valid_for_file) def import_data(self, out_datafile, parameters=None, progress=None): """Import CSV data from a file""" parameters = parameters headers_bool = parameters['headers'].value units_bool = parameters['units'].value descriptions_bool = parameters['descriptions'].value headers_row_offset = parameters['header_row'].value - 1 units_row_offset = parameters['unit_row'].value - 1 descriptions_row_offset = parameters['description_row'].value - 1 data_row_offset = parameters['data_start_row'].value - 1 read_selection = parameters['read_selection'].selected data_rows = parameters['data_end_row'].value delimiter = parameters['delimiter'].value encoding = parameters['source_coding'].value exceptions = parameters['exceptions'].selected skip_lines_with_all_na = parameters['skip_lines_with_all_na'].value # Establish connection to csv datasource table_source = TableSourceCSV(self._fq_infilename) # Check if csv-file has a header (for auto import only) if headers_bool is None: headers_bool = table_source.has_header if headers_bool: data_row_offset += 1 # Check if delimiter and encoding have been modified in GUI if delimiter is not None: table_source.delimiter = delimiter if encoding is not None: table_source.encoding = encoding table_source.skip_lines_with_all_na = skip_lines_with_all_na if table_source.valid_encoding: if not headers_bool: headers_row_offset = -1 if not units_bool: units_row_offset = -1 if not descriptions_bool: descriptions_row_offset = -1 if read_selection == _read_to_end: nr_data_rows = -1 data_end_rows = 0 elif read_selection == _read_selected: nr_data_rows = data_rows data_end_rows = 0 elif read_selection == _read_from_end: nr_data_rows = 0 data_end_rows = data_rows else: raise ValueError('Unknown Read Selection.') importer = ImporterCSV(table_source) args = ( out_datafile, nr_data_rows, data_end_rows, data_row_offset, headers_row_offset, units_row_offset, descriptions_row_offset) try: if exceptions == _exception_partial: importer.set_partial_read(True) importer.import_csv(*args) except Exception as e: if exceptions == _exception_raise and isinstance( e, StopIteration): # Happens when a file is empty. In this case it makes # sense to create an empty table. return if exceptions == _exception_no_delimiter: importer.import_csv( *args, read_csv_full_rows=True) else: raise self.import_failed(e) else: raise SyDataError( 'No valid encoding could be determined for input file.')