# Source code for node_file_operations

# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
from abc import ABC, abstractmethod
import shutil
import re
import os
import os.path

from sympathy.api import node as synode
from sympathy.api import node_helper
from sympathy.api import datasource as dsrc
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, deprecated_node
from sympathy.api.exceptions import SyDataError

from sympathy.platform import exceptions
import sylib.url


# Documentation text shared by the copy/move nodes below (assigned to their
# ``__doc__``). It is a raw string so that the ``\1`` backreference in the
# regex example is kept literally instead of being read as an octal escape.
DOCS = r"""
The destination location can either be specified in the node's configuration or
by adding the optional *File destination* input port. When the *File
destination* input port is used the destination is taken solely from that port
and not from the configuration.

When specifying the destination directory in the configuration, you can leave
it empty to use the directory of the source file as destination directory.

Missing directories will be created if needed.

Renaming files with a regex
---------------------------
Instead of specifying a static destination in the configuration, you can also
specify a destination by enabling :guilabel:`Use regex` and writing a regex and
replacement string.

For example with the regex :regexp:`^folder_([0-9]+)/datafile.csv$` and
replacement string :regexp:`datafile_\1.csv`, the destination for a file at
location :file:`folder_1234/datafile.csv` would be :file:`datafile_1234.csv`.

For more information about how to write regex, see :ref:`appendix_regex`.
"""

# Node ids listed as related nodes by the file operation nodes in this module.
related = [
    'org.sysess.sympathy.files.copyfile',
    'org.sysess.sympathy.files.movefile',
    'org.sysess.sympathy.files.deletefile',
]


def check_files(datasources, no_error=False) -> list[str]:
    """Collect the result of ``_require_file()`` for each datasource.

    If ``_require_file()`` raises, the exception is propagated unless
    *no_error* is true, in which case that datasource is skipped and left out
    of the returned list.
    """
    paths = []
    for source in datasources:
        try:
            path = source._require_file()
        except Exception:
            if no_error:
                continue
            raise
        paths.append(path)
    return paths


def check_file(datasource, no_error=False) -> str | None:
    res = None
    try:
        res = datasource._require_file()
    except Exception:
        if not no_error:
            raise
    return res


def regex_parameters(parameters):
    """Add the regex renaming options to *parameters* and return it.

    Three parameters are added, in this order: the boolean ``use_regex`` and
    the strings ``pattern`` and ``replace``.
    """
    pattern_help = (
        'Specify the regular expression that will be used for matching. '
        'Learn more about Regular expression syntax in the documentation '
        'appendix.')
    replace_help = ('The string to replace the match found with the regular '
                    'expression')

    parameters.set_boolean(
        'use_regex',
        label='Regex',
        description='Turn on/off naming using a regular expression')
    parameters.set_string(
        'pattern', label='Search', description=pattern_help)
    parameters.set_string(
        'replace', label='Replace', description=replace_help)
    return parameters


def exception_parameter(parameters):
    """Add the boolean ``error`` option to *parameters* and return it."""
    options = {
        'label': 'Do not raise exceptions',
        'description': 'If a file operation fails, do not raise an exception',
    }
    parameters.set_boolean('error', **options)
    return parameters


def output_filename(node_context, new_filename):
    """Append a file datasource for *new_filename* to output ``port1``.

    Nothing is appended when *new_filename* is empty or ``None``.
    """
    if not new_filename:
        return
    datasource = dsrc.File()
    datasource.encode_path(new_filename)
    node_context.output['port1'].append(datasource)


def get_newfilepath(
    filename: str,
    directory: str,
    new_filename: str | None = None,
    regex: tuple[str, str] | None = None,
) -> str:
    if regex:
        pattern, replace = regex
        return re.sub(pattern, replace, filename)

    if not directory:
        # Empty directory parameter means same directory as source file
        directory = os.path.dirname(filename)
    if not new_filename:
        # Empty filename parameter means same filename as source file
        new_filename = filename

    return os.path.join(os.path.abspath(directory),
                        os.path.basename(new_filename))


def add_copy_to_filename(filename: str, new_filename: str) -> str:
    """Return a destination name that differs from the source *filename*.

    *new_filename* is returned unchanged when it is non-empty and resolves to
    a different absolute path than *filename*. Otherwise ``' - Copy'`` is
    inserted before the extension of *filename*.
    """
    if new_filename and (
            os.path.abspath(filename) != os.path.abspath(new_filename)):
        return new_filename
    stem, ext = os.path.splitext(filename)
    return ''.join((stem, ' - Copy', ext))


