# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data. If not, see <http://www.gnu.org/licenses/>.
import re
import os
import errno
import shutil
import datetime
import tempfile
import unicodedata
from collections import defaultdict
from packaging import version as _version
import numpy as np
from sylib.export import table as exporttable
from sympathy.api import exceptions
from sympathy.api import qt2 as qt_compat
from sympathy.api.nodeconfig import settings
import sympathy.api
def _openpyxl():
    """
    Import openpyxl on demand and return the top-level package.

    The ``styles`` and ``cell`` subpackages are imported as well so that
    callers can reach them as attributes of the returned module.
    """
    import openpyxl.cell
    import openpyxl.styles
    import openpyxl
    return openpyxl
# Qt modules are looked up by name through qt_compat.import_module.
# NOTE(review): QtGui is not referenced in the visible part of this file.
QtGui = qt_compat.import_module('QtGui')
QtWidgets = qt_compat.import_module('QtWidgets')
def _col2ai(i):
"""
Return corresponding A, B, .. AB notation for column number i.
"""
chars = []
c = i % 26
chars.append(c)
i = (i - c) // 26
while i > 0:
c = (i - 1) % 26
chars.append(c)
i = (i - c) // 26
return ''.join([chr(ord('A') + c_) for c_ in reversed(chars)])
def _range_name(column_name):
"""
Convert column name to valid name for use in workbook named range.
crng_ + converted compatible name.
"""
if isinstance(column_name, bytes):
column_name = column_name.decode('ascii', errors='ignore')
return 'crng_{}'.format(
re.sub('[^a-zA-Z0-9]', '_', unicodedata.normalize('NFD', column_name)))
def _valid_cell(cell_cls, worksheet, row, column):
try:
cell = cell_cls(
worksheet, row=row + 1, column=column + 1)
cell.coordinate
except Exception:
return False
return True
def _get_local_worksheet_names(workbook):
local_worksheet_names = []
for worksheet in workbook.worksheets:
index = workbook.index(worksheet)
local_worksheet_names.append((
worksheet,
[workbook.defined_names.get(n, scope=index)
for n in workbook.defined_names.localnames(
scope=index)]))
return local_worksheet_names
def _remove_local_worksheet_names(workbook, local_worksheet_names):
for worksheet, local_names in local_worksheet_names:
index = workbook.index(worksheet)
for local_name in local_names:
workbook.defined_names.delete(local_name.name, scope=index)
def _restore_local_worksheet_names(workbook, local_worksheet_names):
worksheets = set(workbook.worksheets)
for worksheet, local_names in local_worksheet_names:
if worksheet in worksheets:
index = workbook.index(worksheet)
for local_name in local_names:
local_name.localSheetId = index
workbook.defined_names.append(local_name)
# Possible user-controlled option: set to False to discard data that is out
# of the exportable range instead of failing. While True,
# create_filled_sheet raises SyDataError when the table exceeds the format
# limits and its last cell cannot be created.
_fail_out_of_range = True
def create_filled_sheet(workbook, sheet_name, input_table, header):
    """
    Create the worksheet ``sheet_name`` in ``workbook`` and fill it with the
    content of ``input_table``.

    For every exported column a named range is created on the workbook,
    passing the new worksheet's index as scope. Datetimes before 1900-03-01
    and timedeltas of one day or more are masked out instead of written; a
    warning is issued once per kind if that removed any unmasked value.

    :param workbook: openpyxl workbook to add the sheet to.
    :param sheet_name: Title for the new worksheet.
    :param input_table: Table providing ``column_names()``,
        ``number_of_rows()``, ``column_type()`` and column access by name.
    :param header: If true, the first row holds the column names in bold.
    :raises exceptions.SyDataError: If the table exceeds the row/column
        limits, ``_fail_out_of_range`` is set and the last cell of the table
        cannot be created.
    """
    openpyxl = _openpyxl()
    column_names = input_table.column_names()
    worksheet = workbook.create_sheet(sheet_name)

    row_limit = 1048576  # 2 ** 20
    column_limit = 16384  # 2 ** 14, XFD
    column_letter_limit = 18278  # openpyxl limit ZZZ

    # Write data columns.
    start_row = 1 if header else 0
    end_row = input_table.number_of_rows() + start_row
    end_column = len(column_names)
    cell_cls = openpyxl.cell.Cell

    if end_row > row_limit or end_column > column_limit:
        if _fail_out_of_range and not _valid_cell(
                cell_cls, worksheet, end_row - 1, end_column - 1):
            raise exceptions.SyDataError(
                f'Data range exceeds limits defined by the '
                f'format: {row_limit} rows x {column_limit} columns.')

        exceptions.sywarn(
            f'Data range exceeds limits defined by the '
            f'format: {row_limit} rows x {column_limit} columns. '
            f'Cells out of range may be ignored.')

        # Find a column limit that would succeed. The looser openpyxl
        # limit is tried before the limit of the format.
        for candidate_limit in (column_letter_limit, column_limit):
            if (end_column > candidate_limit and
                    not _valid_cell(cell_cls, worksheet, 0, end_column - 1)):
                end_column = candidate_limit
                column_names = column_names[:end_column]

    if header:
        bold = openpyxl.styles.Font(bold=True)
        header_cells = []
        for column_name in column_names:
            cell = openpyxl.cell.WriteOnlyCell(
                worksheet, value=column_name)
            cell.font = bold
            header_cells.append(cell)
        worksheet.append(header_cells)

    warn_about_datetimes = False
    warn_about_timedeltas = False
    output_table = sympathy.api.table.File()

    def mask_unwritable(arr, data, unwritable):
        """
        Return ``(True, masked_column)`` if ``unwritable`` flags at least one
        value of ``arr`` that is not already masked, else ``(False, None)``.
        """
        if np.any(unwritable):
            mask = np.ma.getmaskarray(arr)
            if np.any((~mask) * unwritable):
                return True, np.ma.masked_array(
                    data, mask | unwritable)
        return False, None

    def unique_column_name_range(names):
        """
        Map each column name to a range name that is unique in the sheet.
        """
        # Make names unique to avoid exception when redefining name.
        # Duplicate range names are enumerated: X, X2, X3, etc.
        column_ranges = defaultdict(list)
        unique_range_names = dict()
        res = {}

        for column_name in names:
            column_ranges[_range_name(column_name)].append(column_name)

        for range_name, grouped_names in column_ranges.items():
            count = 1
            for column_name in grouped_names:
                range_name_curr = range_name
                while range_name_curr in unique_range_names:
                    count += 1
                    range_name_curr = f'{range_name}{count}'
                unique_range_names[range_name_curr] = column_name
                res[column_name] = range_name_curr
        return res

    column_name_range = unique_column_name_range(column_names)

    for i, column_name in enumerate(column_names):
        kind = input_table.column_type(column_name).kind
        arr = input_table[column_name]
        data = np.ma.getdata(arr)
        new_col = None

        # NOTE(review): for a table without rows srow is one larger than
        # erow, so the range is written inverted; confirm that is intended.
        workbook.create_named_range(
            name=column_name_range[column_name],
            worksheet=worksheet,
            value="${col}${srow}:${col}${erow}".format(
                col=_col2ai(i),
                srow=start_row + 1,
                erow=end_row),
            scope=workbook.index(worksheet))

        if kind == 'M':
            # All dates before 1900-03-01 are ambiguous because of
            # a bug in Excel which incorrectly treats the year 1900
            # as a leap year. So don't write any such dates to
            # file.
            unwritable = data < datetime.datetime(1900, 3, 1)
            warn, new_col = mask_unwritable(
                arr,
                data,
                unwritable)
            warn_about_datetimes |= warn
        elif kind == 'm':
            # Timedeltas can't be written to excel in general, but
            # if the delta is less than a day, it can be
            # represented as a time.
            warn, new_col = mask_unwritable(
                arr,
                data,
                data >= datetime.timedelta(days=1))
            warn_about_timedeltas |= warn

        if new_col is None:
            output_table.update_column(column_name, input_table)
        else:
            output_table[column_name] = new_col

    for row in output_table.to_rows():
        worksheet.append(row)

    # Give a warning about problematic datetimes and/or timedeltas
    if warn_about_datetimes:
        exceptions.sywarn(
            "Not writing any datetimes before 1900-03-01. All dates "
            "before 1900-03-01 are ambiguous because of a bug in Excel.")
    if warn_about_timedeltas:
        exceptions.sywarn(
            "Not writing any timedeltas bigger than or equal to one full "
            "day. Such timedeltas can't be expressed in Excel.")
