# Source code for plugin_csv_importer

# Copyright (c) 2013, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from __future__ import (print_function, division, unicode_literals,
                        absolute_import)
import os
from sylib.table_importer_gui import TableImportWidgetCSV
from sylib.table_sources import ImporterCSV, TableSourceCSV
from sympathy.api import qt2 as qt_compat
from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api.exceptions import SyDataError
QtGui = qt_compat.import_module('QtGui')


class DataImportCSV(importers.TableDataImporterBase):
    """Importer for CSV files.

    Registers the parameters that control how a CSV file is read (row
    positions of headers, units and descriptions, the data range, delimiter,
    encoding and error handling) and performs the import through
    ``TableSourceCSV`` and ``ImporterCSV``.
    """

    IMPORTER_NAME = "CSV"

    def __init__(self, fq_infilename, parameters):
        """Create the importer and register any missing parameters.

        :param fq_infilename: Path of the file to import. May be None, in
            which case ``valid_for_file`` returns False.
        :param parameters: Parameter group to populate. When None, no
            parameters are registered.
        """
        super(DataImportCSV, self).__init__(fq_infilename, parameters)
        if parameters is not None:
            self._init_parameters()

    def name(self):
        """Return the display name of this importer."""
        return self.IMPORTER_NAME

    def _init_parameters(self):
        """Add every importer parameter that is not already present.

        Parameters that already exist are left untouched, so values stored
        earlier are preserved.
        """
        parameters = self._parameters
        nbr_of_rows = 99999
        nbr_of_end_rows = 9999999

        # One-based row position spinboxes; they all share the same range.
        row_parameters = (
            ('header_row', 'The row where the headers are located.'),
            ('unit_row', 'The row where the units are located.'),
            ('description_row',
             'The row where the descriptions are located.'),
            ('data_start_row', 'The first row where data is stored.'),
        )
        for row_name, row_description in row_parameters:
            if row_name not in parameters:
                parameters.set_integer(
                    row_name, value=1, description=row_description,
                    editor=synode.Util.bounded_spinbox_editor(
                        1, nbr_of_rows, 1))

        # Number of data rows; how it is interpreted depends on
        # 'read_selection' (see import_data).
        if 'data_end_row' not in parameters:
            parameters.set_integer(
                'data_end_row', value=0, description='The data rows.',
                editor=synode.Util.bounded_spinbox_editor(
                    0, nbr_of_end_rows, 1))

        # None means "not decided yet": import_data then asks the table
        # source whether the file has a header.
        if 'headers' not in parameters:
            parameters.set_boolean(
                'headers', value=None, description='File has headers.')

        if 'units' not in parameters:
            parameters.set_boolean(
                'units', value=False, description='File has units.')

        if 'descriptions' not in parameters:
            parameters.set_boolean(
                'descriptions', value=False,
                description='File has descriptions.')

        if 'transposed' not in parameters:
            parameters.set_boolean(
                'transposed', value=False, label='Transpose input data',
                description='Transpose the data.')

        # Legacy parameter, superseded by 'read_selection' below.
        if 'end_of_file' not in parameters:
            parameters.set_boolean(
                'end_of_file', value=True,
                description='Select all rows to the end of the file.')

        if 'read_selection' not in parameters:
            parameters.set_list(
                'read_selection', value=[0],
                plist=['Read to the end of file',
                       'Read specified number of rows',
                       'Read to specified number of rows from the end'],
                description='Select how to read the data',
                editor=synode.Util.combo_editor())

            # Move the value of the old parameter to the new format. Only
            # done when 'read_selection' has just been created, so a stored
            # selection is never overwritten.
            if not parameters['end_of_file'].value:
                parameters['read_selection'].value = [2]

        # None means that the value detected by the table source is used.
        if 'delimiter' not in parameters:
            parameters.set_string('delimiter', value=None)

        if 'other_delimiter' not in parameters:
            parameters.set_string(
                'other_delimiter', value=None,
                description='Enter other delimiter than the standard ones.')

        if 'preview_start_row' not in parameters:
            parameters.set_integer(
                'preview_start_row', value=1, label='Preview start row',
                description='The first row where data will review from.',
                editor=synode.Util.bounded_spinbox_editor(1, 500, 1))

        if 'no_preview_rows' not in parameters:
            parameters.set_integer(
                'no_preview_rows', value=20, label='Number of preview rows',
                description='The number of preview rows to show.',
                editor=synode.Util.bounded_spinbox_editor(1, 200, 1))

        # None means that the encoding detected by the table source is used.
        if 'source_coding' not in parameters:
            parameters.set_string('source_coding', value=None)

        if 'double_quotations' not in parameters:
            parameters.set_boolean(
                'double_quotations', value=False,
                label='Remove double quotations',
                description='Remove double quotations when importing.')

        if 'exceptions' not in parameters:
            parameters.set_list(
                'exceptions', label='How to handle failed import:',
                description='Select method to handle eventual errors',
                plist=['Raise Exceptions', 'Partially read file',
                       'Read file without delimiters'],
                value=[0], editor=synode.Util.combo_editor())

    def valid_for_file(self):
        """Return True if input file is a valid CSV file.

        Any existing file is accepted unless its extension belongs to one of
        the formats listed below. The comparison ignores case, so files such
        as ``DATA.XLSX`` are rejected as well.
        """
        if (self._fq_infilename is None or
                not os.path.isfile(self._fq_infilename)):
            return False

        not_allowed_extensions = ['xls', 'xlsx', 'h5', 'sydata', 'mat']
        extension = os.path.splitext(self._fq_infilename)[1][1:].lower()
        return extension not in not_allowed_extensions

    def parameter_view(self, parameters):
        """Return the configuration widget for the CSV import parameters."""
        valid_for_file = self.valid_for_file()
        return TableImportWidgetCSV(parameters, self._fq_infilename,
                                    valid_for_file)

    def import_data(self, out_datafile, parameters=None, progress=None):
        """Import CSV data from a file.

        :param out_datafile: Output object handed to
            ``ImporterCSV.import_csv``.
        :param parameters: Parameter group as set up by
            ``_init_parameters``. When None, the parameters given to the
            constructor are used.
        :param progress: Unused by this importer.
        :raises SyDataError: If no valid encoding could be determined.
        :raises ValueError: If 'read_selection' holds an unknown choice.
        """
        if parameters is None:
            # The signature advertises None as default; fall back to the
            # parameters stored on the instance instead of failing on the
            # first lookup.
            parameters = self._parameters

        headers_bool = parameters['headers'].value
        units_bool = parameters['units'].value
        descriptions_bool = parameters['descriptions'].value

        # Rows are one-based in the parameters and zero-based below.
        headers_row_offset = parameters['header_row'].value - 1
        units_row_offset = parameters['unit_row'].value - 1
        descriptions_row_offset = parameters['description_row'].value - 1
        data_row_offset = parameters['data_start_row'].value - 1

        read_selection = parameters['read_selection'].value[0]
        data_rows = parameters['data_end_row'].value
        delimiter = parameters['delimiter'].value
        encoding = parameters['source_coding'].value
        exceptions = parameters['exceptions'].value[0]

        # Establish connection to csv datasource
        table_source = TableSourceCSV(self._fq_infilename)

        # Check if csv-file has a header (for auto import only)
        if headers_bool is None:
            headers_bool = table_source.has_header
            if headers_bool:
                # The detected header occupies a row of its own, so the
                # data starts one row further down.
                data_row_offset += 1

        # Check if delimiter and encoding have been modified in GUI
        if delimiter is not None:
            table_source.delimiter = delimiter
        if encoding is not None:
            table_source.encoding = encoding

        if not table_source.valid_encoding:
            raise SyDataError(
                'No valid encoding could be determined for input file.')

        # An offset of -1 tells the importer that the row is not present.
        if not headers_bool:
            headers_row_offset = -1
        if not units_bool:
            units_row_offset = -1
        if not descriptions_bool:
            descriptions_row_offset = -1

        # The indices follow the order of the 'read_selection' choices.
        if read_selection == 0:
            # Read to the end of file.
            nr_data_rows = -1
            data_end_rows = 0
        elif read_selection == 1:
            # Read specified number of rows.
            nr_data_rows = data_rows
            data_end_rows = 0
        elif read_selection == 2:
            # Read to specified number of rows from the end.
            nr_data_rows = 0
            data_end_rows = data_rows
        else:
            raise ValueError('Unknown Read Selection.')

        import_args = (
            out_datafile, nr_data_rows, data_end_rows, data_row_offset,
            headers_row_offset, units_row_offset, descriptions_row_offset)

        # The indices follow the order of the 'exceptions' choices:
        # 0 raise, 1 partially read file, 2 read file without delimiters.
        importer = ImporterCSV(table_source)
        try:
            if exceptions == 1:
                importer.set_partial_read(True)
            importer.import_csv(*import_args)
        except Exception as e:
            if exceptions == 0 and isinstance(e, StopIteration):
                # Happens when a file is empty. In this case it makes
                # sense to create an empty table.
                return
            if exceptions == 2:
                # Second attempt, this time reading full rows.
                importer.import_csv(*import_args, read_csv_full_rows=True)
            else:
                raise