def copy_file(
    source_filepath: str,
    dest_filepath: str,
    no_error: bool = False,
) -> bool:
    """Copy *source_filepath* to *dest_filepath*.

    The destination directory is created first on a best-effort basis. A
    ``FileNotFoundError`` from the copy is re-raised as ``SyDataError``.

    Returns ``True`` when the copy succeeded. When *no_error* is true any
    failure makes the function return ``False`` instead of raising.
    """
    target_dir = os.path.dirname(dest_filepath)
    try:
        try:
            os.makedirs(target_dir)
        except OSError:
            # Best effort: the directory may already exist. A directory that
            # is really missing makes the copy below fail instead.
            pass
        shutil.copyfile(source_filepath, dest_filepath)
    except FileNotFoundError as exc:
        if no_error:
            return False
        raise SyDataError(f"File not found: {source_filepath}.") from exc
    except Exception:
        if no_error:
            return False
        raise
    return True


def move_file(
    source_filepath: str,
    dest_filepath: str,
    no_error: bool = False,
) -> bool:
    """Move *source_filepath* to *dest_filepath*.

    The destination directory is created first on a best-effort basis. A
    ``FileNotFoundError`` from the move is re-raised as ``SyDataError``.

    Returns ``True`` when the move succeeded. When *no_error* is true any
    failure, including an unexpected error while creating the destination
    directory, makes the function return ``False`` instead of raising.
    """
    path = os.path.dirname(dest_filepath)
    try:
        # Directory creation is inside the guarded region (as in copy_file)
        # so that errors other than OSError, e.g. ValueError for a path with
        # an embedded null byte, also honour ``no_error``.
        try:
            os.makedirs(path)
        except OSError:
            # Best effort: the directory may already exist. A directory that
            # is really missing makes the move below fail instead.
            pass
        try:
            shutil.move(source_filepath, dest_filepath)
        except FileNotFoundError as exc:
            raise SyDataError(f"File not found: {source_filepath}.") from exc
    except Exception:
        if no_error:
            return False
        raise
    return True


def delete_file(filename, delete_folder=False, no_error=False):
    """Remove *filename* and optionally prune its enclosing directory.

    Returns *filename* when the file was removed. When removal fails the
    exception is raised (``FileNotFoundError`` is converted to
    ``SyDataError``) unless *no_error* is true, in which case ``None`` is
    returned.

    When *delete_folder* is true, ``os.removedirs`` is then attempted on the
    directory of *filename*; any failure of that step is ignored.
    """
    parent = os.path.dirname(filename) if delete_folder else None
    removed = filename
    try:
        os.remove(filename)
    except FileNotFoundError as exc:
        if not no_error:
            raise SyDataError(f"File not found: {filename}.") from exc
        removed = None
    except Exception:
        if not no_error:
            raise
        removed = None

    if parent:
        try:
            os.removedirs(parent)
        except Exception:
            # Pruning the directory is best effort only.
            pass
    return removed


def filename_columns(parameters):
    """Add the ``current`` and ``new`` column selectors to *parameters*.

    Both are list parameters with ``[0]`` as initial value and an editable,
    filterable combo editor. The modified *parameters* object is returned.
    """
    selectors = (
        ('current', 'Current filenames',
         'The column with the current file names'),
        ('new', 'New filenames',
         'The column with the new filenames'),
    )
    for key, label, description in selectors:
        parameters.set_list(
            key, label=label, description=description, value=[0],
            editor=synode.editors.combo_editor(edit=True, filter=True))
    return parameters


def get_file_lists(node_context):
    """Return the current and new filename columns of the table on ``port2``.

    The columns are picked by the first selected index of the ``current`` and
    ``new`` list parameters.

    Returns:
        A ``(current_filenames, new_filenames)`` pair of column arrays, or
        ``([], [])`` when a selection or index is out of range.

    Raises:
        SyDataError: if either column's dtype kind is not ``'U'`` or ``'S'``.
    """
    parameters = node_context.parameters

    file_table = node_context.input['port2']
    columns = file_table.column_names()

    try:
        current_index = parameters['current'].value[0]
        new_index = parameters['new'].value[0]
        # Fix indices for old configurations, whose lists start with an empty
        # entry. Each parameter is checked against its own list.
        if parameters['current'].list[0] == '':
            current_index -= 1
        if parameters['new'].list[0] == '':
            new_index -= 1

        current_filenames = file_table.get_column_to_array(
            columns[current_index])
        new_filenames = file_table.get_column_to_array(
            columns[new_index])
    except IndexError:
        return [], []
    if (current_filenames.dtype.kind not in ('U', 'S') or
            new_filenames.dtype.kind not in ('U', 'S')):
        raise SyDataError(
            'One or more of the input columns have the wrong type. '
            'They should be text.')
    return current_filenames, new_filenames