class DataExportXLSXWidget(QtWidgets.QWidget):
    """
    Configuration widget for the XLSX exporter.

    Shows the GUIs of the ``table_names``, ``to_sheets``, ``update_sheets``
    and ``header`` parameters and emits ``filename_changed`` whenever one of
    the first three changes value.
    """

    filename_changed = qt_compat.Signal()

    def __init__(self, parameter_root, input_list):
        super().__init__()
        self._parameter_root = parameter_root
        self._input_list = input_list
        self._init_gui()

    def _init_gui(self):
        """Create the parameter widgets, wire their signals and lay out."""
        params = self._parameter_root
        layout = QtWidgets.QVBoxLayout()

        self._to_sheets = params['to_sheets']
        self._to_sheets_check_box = self._to_sheets.gui()
        self._to_sheets_check_box.valueChanged.connect(
            self._filename_changed)

        # Replacing sheets is only available when exporting to sheets.
        self._update_sheets = params['update_sheets']
        self._update_sheets_check_box = self._update_sheets.gui()
        self._update_sheets_check_box.valueChanged.connect(
            self._filename_changed)
        self._update_sheets_check_box.setEnabled(self._to_sheets.value)

        # Table names as filenames is only available for one file per table.
        self._table_names = params['table_names']
        self.table_names_gui = params['table_names'].gui()
        self.table_names_gui.valueChanged.connect(self._filename_changed)
        self.table_names_gui.setEnabled(not self._to_sheets.value)

        for widget in (self.table_names_gui,
                       self._to_sheets_check_box,
                       self._update_sheets_check_box,
                       params['header'].gui()):
            layout.addWidget(widget)

        self._to_sheets_check_box.stateChanged[int].connect(
            self._to_sheets_state_changed)
        self.setLayout(layout)

    @qt_compat.Slot(int)
    def _to_sheets_state_changed(self, value):
        """Toggle the dependent widgets after ``to_sheets`` changed."""
        exporting_to_sheets = bool(self._to_sheets.value)
        if exporting_to_sheets:
            self._table_names.value = False
        self.table_names_gui.setEnabled(not exporting_to_sheets)
        self._update_sheets_check_box.setEnabled(exporting_to_sheets)

    def _filename_changed(self):
        """Forward a parameter change as ``filename_changed``."""
        self.filename_changed.emit()
