Source code for sympathy.utils.filebase

# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
"""
Utility functions needed to read and write tables from/to different
formats.
"""
from typing import Optional

from .. datasources.info import (get_fileinfo_from_scheme,
                                 get_scheme_from_file)
from .. platform.types import (from_string_alias, from_type_expand,
                               from_string_expand)
from .. types import sylist
from .. platform.types import manager as type_manager
from .. types.factory import typefactory
from .. platform import types
from .. platform import exceptions
from . import port as port_util
from . import complete
from . import names as _names


class PPrintUnicode:
    """
    Base class for pretty printing in IPython.

    Any subclass will be printed with unicode(obj) instead of the default
    repr(obj) when they are the result of an expression in IPython. This allows
    for higher interactivity when working in IPython.
    """
    def _repr_mimebundle_(self, include=None, exclude=None):
        """
        For ipython integration, determines how values of this type are written
        to the console.

        Can be customized in subclasses and can be used to add support for
        other kinds of output such as text/html.
        """
        return {'text/plain': str(self)}


def typeutil(typealias):
    def inner(cls):
        declaration = from_string_alias(typealias)
        cls.container_type = declaration
        type_manager.set_typealias_util(declaration.name(), cls)
        return cls
    return inner


def from_file(filename, scheme=None, sytype=None, external=True):

    if scheme is None:
        scheme = get_scheme_from_file(filename)

    if scheme is None:
        return None

    fileinfo_ = fileinfo(filename, scheme)

    if sytype is None:
        sytype = fileinfo_.type()

    return port_maker(
        {'file': filename, 'scheme': scheme,
         'type': sytype}, 'r', external)


def to_file(filename, scheme, sytype, dstype=None, external=True):
    return port_maker(
        {'file': filename, 'scheme': scheme,
         'type': sytype, 'dstype': dstype}, 'w', external)


def from_type(sytype):
    return typefactory.from_type(sytype)


def empty_from_type(sytype):
    return typefactory.from_type(sytype)


def fileinfo(filename, scheme=None):
    if scheme is None:
        scheme = get_scheme_from_file(filename)

    return get_fileinfo_from_scheme(scheme)(filename)


def filetype(filename):
    try:
        fileinfo_ = fileinfo(filename)
        return fileinfo_.type()
    except Exception:
        pass


def is_type(sytype, filename, scheme='hdf5'):
    info = fileinfo(filename, scheme)
    try:
        return fileinfo.type() == str(sytype)
    except (KeyError, AttributeError, TypeError):
        pass
    try:
        return (str(from_string_expand(info.datatype())) ==
                str(from_type_expand(sytype)))
    except TypeError:
        return False


class FileManager(PPrintUnicode):
    """FileManager handles data contexts for File and FileList."""
    container_type = None
    ELEMENT = None

    def __init__(self, fileobj, data, filename, mode, scheme,
                 import_links=False):
        """
        Fileobj is a file owned. It should be closed by self.
        Data is a borrowed file. It shall not be closed by self.
        Filename is used to construct a new fileobj.
        Mode and scheme are used together with filename to construct
        the filename.
        Import_links is only usable together with filename and enables links
        to the file source to be written.

        Fileobj, data and filename are mutually exclusive.
        """
        self._external_input_file = False

        if filename is not None:
            if mode == 'r':
                self._external_input_file = True

            elif mode != 'w':
                raise AssertionError(
                    "Supported values for mode are: 'r' and 'w', but '{}'"
                    " was given.".format(mode))
        self._data = data
        self.__fileobj = fileobj

        if fileobj is not None:
            self._data = fileobj
        elif data is not None:
            pass
        elif filename is not None:
            if mode == 'w' and import_links:
                exceptions.sywarn(
                    "Argument: 'import_links' must be False for mode 'w'.")
                import_links = False

            self.__fileobj = open_file(
                filename=filename, mode=mode, external=not import_links,
                sytype=self._storage_type(), dstype=self.container_type,
                scheme='hdf5')
            self._data = self.__fileobj
        else:
            self._data = typefactory.from_type(self.container_type)._data

        if isinstance(self._data, type(self)):
            # TODO(erik): Handle per case above, not for all cases at once.
            # Avoiding double wrapping with non-managed nodes.
            assert False, '2.1.0: Should be finished with double-wrapping now!'
            self._data = self._data._data

    def _storage_type(self):
        return self.container_type

    def _copy_base(self):
        cls = type(self)
        obj = cls.__new__(cls)
        obj._data = self._data
        obj.__fileobj = self.__fileobj
        obj._external_input_file = self._external_input_file
        return obj

    def __copy__(self):
        return self._copy_base()

    def __deepcopy__(self, memo=None):
        obj = self._copy_base()
        obj._data = self._data.__deepcopy__()
        return obj

    def writeback(self):
        self._data.writeback()

    def sync(self):
        pass

    def _writeback(self, datasource, link=None):
        return self._data._writeback(datasource, link)

    @classmethod
    def is_type(cls, filename, scheme='hdf5'):
        return is_type(cls.container_type, filename, scheme)

    @staticmethod
    def is_valid():
        return True

    def close(self):
        """Close the referenced data file."""
        # TODO(erik): Handle ownership on close.
        if not self._external_input_file:
            self.sync()
        if self.__fileobj is not None:
            self.__fileobj.close()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