class SingleFileNode(ABC, synode.Node):
    # Abstract base for nodes that apply one file operation to one source
    # file. The destination comes from the optional 'port2' input when it is
    # connected, otherwise from the configuration (directory/filename or
    # regex). Subclasses implement ``_file_operation``.
    author = 'Alexander Busck & Andreas Tågerud'
    tags = Tags(Tag.Disk.File)

    inputs = Ports([
        Port.Datasource('File source', name='port1', scheme='text'),
        Port.Datasource(
            'File destination', name='port2', scheme='text', n=(0, 1, 0)),
    ])
    outputs = Ports([
        Port.Datasource('File destination', name='port1', scheme='text'),
    ])

    parameters = synode.parameters()
    parameters.set_string(
        'dest_folder',
        label='Directory',
        editor=synode.editors.directory_editor(),
        description='Manually enter a directory')
    parameters.set_string(
        'filename',
        label='Filename',
        description=('Manually enter a filename, if not using a regular '
                     'expression'))
    parameters = regex_parameters(parameters)
    parameters = exception_parameter(parameters)

    # With 'Use regex' checked the static destination fields are disabled
    # and the regex fields are enabled.
    controller = synode.controller(
        when=synode.field('use_regex', state='checked'),
        action=(
            synode.field('filename', state='disabled'),
            synode.field('dest_folder', state='disabled'),
            synode.field('pattern', state='enabled'),
            synode.field('replace', state='enabled'),
        ),
    )

    def execute(self, node_context):
        """Resolve source and destination, then run the file operation.

        The output datasource is only written when ``_file_operation``
        reports success. If the source or the destination datasource is
        rejected and errors are suppressed, the node returns without output.
        """
        params = node_context.parameters
        source_ds = node_context.input['port1']
        destination_group = node_context.input.group('port2')
        no_error = params['error'].value
        use_regex = params['use_regex'].value
        pattern = params['pattern'].value
        replace = params['replace'].value

        in_filepath = check_file(source_ds, no_error)
        if in_filepath is None:
            return

        if destination_group:
            # Destination is taken solely from the optional input port.
            new_filepath = check_file(node_context.input['port2'], no_error)
            if new_filepath is None:
                return
        else:
            regex = (pattern, replace) if use_regex else None
            configured = get_newfilepath(
                in_filepath,
                params['dest_folder'].value,
                params['filename'].value,
                regex=regex)
            new_filepath = self._add_copy_to_filename(in_filepath, configured)

        if not self._file_operation(in_filepath, new_filepath, no_error):
            return
        node_context.output['port1'].encode_path(new_filepath)

    @abstractmethod
    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Perform the operation; return ``True`` when it succeeded."""
        return True

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Base class does not add anything to filename."""
        return new_filename


class MultiFileNode(ABC, synode.Node):
    # Abstract base for nodes that apply one file operation to every file in
    # a list of sources. Destinations come from the optional 'port2' input
    # when it is connected, otherwise from the configuration (directory or
    # regex). Subclasses implement ``_file_operation``.
    author = 'Andreas Tågerud'
    tags = Tags(Tag.Disk.File)

    inputs = Ports([
        Port.Datasources('File sources', name='port1'),
        Port.Datasources('File destinations', name='port2', n=(0, 1, 0))])
    outputs = Ports([
        Port.Datasources('File destination', name='port1')])

    parameters = synode.parameters()
    parameters.set_string(
        'dest_folder', label='Directory',
        editor=synode.editors.directory_editor(),
        description=('Manually enter a directory'))
    parameters = regex_parameters(parameters)
    parameters = exception_parameter(parameters)
    # With 'Use regex' checked the static destination field is disabled and
    # the regex fields are enabled.
    controller = synode.controller(
        when=synode.field('use_regex', state='checked'),
        action=(
            synode.field('dest_folder', state='disabled'),
            synode.field('pattern', state='enabled'),
            synode.field('replace', state='enabled'),
        ),
    )

    def execute(self, node_context):
        """Run the file operation for every source/destination pair.

        A destination is appended to the output list only when
        ``_file_operation`` reports success for that pair.
        """
        parameters = node_context.parameters
        no_error = parameters['error'].value
        ds_in = node_context.input['port1']
        inputs = node_context.input.group('port2')
        use_regex = parameters['use_regex'].value
        pattern = parameters['pattern'].value
        replace = parameters['replace'].value

        if inputs:
            ds_out = node_context.input['port2']
            for in_filename, new_filename in self._paired_filenames(
                    ds_in, ds_out, no_error):
                if self._file_operation(in_filename, new_filename, no_error):
                    output_filename(node_context, new_filename)
        else:
            for in_filename in check_files(ds_in, no_error):
                new_filename = get_newfilepath(
                    in_filename, parameters['dest_folder'].value,
                    regex=(pattern, replace) if use_regex else None)
                new_filename = self._add_copy_to_filename(
                    in_filename, new_filename)
                if self._file_operation(in_filename, new_filename, no_error):
                    output_filename(node_context, new_filename)

    @staticmethod
    def _paired_filenames(sources, destinations, no_error):
        """Return ``(source, destination)`` filename pairs, element by element.

        Without *no_error* every datasource is validated up front, so an
        invalid one raises before any file is touched. With *no_error* a pair
        is dropped as a unit when either side is rejected; filtering the two
        lists independently would shift all later sources onto the wrong
        destinations.
        """
        if not no_error:
            return list(zip(check_files(sources), check_files(destinations)))

        pairs = []
        for source, destination in zip(sources, destinations):
            in_filename = check_file(source, no_error=True)
            new_filename = check_file(destination, no_error=True)
            if in_filename is not None and new_filename is not None:
                pairs.append((in_filename, new_filename))
        return pairs

    @abstractmethod
    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Perform the operation; return ``True`` when it succeeded."""
        return True

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Base class does not add anything to filename."""
        return new_filename


