# Source code for sylib.library.plugins.export.table_exporters.plugin_xlsx_exporter

# This file is part of Sympathy for Data.
# Copyright (c) 2017, Combine Control Systems AB
#
# Sympathy for Data is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# Sympathy for Data is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Sympathy for Data.  If not, see <http://www.gnu.org/licenses/>.
import re
import os
import errno
import shutil
import datetime
import tempfile
import unicodedata
from collections import defaultdict
from packaging import version as _version

import numpy as np

from sylib.export import table as exporttable
from sympathy.api import exceptions
from sympathy.api import qt2 as qt_compat
from sympathy.api.nodeconfig import settings
import sympathy.api


def _openpyxl():
    """
    Import openpyxl when called and return the top-level package.

    The import happens inside the function rather than at module import
    time. The styles and cell submodules are imported as well, so that
    openpyxl.styles and openpyxl.cell are available on the returned module.
    """
    import openpyxl
    import openpyxl.styles
    import openpyxl.cell
    return openpyxl


# Qt modules are looked up by name through qt_compat.import_module instead
# of being imported directly from a specific Qt binding.
QtGui = qt_compat.import_module('QtGui')
QtWidgets = qt_compat.import_module('QtWidgets')


def _col2ai(i):
    """
    Return corresponding A, B, .. AB notation for column number i.
    """
    chars = []
    c = i % 26
    chars.append(c)
    i = (i - c) // 26

    while i > 0:
        c = (i - 1) % 26
        chars.append(c)
        i = (i - c) // 26

    return ''.join([chr(ord('A') + c_) for c_ in reversed(chars)])


def _range_name(column_name):
    """
    Convert column name to valid name for use in workbook named range.
    crng_ + converted compatible name.
    """
    if isinstance(column_name, bytes):
        column_name = column_name.decode('ascii', errors='ignore')
    return 'crng_{}'.format(
        re.sub('[^a-zA-Z0-9]', '_', unicodedata.normalize('NFD', column_name)))


def _valid_cell(cell_cls, worksheet, row, column):
    try:
        cell = cell_cls(
            worksheet, row=row + 1, column=column + 1)
        cell.coordinate
    except Exception:
        return False
    return True


def _get_local_worksheet_names(workbook):
    local_worksheet_names = []
    for worksheet in workbook.worksheets:
        index = workbook.index(worksheet)
        local_worksheet_names.append((
            worksheet,
            [workbook.defined_names.get(n, scope=index)
             for n in workbook.defined_names.localnames(
                 scope=index)]))
    return local_worksheet_names


def _remove_local_worksheet_names(workbook, local_worksheet_names):
    for worksheet, local_names in local_worksheet_names:
        index = workbook.index(worksheet)
        for local_name in local_names:
            workbook.defined_names.delete(local_name.name, scope=index)


def _restore_local_worksheet_names(workbook, local_worksheet_names):
    worksheets = set(workbook.worksheets)
    for worksheet, local_names in local_worksheet_names:
        if worksheet in worksheets:
            index = workbook.index(worksheet)
            for local_name in local_names:
                local_name.localSheetId = index
                workbook.defined_names.append(local_name)


# Possible user-controlled option: set to False to discard data that is
# out of exportable range instead of raising an error.
_fail_out_of_range = True