class DataExportXLSX(exporttable.TableDataExporterBase):
    """Exporter for XLSX files."""

    EXPORTER_NAME = 'XLSX'
    FILENAME_EXTENSION = 'xlsx'
    DEFAULT_XLSX_NAME = 'xlsx_filename'

    def __init__(self, parameters):
        """Add every exporter parameter that ``parameters`` is missing."""
        super().__init__(parameters)
        if 'header' not in parameters:
            parameters.set_boolean(
                'header', value=True, label='Export header',
                description='Export column names')

        if 'to_sheets' not in parameters:
            parameters.set_boolean(
                'to_sheets', label='Export data to sheets',
                description=(
                    'Select to export incoming tables to one file with '
                    'multiple sheets instead of multiple files with one '
                    'sheet.'))

        if 'update_sheets' not in parameters:
            parameters.set_boolean(
                'update_sheets', label='Replace existing sheets',
                description=(
                    'Export data to sheets in an existing file, creating the '
                    'file if it does not exist. Existing sheets are replaced '
                    'by input tables when table name == sheet name, and are '
                    'otherwise preserved.\n\n'
                    'Note that the existing file is re-written and preserved '
                    'sheets are first read and then written, a handling which '
                    'may result in some properties being lost.'))

        if 'table_names' not in parameters:
            parameters.set_boolean(
                'table_names',
                label='Use table names as filenames',
                description='Use table names as filenames')

    def parameter_view(self, input_list):
        """Return the configuration widget for this exporter."""
        return DataExportXLSXWidget(
            self._parameters, input_list)

    def export_data(self, in_data, filename, progress=None):
        """
        Export Table to XLSX.

        :param in_data: A single table, or an iterable of tables when the
            ``to_sheets`` parameter is set.
        :param filename: Path of the file to write.
        :param progress: Unused by this exporter.
        """
        openpyxl = _openpyxl()
        _openpyxl_version = _version.parse(openpyxl.__version__)
        _version_310 = _version.Version('3.1.0')
        header = self._parameters['header'].value
        to_sheets = self._parameters['to_sheets'].value
        update_sheets = self._parameters['update_sheets'].value
        # Updating sheets only applies when exporting to sheets.
        update_sheets &= to_sheets
        existing_workbook = False
        table_names = self._parameters['table_names'].value

        if not to_sheets:
            in_data = [in_data]

        if update_sheets:
            try:
                workbook = openpyxl.load_workbook(filename)
                existing_workbook = True
            except Exception:
                # Best effort: any failure to load is treated like a
                # missing file and a fresh workbook is started.
                workbook = openpyxl.Workbook()
                # Remove default worksheet which would otherwise
                # be created.
                workbook.remove(workbook.active)
        else:
            workbook = openpyxl.Workbook(write_only=True)

        workaround_local_names = (
            existing_workbook and _openpyxl_version < _version_310)

        local_worksheet_names = []
        if workaround_local_names:
            # Remove all existing local names to work around index issues in
            # openpyxl.
            local_worksheet_names = _get_local_worksheet_names(workbook)
            _remove_local_worksheet_names(workbook, local_worksheet_names)

        sheet_names = []
        for i, in_table in enumerate(in_data):
            # The name may be unset (the fallback below anticipates a falsy
            # name); normalize it so os.path.basename never receives None.
            table_name = in_table.get_name() or ''
            if table_names:
                sheet_name = os.path.basename(table_name)
            else:
                sheet_name = table_name
            sheet_name = sheet_name or 'Table_{0}'.format(i)

            if len(sheet_name) > 23:
                sheet_name = sheet_name[:23]

            # Append a running number until the name is unique among the
            # sheets written by this export.
            j = 1
            sheet_base = sheet_name
            while sheet_name in sheet_names:
                sheet_name = f'{sheet_base}{j}'
                j += 1
            sheet_names.append(sheet_name)

            if existing_workbook and sheet_name in workbook:
                # Should be removed before starting to create new one
                # because of index issues.
                del workbook[sheet_name]

        for sheet_name, in_table in zip(sheet_names, in_data):
            create_filled_sheet(
                workbook, sheet_name, in_table, header)

        if workaround_local_names:
            # Restore local names that are scoped to existing worksheets.
            _restore_local_worksheet_names(workbook, local_worksheet_names)

        if existing_workbook:
            # Write to a temporary file first and then replace the target,
            # so the existing file is only touched after a complete save.
            with tempfile.NamedTemporaryFile(
                    prefix='plugin_xlsx_exporter_', suffix='.xlsx',
                    delete=False,
                    dir=settings()['session_folder']) as f:
                workbook.save(f.name)
                workbook.close()
            try:
                os.replace(f.name, filename)
            except OSError as err:
                if err.errno == errno.EXDEV:  # "Invalid cross-device link"
                    # This can happen if source and target are on different
                    # drives. Fallback on replacing the file non-atomically.
                    os.unlink(filename)
                    shutil.move(f.name, filename)
                else:
                    raise
        else:
            workbook.save(filename)
            workbook.close()

    def create_filenames(self, input_list, filename):
        """
        Return the output filenames for ``input_list``.

        With ``to_sheets`` set only the first filename produced by the base
        class is returned, since all tables go into one file.
        """
        to_sheets = self._parameters['to_sheets'].value
        filename_generator = super().create_filenames(
            input_list, filename)
        if to_sheets:
            return [next(iter(filename_generator))]
        return filename_generator

    def cardinality(self):
        """Return ``many_to_one`` when exporting to sheets, else
        ``one_to_one``."""
        to_sheets = self._parameters['to_sheets'].value
        if to_sheets:
            return self.many_to_one
        return self.one_to_one