class CopyFile(SingleFileNode):
    __doc__ = DOCS

    name = 'Copy file'
    description = (
        'Copy a file to a destination specified in configuration or in '
        'the optional input port.')
    nodeid = 'org.sysess.sympathy.files.copyfile'
    icon = 'copy.svg'
    # Class-level list: the multi-file variant followed by the module-level
    # ``related`` list.
    related = ['org.sysess.sympathy.files.copyfiles'] + related

    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Copy the file; return ``True`` when the copy succeeded."""
        return copy_file(in_filepath, new_filepath, no_error)

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Add " - Copy" to filename if needed."""
        return add_copy_to_filename(filename, new_filename)
class CopyFiles(MultiFileNode):
    __doc__ = DOCS

    name = 'Copy files'
    description = (
        'Copy files to a destination specified in configuration or in '
        'the optional input port.')
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.copyfiles'
    icon = 'copy.svg'

    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Copy the file; return ``True`` when the copy succeeded."""
        return copy_file(in_filename, new_filename, no_error)

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Add " - Copy" to filename if needed."""
        return add_copy_to_filename(filename, new_filename)
@deprecated_node('8.0.0', 'Copy Files')
class CopyFilesWithDatasources(synode.Node):
    """
    Copies the input file datasources, to the locations designated in the
    second datasources input, element by element. Missing directories will be
    created if possible.
    """

    name = 'Copy files with Datasources'
    description = 'Copy files to another location using a table with paths'
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.copyfileswithdsrc'
    icon = 'copy.svg'
    tags = Tags(Tag.Disk.File)

    inputs = Ports([
        Port.Datasources('Files to be copied', name='port1'),
        Port.Datasources('File destinations to copy to', name='port2')])
    outputs = Ports([Port.Datasources('Copied files', name='port1')])

    parameters = synode.parameters()
    parameters = exception_parameter(parameters)

    def execute(self, node_context):
        """Copy each source to the destination at the same list position.

        Only destinations whose copy succeeded are appended to the output.
        """
        in_dss = node_context.input['port1']
        out_dss = node_context.input['port2']
        no_error = node_context.parameters['error'].value

        if no_error:
            # Drop a pair as a unit when either side is rejected; filtering
            # the two lists independently would shift all later sources onto
            # the wrong destinations.
            pairs = []
            for in_ds, out_ds in zip(in_dss, out_dss):
                in_filename = check_file(in_ds, no_error=True)
                new_filename = check_file(out_ds, no_error=True)
                if in_filename is not None and new_filename is not None:
                    pairs.append((in_filename, new_filename))
        else:
            # Validate everything up front so that an invalid datasource
            # raises before any file is copied.
            pairs = list(zip(check_files(in_dss), check_files(out_dss)))

        for in_filename, new_filename in pairs:
            # The output port lists copied files, so skip failed copies.
            if copy_file(in_filename, new_filename, no_error=no_error):
                output_filename(node_context, new_filename)