def create_filled_sheet(workbook, sheet_name, input_table, header):
    """
    Create the worksheet sheet_name in workbook and fill it from input_table.

    If header is true, the first row holds the column names in bold. For
    every exported column a named range covering its data rows is created,
    using the index of the new worksheet as scope.

    Datetimes before 1900-03-01 and timedeltas of one day or more are
    masked before writing, and a warning is issued if any were found.

    Raises exceptions.SyDataError when the data exceeds the row or column
    limit, _fail_out_of_range is set and the outermost cell is not valid.
    """
    openpyxl = _openpyxl()
    column_names = input_table.column_names()
    worksheet = workbook.create_sheet(sheet_name)
    row_limit = 1048576  # 2 ** 20
    column_limit = 16384  # 2 ** 14, XFD
    column_letter_limit = 18278  # openpyxl limit ZZZ

    # Extent of the data; the header, if any, occupies the first row.
    start_row = 1 if header else 0
    end_row = input_table.number_of_rows() + start_row
    end_column = len(column_names)
    cell_cls = openpyxl.cell.Cell

    if end_row > row_limit or end_column > column_limit:
        if _fail_out_of_range:
            if not _valid_cell(
                    cell_cls, worksheet, end_row - 1, end_column - 1):
                raise exceptions.SyDataError(
                    f'Data range exceeds limits defined by the '
                    f'format: {row_limit} rows x {column_limit} columns.')
        exceptions.sywarn(
            f'Data range exceeds limits defined by the '
            f'format: {row_limit} rows x {column_limit} columns. '
            f'Cells out of range may be ignored.')

    # Find a column limit that would succeed: first the largest column
    # openpyxl can address, then the limit of the format itself.
    for limit in (column_letter_limit, column_limit):
        if end_column > limit:
            if not _valid_cell(cell_cls, worksheet, 0, end_column - 1):
                end_column = limit
    column_names = column_names[:end_column]

    if header:
        bold = openpyxl.styles.Font(bold=True)
        header_cells = []
        for column_name in column_names:
            cell = openpyxl.cell.WriteOnlyCell(
                worksheet, value=column_name)
            cell.font = bold
            header_cells.append(cell)

        worksheet.append(header_cells)

    warn_about_datetimes = False
    warn_about_timedeltas = False

    output_table = sympathy.api.table.File()

    def mask_unwritable(arr, data, unwritable):
        """
        Return (True, masked copy of data) if any value that is not already
        masked in arr is flagged by unwritable, otherwise (False, None).
        """
        if np.any(unwritable):
            mask = np.ma.getmaskarray(arr)
            if np.any(~mask & unwritable):
                return True, np.ma.masked_array(
                    data, mask | unwritable)
        return False, None

    def unique_column_name_range(names):
        """Return a dict mapping each column name to a unique range name."""
        # Make names unique to avoid exception when redefining name.
        # Duplicate range names are enumerated: X, X2, X3, etc.
        column_ranges = defaultdict(list)
        for name in names:
            column_ranges[_range_name(name)].append(name)

        taken = set()
        res = {}
        for range_name, colliding_names in column_ranges.items():
            count = 1
            for name in colliding_names:
                range_name_curr = range_name
                while range_name_curr in taken:
                    count += 1
                    range_name_curr = f'{range_name}{count}'
                taken.add(range_name_curr)
                res[name] = range_name_curr
        return res

    column_name_range = unique_column_name_range(column_names)

    for i, column_name in enumerate(column_names):
        kind = input_table.column_type(column_name).kind
        arr = input_table[column_name]
        data = np.ma.getdata(arr)
        new_col = None

        workbook.create_named_range(
            name=column_name_range[column_name],
            worksheet=worksheet,
            value="${col}${srow}:${col}${erow}".format(
                col=_col2ai(i),
                srow=start_row + 1,
                erow=end_row),
            scope=workbook.index(worksheet))

        if kind == 'M':

            # All dates before 1900-03-01 are ambiguous because of
            # a bug in Excel which incorrectly treats the year 1900
            # as a leap year. So don't write any such dates to
            # file.

            unwritable = data < datetime.datetime(1900, 3, 1)
            warn, new_col = mask_unwritable(
                arr,
                data,
                unwritable)
            warn_about_datetimes |= warn

        elif kind == 'm':

            # Timedeltas can't be written to excel in general, but
            # if the delta is less than a day, it can be
            # represented as a time.

            warn, new_col = mask_unwritable(
                arr,
                data,
                data >= datetime.timedelta(days=1))
            warn_about_timedeltas |= warn

        # Columns without unwritable values are taken over unchanged.
        if new_col is None:
            output_table.update_column(column_name, input_table)
        else:
            output_table[column_name] = new_col

    for row in output_table.to_rows():
        worksheet.append(row)

    # Give a warning about problematic datetimes and/or timedeltas
    if warn_about_datetimes:
        exceptions.sywarn(
            "Not writing any datetimes before 1900-03-01. All dates "
            "before 1900-03-01 are ambiguous because of a bug in Excel.")
    if warn_about_timedeltas:
        exceptions.sywarn(
            "Not writing any timedeltas bigger than or equal to one full "
            "day. Such timedeltas can't be expressed in Excel.")


