# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
import os
from sympathy.api import node as synode
from sympathy.api import importers
from sympathy.api.exceptions import SyDataError
from sylib.table.table_source_csv import TableSourceCSV, ImporterCSV
from sylib.table.table_importer_csv_gui import TableImportWidgetCSV
# Labels for the 'read_selection' combo box. They double as the values
# compared against ``parameters['read_selection'].selected`` at import time,
# so the texts must not be altered.
_read_to_end = 'Read to the end of file'
_read_selected = 'Read specified number of rows'
_read_from_end = 'Read to specified number of rows from the end'
_read_options = [
    _read_to_end,
    _read_selected,
    _read_from_end,
]

# Labels for the 'exceptions' combo box (how a failed import is handled).
# Compared against ``parameters['exceptions'].selected`` at import time.
_exception_raise = 'Raise Exceptions'
_exception_partial = 'Partially read file'
_exception_no_delimiter = 'Read file without delimiters'
_exception_options = [
    _exception_raise,
    _exception_partial,
    _exception_no_delimiter,
]
class DataImportCSV(importers.TableDataImporterBase):
    """Importer for CSV files.

    The importer keeps its configuration in a parameter group (created with
    defaults by ``_init_parameters``) and delegates the actual reading to
    ``TableSourceCSV`` / ``ImporterCSV``.
    """

    IMPORTER_NAME = "CSV"

    def __init__(self, fq_infilename, parameters):
        """Store the file name and parameters; add missing defaults.

        :param fq_infilename: Path of the file to import (may be None).
        :param parameters: Parameter group for this importer, or None. When
            None, no default parameters are created.
        """
        super().__init__(fq_infilename, parameters)
        if parameters is not None:
            self._init_parameters()

    def name(self):
        """Return the display name of this importer."""
        return self.IMPORTER_NAME

    def auto_priority(self):
        """Return the priority of this importer (always 0).

        NOTE(review): presumably consulted when an importer is selected
        automatically -- confirm against the base class.
        """
        return 0

    def _init_parameters(self):
        """Create every parameter that is not already present.

        Existing parameters are left untouched so that stored configurations
        keep their values; only missing ones receive defaults.
        """
        parameters = self._parameters
        nbr_of_rows = 99999
        nbr_of_end_rows = 9999999

        # Row spinboxes (1-based in the GUI). All four share the same default
        # and bounds and differ only in name and description.
        row_spinboxes = (
            ('header_row', 'The row where the headers are located.'),
            ('unit_row', 'The row where the units are located.'),
            ('description_row',
             'The row where the descriptions are located.'),
            ('data_start_row', 'The first row where data is stored.'),
        )
        for row_name, row_description in row_spinboxes:
            if row_name not in parameters:
                parameters.set_integer(
                    row_name, value=1,
                    description=row_description,
                    editor=synode.editors.bounded_spinbox_editor(
                        1, nbr_of_rows, 1))

        # Init data end row spinbox
        if 'data_end_row' not in parameters:
            parameters.set_integer(
                'data_end_row', value=0,
                description='The data rows.',
                editor=synode.editors.bounded_spinbox_editor(
                    0, nbr_of_end_rows, 1))

        # Init headers checkbox. The default None means "not decided yet":
        # import_data then asks the table source whether a header exists.
        if 'headers' not in parameters:
            parameters.set_boolean(
                'headers', value=None,
                description='File has headers.')

        # Init units checkbox
        if 'units' not in parameters:
            parameters.set_boolean(
                'units', value=False,
                description='File has units.')

        # Init descriptions checkbox
        if 'descriptions' not in parameters:
            parameters.set_boolean(
                'descriptions', value=False,
                description='File has descriptions.')

        # Init transposed checkbox
        if 'transposed' not in parameters:
            parameters.set_boolean(
                'transposed', value=False, label='Transpose input data',
                description='Transpose the data.')

        # Legacy parameter, superseded by 'read_selection' below.
        if 'end_of_file' not in parameters:
            parameters.set_boolean(
                'end_of_file', value=True,
                description='Select all rows to the end of the file.')

        if 'read_selection' not in parameters:
            parameters.set_list(
                'read_selection', value=[0],
                list=_read_options,
                description='Select how to read the data',
                editor=synode.editors.combo_editor())

            # Move the value of the old parameter to the new format. This is
            # done only when 'read_selection' has just been created, so that
            # an explicit choice made later is not overwritten.
            if not parameters['end_of_file'].value:
                parameters['read_selection'].value_names = [_read_from_end]

        if 'delimiter' not in parameters:
            parameters.set_string(
                'delimiter',
                value=None)

        if 'other_delimiter' not in parameters:
            parameters.set_string(
                'other_delimiter',
                value=None,
                description='Enter other delimiter than the standard ones.')

        if 'preview_start_row' not in parameters:
            parameters.set_integer(
                'preview_start_row', value=1, label='Preview start row',
                description='The first row where data will be previewed '
                            'from.',
                editor=synode.editors.bounded_spinbox_editor(
                    1, 500, 1))

        if 'no_preview_rows' not in parameters:
            parameters.set_integer(
                'no_preview_rows', value=20, label='Number of preview rows',
                description='The number of preview rows to show.',
                editor=synode.editors.bounded_spinbox_editor(1, 200, 1))

        if 'source_coding' not in parameters:
            parameters.set_string(
                'source_coding',
                value=None)

        if 'double_quotations' not in parameters:
            parameters.set_boolean(
                'double_quotations', value=False,
                label='Remove double quotations',
                description='Remove double quotations when importing.')

        if 'exceptions' not in parameters:
            parameters.set_list(
                'exceptions', label='How to handle failed import:',
                description='Select method to handle eventual errors',
                list=_exception_options,
                value=[0], editor=synode.editors.combo_editor())

        if 'skip_lines_with_all_na' not in parameters:
            parameters.set_boolean(
                'skip_lines_with_all_na',
                value=True,
                label='Skip empty rows',
                description=(
                    'Skip rows with only empty or missing cells.'
                    '\n\n'
                    'When checked (default), rows with only empty and missing '
                    'cells are skipped (excluded from the output). '
                    '\n\n'
                    'When unchecked, rows with only empty or missing values '
                    'are included in the output. The representation for '
                    'missing values depends on the column type, for example, '
                    'empty string, NaN or masked.'))

    def valid_for_file(self):
        """Return True if the input file name refers to an existing file.

        Only the existence of the file is checked; its content is not
        inspected.
        """
        return (self._fq_infilename is not None and
                os.path.isfile(self._fq_infilename))

    def parameter_view(self, parameters):
        """Return the configuration widget for this importer.

        :param parameters: Parameter group edited by the widget.
        """
        valid_for_file = self.valid_for_file()
        return TableImportWidgetCSV(parameters, self._fq_infilename,
                                    valid_for_file)

    def import_data(self, out_datafile, parameters=None, progress=None):
        """Import CSV data from the input file into ``out_datafile``.

        :param out_datafile: Output object handed on to
            ``ImporterCSV.import_csv``.
        :param parameters: Parameter group to read the settings from. When
            None, the parameters given to the constructor are used.
        :param progress: Unused by this importer.
        :raises SyDataError: If the table source reports that no valid
            encoding could be determined.
        :raises ValueError: If the read selection is not a known option.

        Other failures are handled according to the 'exceptions' parameter:
        either re-raised through ``self.import_failed``, tolerated by a
        partial read, or retried by reading full rows without delimiters.
        """
        if parameters is None:
            # Fall back to the parameters stored by the constructor instead
            # of failing on the first lookup below.
            parameters = self._parameters

        headers_bool = parameters['headers'].value
        units_bool = parameters['units'].value
        descriptions_bool = parameters['descriptions'].value

        # GUI rows are 1-based, the importer expects 0-based offsets.
        headers_row_offset = parameters['header_row'].value - 1
        units_row_offset = parameters['unit_row'].value - 1
        descriptions_row_offset = parameters['description_row'].value - 1
        data_row_offset = parameters['data_start_row'].value - 1

        read_selection = parameters['read_selection'].selected
        data_rows = parameters['data_end_row'].value
        delimiter = parameters['delimiter'].value
        encoding = parameters['source_coding'].value
        exceptions = parameters['exceptions'].selected
        skip_lines_with_all_na = parameters['skip_lines_with_all_na'].value

        # Establish connection to csv datasource
        table_source = TableSourceCSV(self._fq_infilename)

        # Check if csv-file has a header (for auto import only)
        if headers_bool is None:
            headers_bool = table_source.has_header
            if headers_bool:
                # The detected header occupies a row, so data starts one
                # row further down.
                data_row_offset += 1

        # Check if delimiter and encoding have been modified in GUI
        if delimiter is not None:
            table_source.delimiter = delimiter
        if encoding is not None:
            table_source.encoding = encoding

        table_source.skip_lines_with_all_na = skip_lines_with_all_na

        if not table_source.valid_encoding:
            raise SyDataError(
                'No valid encoding could be determined for input file.')

        # An offset of -1 is passed for rows that are not present.
        if not headers_bool:
            headers_row_offset = -1
        if not units_bool:
            units_row_offset = -1
        if not descriptions_bool:
            descriptions_row_offset = -1

        if read_selection == _read_to_end:
            nr_data_rows = -1
            data_end_rows = 0
        elif read_selection == _read_selected:
            nr_data_rows = data_rows
            data_end_rows = 0
        elif read_selection == _read_from_end:
            nr_data_rows = 0
            data_end_rows = data_rows
        else:
            raise ValueError('Unknown Read Selection.')

        importer = ImporterCSV(table_source)
        args = (
            out_datafile, nr_data_rows, data_end_rows,
            data_row_offset, headers_row_offset,
            units_row_offset, descriptions_row_offset)

        try:
            if exceptions == _exception_partial:
                importer.set_partial_read(True)

            importer.import_csv(*args)
        except Exception as e:
            if exceptions == _exception_raise and isinstance(
                    e, StopIteration):
                # Happens when a file is empty. In this case it makes
                # sense to create an empty table.
                return
            if exceptions == _exception_no_delimiter:
                # Second attempt: read every line as one full row.
                importer.import_csv(
                    *args,
                    read_csv_full_rows=True)
            else:
                raise self.import_failed(e)