class MoveFile(SingleFileNode):
    __doc__ = DOCS

    name = 'Move File'
    description = (
        'Move a file to a destination specified in configuration or in '
        'the optional input port.')
    nodeid = 'org.sysess.sympathy.files.movefile'
    icon = 'move.svg'
    # Class-level list: the multi-file variant followed by the module-level
    # ``related`` list.
    related = ['org.sysess.sympathy.files.movefiles'] + related

    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Move the file; return ``True`` when the move succeeded."""
        return move_file(in_filepath, new_filepath, no_error)
class MoveFiles(MultiFileNode):
    __doc__ = DOCS

    name = 'Move Files'
    description = (
        'Move files to a destination specified in configuration or in '
        'the optional input port.')
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.movefiles'
    icon = 'move.svg'

    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Move the file; return ``True`` when the move succeeded."""
        return move_file(in_filename, new_filename, no_error)
class DeleteFile(synode.Node):
    """Deletes one file."""

    name = 'Delete file'
    description = 'Delete a file'
    author = 'Magnus Sandén & Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.deletefile'
    icon = 'delete.svg'
    tags = Tags(Tag.Disk.File)

    inputs = Ports([Port.Datasource('File to delete', name='port1')])
    outputs = Ports([Port.Datasource('Path to deleted file', name='port1')])

    parameters = synode.parameters()
    parameters.set_boolean(
        'delete_folder', label='Delete enclosing folder if empty',
        description=(
            'If a file that is removed is the last in that folder, '
            'the folder is removed. If this operation fails, '
            'no exception is raised.'))
    parameters = exception_parameter(parameters)

    def execute(self, node_context):
        """Delete the input file and write its path to the output.

        The output is only written when the file was actually removed.
        """
        if not node_context.input['port1'].decode_path():
            # Legacy support.
            return
        parameters = node_context.parameters
        no_error = parameters['error'].value
        for filename in check_files([node_context.input['port1']], no_error):
            del_file = delete_file(
                filename, parameters['delete_folder'].value,
                no_error=no_error)
            if del_file:
                node_context.output['port1'].encode_path(del_file)
# Variant of DeleteFile produced by ``node_helper.list_node_decorator`` for
# 'port1' on both input and output; presumably it applies the node to each
# element of a list - confirm against the decorator's documentation.
@node_helper.list_node_decorator(['port1'], ['port1'])
class DeleteFiles(DeleteFile):
    name = 'Delete files'
    nodeid = 'org.sysess.sympathy.files.deletefiles'
class DownloadFile(synode.Node):
    """
    Download the data at URL to specified local file. If URL resource
    contains credential variables for login or token credentials these will be
    entered as part of the URL. See
    :ref:`Credentials Preferences<preferences_credentials>` for more info.
    """

    name = 'Download URL to file'
    description = 'Download file from a URL to specified filename.'
    author = 'Erik der Hagopian, '
    nodeid = 'org.sysess.sympathy.files.downloadfile'
    icon = 'copy.svg'
    tags = Tags(Tag.Disk.File)

    inputs = Ports([Port.Datasource(
        'Datasource pointing to data to read', name='port1', scheme='text')])
    outputs = Ports([Port.Datasource(
        'Datasource of resulting file', name='port1', scheme='text')])

    parameters = synode.parameters()
    parameters.set_string(
        'filename', label='Filename',
        editor=synode.editors.savename_editor(['Any files (*)']),
        description=('Manually enter a filename, if not using a regular '
                     'expression'))
    parameters.set_string(
        'if_exists', label='If file already exists', value='Overwrite',
        description=('What to do if the file already exists'),
        editor=synode.editors.combo_editor(
            options=['Skip file', 'Overwrite', 'Raise exception']))

    def execute(self, node_context):
        """Download the URL datasource on the input to the configured file.

        An existing target file is skipped, overwritten or rejected according
        to the ``if_exists`` parameter. Inputs that are not URL datasources
        raise ``SyDataError``.
        """
        parameters = node_context.parameters
        input_datasource = node_context.input['port1']
        output_datasource = node_context.output['port1']
        filename = parameters['filename'].value
        if_exists = parameters['if_exists'].value

        if os.path.isfile(filename):
            if if_exists == 'Skip file':
                output_datasource.encode_path(filename)
                return
            if if_exists == 'Raise exception':
                raise SyDataError(
                    'File {} already exists'.format(filename))

        if input_datasource.decode_type() != output_datasource.modes.url:
            raise SyDataError('Only URL datasources can be downloaded')
        if not filename:
            raise exceptions.filename_not_empty()

        try:
            downloaded = sylib.url.download_url_with_credentials(
                self, input_datasource.connection(),
                input_datasource['env'], filename=filename)
        except sylib.url.RequestError as e:
            raise SyDataError(f"Download failed due to {e}") from e

        output_datasource.source(dsrc.File.from_filename(downloaded))