class DataExportXLSXWidget(QtWidgets.QWidget):
    """
    Settings widget for the XLSX exporter.

    Shows the 'table_names', 'to_sheets', 'update_sheets' and 'header'
    parameters and emits filename_changed when one of the first three
    changes value.
    """
    filename_changed = qt_compat.Signal()

    def __init__(self, parameter_root, input_list):
        super().__init__()
        self._parameter_root = parameter_root
        self._input_list = input_list
        self._init_gui()

    def _init_gui(self):
        """Create the parameter widgets, connect them and lay them out."""
        parameters = self._parameter_root
        notify = self._filename_changed
        layout = QtWidgets.QVBoxLayout()

        self._to_sheets = parameters['to_sheets']
        self._to_sheets_check_box = self._to_sheets.gui()
        self._to_sheets_check_box.valueChanged.connect(notify)

        # Replacing sheets is only meaningful when exporting to sheets.
        self._update_sheets = parameters['update_sheets']
        self._update_sheets_check_box = self._update_sheets.gui()
        self._update_sheets_check_box.valueChanged.connect(notify)
        self._update_sheets_check_box.setEnabled(self._to_sheets.value)

        # Table names as filenames is only available when not exporting
        # to sheets.
        self._table_names = parameters['table_names']
        self.table_names_gui = parameters['table_names'].gui()
        self.table_names_gui.valueChanged.connect(notify)
        self.table_names_gui.setEnabled(not self._to_sheets.value)

        for widget in (self.table_names_gui,
                       self._to_sheets_check_box,
                       self._update_sheets_check_box):
            layout.addWidget(widget)
        layout.addWidget(parameters['header'].gui())

        self._to_sheets_check_box.stateChanged[int].connect(
            self._to_sheets_state_changed)

        self.setLayout(layout)

    @qt_compat.Slot(int)
    def _to_sheets_state_changed(self, value):
        """Enable or disable the dependent options after a toggle."""
        export_to_sheets = bool(self._to_sheets.value)
        if export_to_sheets:
            # Exporting to sheets resets the table names option.
            self._table_names.value = False
        self.table_names_gui.setEnabled(not export_to_sheets)
        self._update_sheets_check_box.setEnabled(export_to_sheets)

    def _filename_changed(self):
        """Forward a parameter change as the filename_changed signal."""
        self.filename_changed.emit()


