Source code for node_file_operations

# coding=utf8
# Copyright (c) 2013, 2017, System Engineering Software Society
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the System Engineering Software Society nor the
#       names of its contributors may be used to endorse or promote products
#       derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.
# IN NO EVENT SHALL SYSTEM ENGINEERING SOFTWARE SOCIETY BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import shutil
import re
import os
import os.path

import numpy as np

from sympathy.api import node as synode
from sympathy.api import node_helper
from sympathy.api import datasource as dsrc
from sympathy.api.nodeconfig import (Port, Ports, Tag, Tags, adjust,
                                     deprecated_node)
from sympathy.api.exceptions import SyDataError
from sympathy.api import exceptions
import sylib.url


def copy_file(
        filename, new_filename=None, directory=None, regex=False, pattern=None,
        replace=None, no_error=False):
    if regex:
        new_filename = re.sub(pattern, replace, filename)
    elif directory:
        new_filename = os.path.join(
            directory, os.path.basename(filename))

    if (not new_filename or
            os.path.abspath(new_filename) == os.path.abspath(filename)):
        in_filename, ext = os.path.splitext(filename)
        new_filename = in_filename + ' - Copy' + ext

    new_filename = os.path.abspath(new_filename)
    path = os.path.dirname(new_filename)
    try:
        try:
            os.makedirs(path)
        except (OSError, IOError):
            pass
        shutil.copyfile(filename, new_filename)
    except Exception:
        if no_error:
            return None
        raise
    return new_filename


def delete_file(filename, delete_folder=False, no_error=False):
    directory = os.path.dirname(filename) if delete_folder else None
    try:
        os.remove(filename)
    except Exception:
        if no_error:
            filename = None
        else:
            raise
    if directory:
        try:
            os.removedirs(directory)
        except Exception:
            pass
    return filename


def rename_file(
        filename, new_filename=None, directory=None, regex=False,
        pattern=None, replace=None, no_error=False):
    if not new_filename:
        new_filename = filename
    new_filename = os.path.join(
        os.path.dirname(filename), os.path.basename(new_filename))
    if regex:
        new_filename = re.sub(pattern, replace, filename)
    try:
        os.rename(filename, new_filename)
    except (OSError, IOError):
        if not no_error:
            raise
        return None
    return new_filename


def move_file(filename, new_filename, no_error=False):
    path = os.path.dirname(new_filename)
    try:
        os.makedirs(path)
    except (OSError, IOError):
        pass
    try:
        shutil.move(filename, new_filename)
    except (OSError, IOError):
        if no_error:
            return None
        raise
    return new_filename


def regex_only_parameters(parameters):
    parameters.set_string(
        'pattern', label='Search',
        description=(
            'Specify the regular expression that will be used for matching'))
    parameters.set_string(
        'replace', label='Replace',
        description=('The string to replace the match found with the regular '
                     'expression'))
    return parameters


def regex_parameters(parameters, is_dir=False):
    parameters.set_boolean(
        'use_regex', label='Regex',
        description='Turn on/off naming using a regular expression')
    parameters = regex_only_parameters(parameters)
    return parameters


def exception_parameter(parameters):
    parameters.set_boolean(
        'error', label='Do not raise exceptions',
        description='If a file operation fails, do not raise an exception')
    return parameters


def delete_folder_parameter(parameters):
    parameters.set_boolean(
        'delete_folder', label='Delete enclosing folder if empty',
        description=(
            'If a file that is removed is the last in that folder, '
            'the folder is removed. If this operation fails, '
            'no exception is raised.'))
    return parameters


def filename_columns(parameters):
    parameters.set_list(
        'current', label='Current filenames',
        description='The column with the current file names',
        value=[0], editor=synode.Util.combo_editor(edit=True, filter=True))
    parameters.set_list(
        'new', label='New filenames',
        description='The column with the new filenames',
        value=[0], editor=synode.Util.combo_editor(edit=True, filter=True))
    return parameters


def dir_param(parameters):
    parameters.set_string(
        'filename', label='Directory',
        editor=synode.Util.directory_editor(),
        description=('Manually enter a directory'))
    return parameters


def file_param(parameters):
    parameters.set_string(
        'filename', label='Filename',
        editor=synode.Util.savename_editor(['Any files (*)']),
        description=('Manually enter a filename, if not using a regular '
                     'expression'))
    return parameters