class FileBase(FileManager):
    """File represents the top level of a table"""
    container_type = None

    def __init__(self, fileobj=None, data=None, filename=None, mode='r',
                 scheme='hdf5', source=None,
                 import_links=False):
        if filename is not None and mode is None:
            mode = 'r'
        super().__init__(fileobj, data, filename, mode, scheme,
                         import_links=import_links)
        self._extra_init(fileobj, data, filename, mode, scheme, source)

    def _extra_init(self, fileobj, data, filename, mode, scheme, source):
        if source:
            self.source(source)
        else:
            self.init()

    def init(self):
        pass

    def source(self, other, shallow=False):
        """
        Update self with a deepcopy of the data from other, without keeping the
        old state.

        self and other must be of the exact same type.
        """
        self._data.source(other._data, shallow=shallow)


[docs] class TypeAlias(FileBase): """ Base for implementing custom sympathy types. Data serialization is performed through self._data which is setup by __init__ and contains the storage level representation of the data. If the object introduces additional instance fields, __deepcopy__ and sync likely have to be re-implemented. Carefully, read the relevant docstrings. Do not implement __init__, instead implement init. """ container_type = None def __init__(self, filename: Optional[str] = None, mode: str = 'r', **kwargs): super().__init__(filename=filename, mode=mode, **kwargs) @classmethod def port(cls, description, name=None, **kwargs): """ Return a new port for cls. """ return port_util.CustomPort( cls.container_type.name(), description, name=name, **kwargs)
[docs] @classmethod def viewer(cls): """ Return viewer class, which must be a subclass of sympathy.api.typeutil.ViewerBase """ return None
[docs] @classmethod def icon(cls): """ Return full path to svg icon. """ return None
[docs] def names(self, kind=None, fields=None, **kwargs): """ Return data related to names of some kind. In fact, names can go beyond finding names and find for example types. Useful if this type has some kind of names that would be useful in adjust_parameters. Parameters ---------- kind: str The kind of names your are interested in. fields: str or [str] The fields you would like to include in the result. For example, name and type. Returns ------- list or iterator Normally, containing scalar elements if fields is scalar and tuple of multiple such elements when fields is list. """ return []
def types(self, kind=None, **kwargs): """ Obsoleted by names. Use names instead. types(kind=X) === names(kind=X, fields='type'). Return types associated with names(). """ return self.names(kind='cols', fields='type') def completions(self, **kwargs): """ Return completions builder for this object. """ return complete.builder()
[docs] def init(self): """ Perform any initialization, such as, defining local fields. """ pass
def index(self, limit=None) -> dict: """ INTERNAL use only! Return READ-ONLY index of internal storage including typealiases. Caller may view but not modify returned structure. Limit can be used, for performance reasons, to exclude certain items from the output. """ if limit: data = self._data.index(limit['data']) else: data = self._data.index() return { 'type': 'sytypealias', 'name': self.container_type.name(), 'data': data, } def set_index(self, index: dict) -> None: """ INTERNAL use only! Set index to provided index (produced by matching index()). Does nothing unless implemented. Provided index need to match the internal storage data-structure exactly and can therefore not be set after modifications. Caller hands over ownership of index and may not modify the argument structure. """ assert index['type'] == 'sytypealias', ( 'Index has incorrect type.') assert index['name'] == self.container_type.name(), ( 'Index has incorrect type alias.') assert 'data' in index, ( 'Index is missing data.')
[docs] def source(self, other, shallow=False): """ Update self with the data from other, discarding any previous state in self. Parameters ---------- other: type of self Object used as the source for (to update) self. shallow: bool When shallow is True a deepcopy of other will be avoided to improve performance, shallow=True must only be used in operations that do not modify other. When shallow is False the result should be similar to performing the shallow=True with a deepcopy of other so that no modifications of either self or other, after the source operation, can affect the other object. """ raise NotImplementedError
[docs] def sync(self): """ Synchronize data fields that are kept in memory to self._data. Called before data is written to disk and must be re-implemented by subclasses that introduce additional fields to ensure that the fields will be written through self._data. """ pass
[docs] def __deepcopy__(self, memo=None): """ Return new TypeAlias object that does not share references with self. Must be re-implemented by subclasses that introduce additional fields to ensure that the fields are copied to the returned object. """ return super().__deepcopy__()
def can_write(self) -> bool: """ Return True if self._data is backed by a writable file (e.g., the output ports of nodes) and False otherwise. Internal ports in locked subflows and lambdas are not writable. """ return self._data.can_write() or False def _storage_type(self): return self.container_type.get()
def calc_quote(text): return repr(text) class FileListBase(sylist, PPrintUnicode): """FileList represents a list of Files.""" sytype: str = '' scheme: str = '' def __new__(cls, filename: str = None, mode: str = 'r', **kwargs): import_links = kwargs.pop('import_links', False) if mode == 'w' and import_links: exceptions.sywarn( "Argument: 'import_links' must be False for mode 'w'.") import_links = False fileobj = open_file(filename=filename, mode=mode, external=not import_links, sytype=types.from_string(cls.sytype), dstype=None, scheme=cls.scheme) obj = fileobj obj.__class__ = cls obj._fileobj = fileobj return obj def __init__(self, filename: str = None, mode: str = 'r', **kwargs): pass def __enter__(self): return self def __exit__(self, *args): self.close() def close(self): super().close() def is_type(self, filename, scheme=None): return is_type(types.from_string(self.sytype), filename, scheme) def set_read_through(self): exceptions.sywarn('set_read_through is not implemented.') def set_write_through(self): exceptions.sywarn('set_write_through is not implemented.') def is_read_through(self): return False def is_write_through(self): return False def __str__(self): repr_line = repr(self) elements_str = " {} element{}".format( len(self), "s" if len(self) != 1 else "") return repr_line + ":\n" + elements_str def __copy__(self): obj = super().__copy__() obj._fileobj = self._fileobj return obj def __deepcopy__(self, memo=None): obj = super().__deepcopy__() obj._fileobj = self._fileobj return obj def __repr__(self): mode = 'Buffered ' id_ = hex(id(self)) return "<{}FileList object at {}>".format(mode, id_) def open_file(filename=None, mode='r', external=True, sytype=None, dstype=None, scheme='hdf5'): fileobj = None assert mode in 'rw', "Mode should be 'r' or 'w'" if filename is not None: if mode == 'r': fileobj = from_file( filename, external=external, sytype=sytype) elif mode == 'w': assert sytype is not None, "Mode 'w' requires sytype" assert scheme is not None, "Mode 'w' requires scheme" fileobj = to_file(filename, scheme, sytype, dstype, external=external) else: fileobj = from_type(sytype) return fileobj def names(kind, fields): return _names.names(kind, fields) def port_maker(port_information, mode, external=True, no_datasource=False): """ Typaliases should be simplified with intra-dependencies expanded. """ if no_datasource: return port_mem_maker(port_information) else: return port_file_maker( port_information, mode, external) def _undefined_alias_error(alias, error): e = error part = ''.join([str(a) for a in e.args]) return exceptions.SyNodeError( f'Undefined type {part} in port type {alias}') def port_mem_maker(port_information): alias = port_information['type'] try: return typefactory.from_type(types.from_string(alias)) except types.UndefinedTypeError as e: raise _undefined_alias_error(alias, e) def port_file_maker(port_information, mode, external=True): """Return maker for port.""" link = not external alias = port_information['type'] dstype = port_information.get('dstype') if dstype is not None: dstype = str(dstype) try: type_expanded = str(types.from_string_expand(alias)) except types.UndefinedTypeError as e: raise _undefined_alias_error(alias, e) type_expanded = type_expanded.replace('sytable', 'table') type_expanded = type_expanded.replace('sytext', 'text') data = { 'scheme': port_information['scheme'], 'type': str(alias), 'dstype': dstype, 'type_expanded': type_expanded, 'mode': mode, 'external': external, 'can_link': link, 'path': [], 'resource': port_information['file'], } return typefactory.from_dict(data)