class DataExportXLSX(exporttable.TableDataExporterBase):
    """Exporter for XLSX files."""
    EXPORTER_NAME = 'XLSX'
    FILENAME_EXTENSION = 'xlsx'
    DEFAULT_XLSX_NAME = 'xlsx_filename'

    def __init__(self, parameters):
        """Add the exporter's boolean options to parameters if missing."""
        super().__init__(parameters)
        if 'header' not in parameters:
            parameters.set_boolean(
                'header', value=True, label='Export header',
                description='Export column names')
        if 'to_sheets' not in parameters:
            parameters.set_boolean(
                'to_sheets', label='Export data to sheets',
                description=(
                    'Select to export incoming tables to one file with '
                    'multiple sheets instead of multiple files with one '
                    'sheet.'))
        if 'update_sheets' not in parameters:
            parameters.set_boolean(
                'update_sheets', label='Replace existing sheets',
                description=(
                    'Export data to sheets in an existing file, creating the '
                    'file if it does not exist. Existing sheets are replaced '
                    'by input tables when table name == sheet name, and are '
                    'otherwise preserved.\n\n'
                    'Note that the existing file is re-written and preserved '
                    'sheets are first read and then written, a handling which '
                    'may result in some properties being lost.'))
        if 'table_names' not in parameters:
            parameters.set_boolean(
                'table_names', label='Use table names as filenames',
                description='Use table names as filenames')

    def parameter_view(self, input_list):
        """Return the configuration widget for this exporter."""
        return DataExportXLSXWidget(
            self._parameters, input_list)

    def export_data(self, in_data, filename, progress=None):
        """
        Export Table to XLSX.

        in_data is a single table, or an iterable of tables when the
        'to_sheets' option is set; every table becomes one worksheet in
        filename. With 'update_sheets' (only effective together with
        'to_sheets') an existing workbook is loaded, sheets with matching
        names are replaced and the file is rewritten via a temporary file.
        """
        openpyxl = _openpyxl()
        _openpyxl_version = _version.parse(openpyxl.__version__)
        _version_310 = _version.Version('3.1.0')

        header = self._parameters['header'].value
        to_sheets = self._parameters['to_sheets'].value
        update_sheets = self._parameters['update_sheets'].value
        update_sheets &= to_sheets
        existing_workbook = False
        table_names = self._parameters['table_names'].value

        if not to_sheets:
            in_data = [in_data]

        if update_sheets:
            # NOTE(review): any failure to load, not only a missing file,
            # results in a fresh workbook being written to filename.
            try:
                workbook = openpyxl.load_workbook(filename)
                existing_workbook = True
            except Exception:
                workbook = openpyxl.Workbook()
                # Remove default worksheet which would otherwise
                # be created.
                workbook.remove(workbook.active)
        else:
            workbook = openpyxl.Workbook(write_only=True)

        local_worksheet_names = []
        if existing_workbook and _openpyxl_version < _version_310:
            # Remove all existing local names to work around index issues in
            # openpyxl.
            local_worksheet_names = _get_local_worksheet_names(workbook)
            _remove_local_worksheet_names(workbook, local_worksheet_names)

        sheet_names = []
        for i, in_table in enumerate(in_data):
            if table_names:
                sheet_name = os.path.basename(in_table.get_name())
            else:
                sheet_name = in_table.get_name()
            sheet_name = sheet_name or 'Table_{0}'.format(i)
            # Truncate long names before a numeric suffix may be added.
            if len(sheet_name) > 23:
                sheet_name = sheet_name[:23]

            # Append 1, 2, ... until the name is unique among the new sheets.
            j = 1
            sheet_base = sheet_name
            while sheet_name in sheet_names:
                sheet_name = f'{sheet_base}{j}'
                j += 1
            sheet_names.append(sheet_name)

            if existing_workbook and sheet_name in workbook:
                # Should be removed before starting to create new one
                # because of index issues.
                del workbook[sheet_name]

        for sheet_name, in_table in zip(sheet_names, in_data):
            create_filled_sheet(
                workbook, sheet_name, in_table, header)

        if existing_workbook and _openpyxl_version < _version_310:
            # Restore local names that are scoped to existing worksheets.
            _restore_local_worksheet_names(workbook, local_worksheet_names)

        if existing_workbook:
            # Reserve a unique path in the session folder. The handle is
            # closed before saving so that the path can be reopened by name
            # regardless of platform.
            with tempfile.NamedTemporaryFile(
                    prefix='plugin_xlsx_exporter_', suffix='.xlsx',
                    delete=False, dir=settings()['session_folder']) as f:
                temp_filename = f.name
            workbook.save(temp_filename)
            workbook.close()
            try:
                os.replace(temp_filename, filename)
            except OSError as err:
                if err.errno != errno.EXDEV:
                    raise
                # "Invalid cross-device link"
                # This can happen if source and target are on different
                # drives. Fallback on replacing the file non-atomically.
                os.unlink(filename)
                shutil.move(temp_filename, filename)
        else:
            workbook.save(filename)
            workbook.close()

    def create_filenames(self, input_list, filename):
        """
        Return the filenames to export to.

        With 'to_sheets' set, only the first filename produced by the base
        class is returned, as a one-element list.
        """
        to_sheets = self._parameters['to_sheets'].value
        filename_generator = super().create_filenames(
            input_list, filename)
        if to_sheets:
            return [next(iter(filename_generator))]
        return filename_generator

    def cardinality(self):
        """Return many_to_one when exporting to sheets, else one_to_one."""
        to_sheets = self._parameters['to_sheets'].value
        if to_sheets:
            return self.many_to_one
        return self.one_to_one