def regex_controllers():
    return (
        synode.controller(
            when=synode.field('use_regex', state='checked'),
            action=(
                synode.field('filename', state='disabled'),
                synode.field('pattern', state='enabled'),
                synode.field('replace', state='enabled'),
            ),
        ),
    )


def get_file_lists(node_context):
    parameters = node_context.parameters

    file_table = node_context.input['port2']
    columns = file_table.column_names()

    try:
        current_index = parameters['current'].value[0]
        new_index = parameters['new'].value[0]
        # Fix indices for old configurations
        if parameters['current'].list[0] == '':
            current_index -= 1
        if parameters['current'].list[0] == '':
            new_index -= 1

        current_filenames = file_table.get_column_to_array(
            columns[current_index])
        new_filenames = file_table.get_column_to_array(
            columns[new_index])
    except IndexError:
        return [], []
    if (current_filenames.dtype.kind not in ('U', 'S') or
       new_filenames.dtype.kind not in ('U', 'S')):
            raise SyDataError(
                'One or more of the input columns have the wrong type. '
                'They should be text.')
    return current_filenames, new_filenames


[docs]class CopyFile(synode.Node): """ Copy a file from one directory to another. Missing directories will be created if possible. """ name = 'Copy file' description = ('Copy files to another location. It is possible to name ' 'the copies using a regular expression.') author = 'Alexander Busck & Andreas Tågerud' version = '1.0' nodeid = 'org.sysess.sympathy.files.copyfile' icon = 'copy.svg' tags = Tags(Tag.Disk.File) inputs = Ports([Port.Datasource( 'Datasource of file to be copied', name='port1', scheme='text')]) outputs = Ports([Port.Datasource( 'Datasource of copied file', name='port1', scheme='text')]) parameters = synode.parameters() parameters = file_param(parameters) parameters = regex_parameters(parameters) parameters = exception_parameter(parameters) controllers = regex_controllers() def execute(self, node_context): parameters = node_context.parameters ds_in = node_context.input['port1'].decode() try: filename = ds_in['path'] except KeyError: return new_filename = copy_file( filename, parameters['filename'].value, None, parameters['use_regex'].value, parameters['pattern'].value, parameters['replace'].value, parameters['error'].value) if new_filename: node_context.output['port1'].encode_path(new_filename)
[docs]class CopyFiles(synode.Node): """ Copy multiple files from one directory to another. Missing directories will be created if possible. """ name = 'Copy files' description = ('Copy files to another location. It is possible to name ' 'the copies using a regular expression.') author = 'Andreas Tågerud' version = '1.0' nodeid = 'org.sysess.sympathy.files.copyfiles' icon = 'copy.svg' tags = Tags(Tag.Disk.File) inputs = Ports([Port.Datasources('Files to be copied', name='port1')]) outputs = Ports([Port.Datasources('Copied files', name='port1')]) parameters = synode.parameters() parameters = dir_param(parameters) parameters = regex_parameters(parameters) parameters = exception_parameter(parameters) controllers = regex_controllers() def execute(self, node_context): parameters = node_context.parameters for filename in node_context.input['port1']: new_filename = copy_file( filename.decode_path(), None, parameters['filename'].value, parameters['use_regex'].value, parameters['pattern'].value, parameters['replace'].value, parameters['error'].value) if new_filename: ds = dsrc.File() ds.encode_path(new_filename) node_context.output['port1'].append(ds)
[docs]@deprecated_node('1.7.0', 'Copy files with Datasources') class CopyFilesWithTable(synode.Node): """ Copies the input file datasources, to the locations designated in the input Table. Relative and absolute paths can be used in the input Table. Missing directories will be created if possible. """ name = 'Copy files with Table' description = 'Copy files to another location using a table with paths' author = 'Andreas Tågerud' version = '1.0' nodeid = 'org.sysess.sympathy.files.copyfileswithtable' icon = 'copy.svg' tags = Tags(Tag.Disk.File) inputs = Ports([ Port.Datasources('Files to be copied', name='port1'), Port.Table('New file names', name='port2')]) outputs = Ports([Port.Datasources('Copied files', name='port1')]) parameters = synode.parameters() parameters = filename_columns(parameters) parameters = exception_parameter(parameters) def adjust_parameters(self, node_context): adjust(node_context.parameters['current'], node_context.input['port2']) adjust(node_context.parameters['new'], node_context.input['port2']) def execute(self, node_context): current_filenames, new_filenames = get_file_lists(node_context) for ds in node_context.input['port1']: abs_path = os.path.abspath(ds.decode_path()) rel_path = os.path.relpath(abs_path) if abs_path in current_filenames: file = abs_path elif rel_path in current_filenames: file = rel_path else: continue new_filename = copy_file( file, new_filenames[np.where(current_filenames == file)[0][0]], no_error=node_context.parameters['error'].value) new_file = dsrc.File() new_file.encode_path(new_filename) node_context.output['port1'].append(new_file)
[docs]class CopyFilesWithDatasources(synode.Node): """ Copies the input file datasources, to the locations designated in the second datasources input, element by element. Missing directories will be created if possible. """ name = 'Copy files with Datasources' description = 'Copy files to another location using a table with paths' author = 'Andreas Tågerud' version = '1.0' nodeid = 'org.sysess.sympathy.files.copyfileswithdsrc' icon = 'copy.svg' tags = Tags(Tag.Disk.File) inputs = Ports([ Port.Datasources('Files to be copied', name='port1'), Port.Datasources('File destinations to copy to', name='port2')]) outputs = Ports([Port.Datasources('Copied files', name='port1')]) parameters = synode.parameters() parameters = exception_parameter(parameters) def execute(self, node_context): in_dss = node_context.input['port1'] out_dss = node_context.input['port2'] for in_ds, out_ds in zip(in_dss, out_dss): new_filename = copy_file( in_ds.decode_path(), out_ds.decode_path(), no_error=node_context.parameters['error'].value) if new_filename: out_file = dsrc.File() out_file.encode_path(new_filename) node_context.output['port1'].append(out_file)
[docs]class DeleteFile(synode.Node): """Deletes one file.""" name = 'Delete file' description = 'Delete a file' author = 'Magnus Sandén & Andreas Tågerud' version = '1.0' nodeid = 'org.sysess.sympathy.files.deletefile' icon = 'delete.svg' tags = Tags(Tag.Disk.File) inputs = Ports([Port.Datasource('File to delete', name='port1')]) outputs = Ports([Port.Datasource('Path to deleted file', name='port1')]) parameters = synode.parameters() parameters = delete_folder_parameter(parameters) parameters = exception_parameter(parameters) def execute(self, node_context): parameters = node_context.parameters filename = node_context.input['port1'].decode_path() if filename: del_file = delete_file( filename, parameters['delete_folder'].value, no_error=parameters['error'].value) if del_file: ds = dsrc.File() ds.encode_path(del_file) node_context.output['port1'].encode_path(del_file)
[docs]@node_helper.list_node_decorator(['port1'], ['port1']) class DeleteFiles(DeleteFile): name = 'Delete files' nodeid = 'org.sysess.sympathy.files.deletefiles'
[docs]@deprecated_node('1.7.0', 'Move file and possibly Rename datasource with Regex') class RenameFile(synode.Node): """Rename one file using a new filename.""" name = 'Rename File' version = '1.0' author = 'Andreas Tågerud' icon = 'rename.svg' description = 'Rename file in the current directory' nodeid = 'org.sysess.sympathy.files.renamefile' tags = Tags(Tag.Disk.File) inputs = Ports([Port.Datasource('File to rename', name='port1')]) outputs = Ports([Port.Datasource('Path to renamed file', name='port1')]) parameters = synode.parameters() parameters = file_param(parameters) parameters = regex_parameters(parameters) parameters = exception_parameter(parameters) controllers = regex_controllers() def execute(self, node_context): parameters = node_context.parameters filename = node_context.input['port1'].decode_path() if filename: new_filename = rename_file( filename, parameters['filename'].value, None, parameters['use_regex'].value, parameters['pattern'].value, parameters['replace'].value, no_error=parameters['error'].value) if new_filename: node_context.output['port1'].encode_path(new_filename)
[docs]@deprecated_node( '1.7.0', 'Move files and possibly Rename datasource with Regex') @node_helper.list_node_decorator(['port1'], ['port1']) class RenameFiles(RenameFile): name = 'Rename Files' nodeid = 'org.sysess.sympathy.files.renamefiles'
[docs]@deprecated_node( '1.7.0', 'Move files') class RenameFilesWithTable(synode.Node): """ Using a table with old names and new names in two columns, this node renames files in the same folder as the input datasources. The datasources can point to multiple files in different directories. The column with the current filenames can be an absolute or relative path, but the column with the new names should only contain the filename, with no directory prefix. If a prefix is present it will be discarded. """ name = 'Rename Files with Table' author = 'Andreas Tågerud' version = '1.0' icon = 'rename.svg' description = ( 'Using a table with a a column for current filenames and one for new ' 'filenames, rename the files') nodeid = 'org.sysess.sympathy.files.renamefileswithtable' tags = Tags(Tag.Disk.File) inputs = Ports([ Port.Datasources('Files to be renamed', name='port1'), Port.Table('List of new filenames', name='port2')]) outputs = Ports([Port.Datasources('Renamed files', name='port1')]) parameters = synode.parameters() parameters = filename_columns(parameters) parameters = exception_parameter(parameters) def adjust_parameters(self, node_context): adjust(node_context.parameters['current'], node_context.input['port2']) adjust(node_context.parameters['new'], node_context.input['port2']) def execute(self, node_context): filenames, new_filenames = get_file_lists(node_context) filenames = np.array([os.path.basename(file) for file in filenames]) for ds in node_context.input['port1']: orig = ds.decode_path() directory, file = os.path.split(orig) if orig in filenames: idx = np.where(filenames == orig)[0][0] elif file in filenames: idx = np.where(filenames == file)[0][0] else: continue new = os.path.join(directory, os.path.basename(new_filenames[idx])) new = rename_file( orig, new, no_error=node_context.parameters['error'].value) new_file = dsrc.File() new_file.encode_path(new) node_context.output['port1'].append(new_file)
[docs]class MoveFile(synode.Node): """Move one file from one location to another.""" name = 'Move File' author = 'Andreas Tågerud' version = '1.0' icon = 'move.svg' description = ( 'Moves a file to new location using a datasource with the location') nodeid = 'org.sysess.sympathy.files.movefile' tags = Tags(Tag.Disk.File) inputs = Ports([ Port.Datasource('File to be moved', name='port1'), Port.Datasource('New location', name='port2')]) outputs = Ports([Port.Datasource('Moved file', name='port1')]) parameters = synode.parameters() parameters = exception_parameter(parameters) def execute(self, node_context): filename = node_context.input['port1'].decode_path() if filename: new_filename = move_file( filename, node_context.input['port2'].decode_path(), no_error=node_context.parameters['error'].value) if new_filename: node_context.output['port1'].encode_path(new_filename)
[docs]@node_helper.list_node_decorator(['port1', 'port2'], ['port1']) class MoveFiles(MoveFile): name = 'Move Files' nodeid = 'org.sysess.sympathy.files.movefiles'
[docs]class DownloadFile(synode.Node): name = 'Download URL to file' description = 'Download file from a URL to specified filename.' author = 'Erik der Hagopian, ' version = '1.0' nodeid = 'org.sysess.sympathy.files.downloadfile' icon = 'copy.svg' tags = Tags(Tag.Disk.File) inputs = Ports([Port.Datasource( 'Datasource pointing to data to read', name='port1', scheme='text')]) outputs = Ports([Port.Datasource( 'Datasource of resulting file', name='port1', scheme='text')]) parameters = synode.parameters() parameters = file_param(parameters) parameters.set_string( 'if_exists', label='If file already exists', value='Overwrite', description=('What to do if the file already exists'), editor=synode.Util.combo_editor( options=['Skip file', 'Overwrite', 'Raise exception'])) def execute(self, node_context): parameters = node_context.parameters input_datasource = node_context.input['port1'] output_datasource = node_context.output['port1'] filename = parameters['filename'].value if_exists = parameters['if_exists'].value if os.path.isfile(filename): if if_exists == 'Skip file': output_datasource.encode_path(filename) return elif if_exists == 'Raise exception': raise exceptions.SyDataError( 'File {} already exists'.format(filename)) if input_datasource.decode_type() == output_datasource.modes.url: output_datasource.source(sylib.url.download_datasource( input_datasource, filename)) else: exceptions.SyDataError('Only URL datasources can be downloaded')