# This file is part of Sympathy for Data.
# Copyright (c) 2013, Combine Control Systems AB
#
# SYMPATHY FOR DATA COMMERCIAL LICENSE
# You should have received a link to the License with Sympathy for Data.
from abc import ABC, abstractmethod
import shutil
import re
import os
import os.path
from sympathy.api import node as synode
from sympathy.api import node_helper
from sympathy.api import datasource as dsrc
from sympathy.api.nodeconfig import Port, Ports, Tag, Tags, deprecated_node
from sympathy.api.exceptions import SyDataError
from sympathy.platform import exceptions
import sylib.url
# Shared reST documentation used as ``__doc__`` by the copy/move nodes below.
# This must be a raw string: in a normal string literal the ``\1`` of the
# regex example is an octal escape (U+0001) instead of backslash + "1".
DOCS = r"""
The destination location can either be specified in the node's configuration or
by adding the optional *File destination* input port. When the *File
destination* input port is used the destination is taken solely from that port
and not from the configuration.
When specifying the destination directory in the configuration, you can leave
it empty to use the directory of the source file as destination directory.
Missing directories will be created if needed.
Renaming files with a regex
---------------------------
Instead of specifying a static destination in the configuration, you can also
specify a destination by enabling :guilabel:`Use regex` and writing a regex and
replacement string.
For example with the regex :regexp:`^folder_([0-9]+)/datafile.csv$` and
replacement string :regexp:`datafile_\1.csv`, the destination for a file at
location :file:`folder_1234/datafile.csv` would be :file:`datafile_1234.csv`.
For more information about how to write regex, see :ref:`appendix_regex`.
"""
# Node ids that the single-file nodes append to their own ``related`` lists.
related = [
    'org.sysess.sympathy.files.copyfile',
    'org.sysess.sympathy.files.movefile',
    'org.sysess.sympathy.files.deletefile',
]
def check_files(datasources, no_error=False) -> list[str]:
    """Collect the file path of every datasource in ``datasources``.

    Each path is obtained by calling ``_require_file()`` on the datasource.
    If that call raises and ``no_error`` is true, the datasource is skipped;
    otherwise the exception propagates to the caller.
    """
    paths = []
    for item in datasources:
        try:
            path = item._require_file()
        except Exception:
            if no_error:
                # Lenient mode: silently leave this entry out of the result.
                continue
            raise
        else:
            paths.append(path)
    return paths
def check_file(datasource, no_error=False) -> str | None:
res = None
try:
res = datasource._require_file()
except Exception:
if not no_error:
raise
return res
def regex_parameters(parameters):
    """Register the regex-renaming parameters and return ``parameters``.

    Adds the boolean ``use_regex`` followed by the string parameters
    ``pattern`` and ``replace``, in that order.
    """
    parameters.set_boolean(
        'use_regex',
        label='Regex',
        description='Turn on/off naming using a regular expression')

    # (name, label, description) for the two text parameters.
    text_fields = (
        ('pattern', 'Search',
         'Specify the regular expression that will be used for matching. '
         'Learn more about Regular expression syntax in the documentation '
         'appendix.'),
        ('replace', 'Replace',
         'The string to replace the match found with the regular '
         'expression'),
    )
    for key, label, description in text_fields:
        parameters.set_string(key, label=label, description=description)
    return parameters
def exception_parameter(parameters):
    """Register the boolean ``error`` parameter and return ``parameters``."""
    options = {
        'label': 'Do not raise exceptions',
        'description': 'If a file operation fails, do not raise an exception',
    }
    parameters.set_boolean('error', **options)
    return parameters
def output_filename(node_context, new_filename):
    """Append ``new_filename`` as a file datasource to output ``port1``.

    Nothing is appended when ``new_filename`` is empty or ``None``.
    """
    if not new_filename:
        return
    datasource = dsrc.File()
    datasource.encode_path(new_filename)
    node_context.output['port1'].append(datasource)
def get_newfilepath(
filename: str,
directory: str,
new_filename: str | None = None,
regex: tuple[str, str] | None = None,
) -> str:
if regex:
pattern, replace = regex
return re.sub(pattern, replace, filename)
if not directory:
# Empty directory parameter means same directory as source file
directory = os.path.dirname(filename)
if not new_filename:
# Empty filename parameter means same filename as source file
new_filename = filename
return os.path.join(os.path.abspath(directory),
os.path.basename(new_filename))
def add_copy_to_filename(filename: str, new_filename: str) -> str:
    """Return a destination name that differs from the source file.

    If ``new_filename`` is empty or resolves to the same absolute path as
    ``filename``, ``' - Copy'`` is inserted before the extension of
    ``filename``. Otherwise ``new_filename`` is returned unchanged.
    """
    collides = (
        not new_filename
        or os.path.abspath(new_filename) == os.path.abspath(filename)
    )
    if not collides:
        return new_filename
    root, suffix = os.path.splitext(filename)
    return ''.join((root, ' - Copy', suffix))
def copy_file(
    source_filepath: str,
    dest_filepath: str,
    no_error: bool = False,
) -> bool:
    """Copy ``source_filepath`` to ``dest_filepath``.

    The destination directory is created first; an ``OSError`` from that step
    is ignored. A ``FileNotFoundError`` from the copy is re-raised as
    ``SyDataError``.

    Returns ``True`` on success. On failure the exception propagates, unless
    ``no_error`` is true, in which case ``False`` is returned.
    """
    dest_dir = os.path.dirname(dest_filepath)
    try:
        try:
            os.makedirs(dest_dir)
        except OSError:
            # Ignored on purpose; the copy below fails if the directory is
            # really unusable.
            pass

        try:
            shutil.copyfile(source_filepath, dest_filepath)
        except FileNotFoundError as missing:
            raise SyDataError(
                f"File not found: {source_filepath}.") from missing
    except Exception:
        if not no_error:
            raise
        return False
    else:
        return True
def move_file(
    source_filepath: str,
    dest_filepath: str,
    no_error: bool = False,
) -> bool:
    """Move ``source_filepath`` to ``dest_filepath``.

    The destination directory is created first; an ``OSError`` from that step
    is ignored. A ``FileNotFoundError`` from the move is re-raised as
    ``SyDataError``.

    Returns ``True`` on success. On failure the exception propagates, unless
    ``no_error`` is true, in which case ``False`` is returned.
    """
    path = os.path.dirname(dest_filepath)
    try:
        # Directory creation is inside the guarded region, as in copy_file,
        # so that no_error also covers non-OSError failures raised here
        # (e.g. ValueError for a path with an embedded null character).
        try:
            os.makedirs(path)
        except OSError:
            # Ignored on purpose; the move below fails if the directory is
            # really unusable.
            pass
        try:
            shutil.move(source_filepath, dest_filepath)
        except FileNotFoundError as exc:
            raise SyDataError(f"File not found: {source_filepath}.") from exc
    except Exception:
        if no_error:
            return False
        raise
    return True
def delete_file(filename, delete_folder=False, no_error=False):
    """Remove ``filename`` and optionally its then-empty parent directories.

    Returns ``filename`` when the file was removed. If removal fails, the
    exception propagates (a ``FileNotFoundError`` as ``SyDataError``), unless
    ``no_error`` is true, in which case ``None`` is returned.

    With ``delete_folder`` set, ``os.removedirs`` is afterwards attempted on
    the directory of ``filename``; any failure of that step is ignored.
    """
    directory = os.path.dirname(filename) if delete_folder else None
    removed = filename
    try:
        try:
            os.remove(filename)
        except FileNotFoundError as missing:
            raise SyDataError(f"File not found: {filename}.") from missing
    except Exception:
        if not no_error:
            raise
        removed = None

    if directory:
        try:
            os.removedirs(directory)
        except Exception:
            # Best effort only, e.g. the directory is not empty.
            pass
    return removed
def filename_columns(parameters):
    """Register the ``current`` and ``new`` column parameters.

    Both are list parameters with default value ``[0]`` and an editable,
    filterable combo editor. Returns ``parameters``.
    """
    # (name, label, description) for each column selector.
    column_fields = (
        ('current', 'Current filenames',
         'The column with the current file names'),
        ('new', 'New filenames',
         'The column with the new filenames'),
    )
    for key, label, description in column_fields:
        parameters.set_list(
            key,
            label=label,
            description=description,
            value=[0],
            editor=synode.editors.combo_editor(edit=True, filter=True))
    return parameters
def get_file_lists(node_context):
    """Return the current and new filename columns of the table on ``port2``.

    The columns are selected by the first value of the ``current`` and ``new``
    list parameters, used as indices into the table's column names.

    Returns a ``(current_filenames, new_filenames)`` pair of column arrays,
    or ``([], [])`` when a selection or index lookup raises ``IndexError``.

    Raises ``SyDataError`` if either column's dtype kind is not ``'U'`` or
    ``'S'``.
    """
    parameters = node_context.parameters
    file_table = node_context.input['port2']
    columns = file_table.column_names()
    try:
        current_index = parameters['current'].value[0]
        new_index = parameters['new'].value[0]
        # Fix indices for old configurations: when a parameter's list starts
        # with an empty entry, its stored index is shifted down by one. Each
        # parameter is checked against its own list.
        # NOTE(review): a stored index of 0 then becomes -1 and selects the
        # last column -- confirm whether that can occur in old configurations.
        if parameters['current'].list[0] == '':
            current_index -= 1
        if parameters['new'].list[0] == '':
            new_index -= 1
        current_filenames = file_table.get_column_to_array(
            columns[current_index])
        new_filenames = file_table.get_column_to_array(
            columns[new_index])
    except IndexError:
        return [], []
    if (current_filenames.dtype.kind not in ('U', 'S') or
            new_filenames.dtype.kind not in ('U', 'S')):
        raise SyDataError(
            'One or more of the input columns have the wrong type. '
            'They should be text.')
    return current_filenames, new_filenames
class SingleFileNode(ABC, synode.Node):
    # Abstract base for nodes that perform one file operation on a single
    # source file. Subclasses implement ``_file_operation`` and may override
    # ``_add_copy_to_filename``.
    author = 'Alexander Busck & Andreas Tågerud'
    tags = Tags(Tag.Disk.File)
    inputs = Ports([
        Port.Datasource('File source', name='port1', scheme='text'),
        Port.Datasource('File destination', name='port2', scheme='text',
                        n=(0, 1, 0))])
    outputs = Ports([Port.Datasource(
        'File destination', name='port1', scheme='text')])

    parameters = synode.parameters()
    parameters.set_string(
        'dest_folder', label='Directory',
        editor=synode.editors.directory_editor(),
        description='Manually enter a directory')
    parameters.set_string(
        'filename', label='Filename',
        description=('Manually enter a filename, if not using a regular '
                     'expression'))
    parameters = regex_parameters(parameters)
    parameters = exception_parameter(parameters)

    # Static destination fields are disabled while regex naming is checked.
    controller = synode.controller(
        when=synode.field('use_regex', state='checked'),
        action=(
            synode.field('filename', state='disabled'),
            synode.field('dest_folder', state='disabled'),
            synode.field('pattern', state='enabled'),
            synode.field('replace', state='enabled'),
        ),
    )

    def execute(self, node_context):
        """Resolve source and destination, then run the file operation.

        The destination comes from the optional ``port2`` input when it is
        connected, otherwise from the configuration. The destination path is
        written to output ``port1`` only if the operation reports success.
        """
        params = node_context.parameters
        source_port = node_context.input['port1']
        dest_group = node_context.input.group('port2')
        no_error = params['error'].value
        use_regex = params['use_regex'].value
        search = params['pattern'].value
        substitute = params['replace'].value
        regex = (search, substitute) if use_regex else None

        in_filepath = check_file(source_port, no_error)
        if in_filepath is None:
            return

        if dest_group:
            # Destination given on the optional port: configuration ignored.
            new_filepath = check_file(node_context.input['port2'], no_error)
            if new_filepath is None:
                return
        else:
            configured = get_newfilepath(
                in_filepath,
                params['dest_folder'].value,
                params['filename'].value,
                regex=regex)
            new_filepath = self._add_copy_to_filename(in_filepath, configured)

        if not self._file_operation(in_filepath, new_filepath, no_error):
            return
        node_context.output['port1'].encode_path(new_filepath)

    @abstractmethod
    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Perform the operation; return whether it succeeded."""
        return True

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Hook for adjusting the destination; the base keeps it as is."""
        return new_filename
class MultiFileNode(ABC, synode.Node):
    # Abstract base for nodes that perform one file operation per source
    # file. Subclasses implement ``_file_operation`` and may override
    # ``_add_copy_to_filename``.
    author = 'Andreas Tågerud'
    tags = Tags(Tag.Disk.File)
    inputs = Ports([
        Port.Datasources('File sources', name='port1'),
        Port.Datasources('File destinations', name='port2', n=(0, 1, 0))])
    outputs = Ports([
        Port.Datasources('File destination', name='port1')])

    parameters = synode.parameters()
    parameters.set_string(
        'dest_folder', label='Directory',
        editor=synode.editors.directory_editor(),
        description=('Manually enter a directory'))
    parameters = regex_parameters(parameters)
    parameters = exception_parameter(parameters)

    # The static destination field is disabled while regex naming is checked.
    controller = synode.controller(
        when=synode.field('use_regex', state='checked'),
        action=(
            synode.field('dest_folder', state='disabled'),
            synode.field('pattern', state='enabled'),
            synode.field('replace', state='enabled'),
        ),
    )

    def execute(self, node_context):
        """Run the file operation for every source file.

        Destinations come element by element from the optional ``port2``
        input when it is connected, otherwise from the configuration. Each
        destination whose operation reports success is appended to output
        ``port1``.
        """
        parameters = node_context.parameters
        no_error = parameters['error'].value
        ds_in = node_context.input['port1']
        inputs = node_context.input.group('port2')
        use_regex = parameters['use_regex'].value
        pattern = parameters['pattern'].value
        replace = parameters['replace'].value
        if inputs:
            ds_out = node_context.input['port2']
            for in_filename, new_filename in self._paired_paths(
                    ds_in, ds_out, no_error):
                if self._file_operation(in_filename, new_filename, no_error):
                    output_filename(node_context, new_filename)
        else:
            for in_filename in check_files(ds_in, no_error):
                new_filename = get_newfilepath(
                    in_filename, parameters['dest_folder'].value,
                    regex=(pattern, replace) if use_regex else None)
                new_filename = self._add_copy_to_filename(
                    in_filename, new_filename)
                if self._file_operation(in_filename, new_filename, no_error):
                    output_filename(node_context, new_filename)

    @staticmethod
    def _paired_paths(sources, destinations, no_error):
        """Return ``(source, destination)`` path pairs, matched by position.

        All pairs are resolved before the caller performs any operation.
        """
        if not no_error:
            # Strict mode: any invalid datasource raises, so nothing can be
            # dropped and positional pairing of the two lists is correct.
            return list(zip(check_files(sources), check_files(destinations)))
        # Lenient mode: check each pair together. Filtering the two lists
        # independently would shift later entries and pair a source with the
        # destination that was meant for a different file.
        pairs = []
        for source, destination in zip(sources, destinations):
            in_filename = check_file(source, no_error)
            new_filename = check_file(destination, no_error)
            if in_filename is None or new_filename is None:
                continue
            pairs.append((in_filename, new_filename))
        return pairs

    @abstractmethod
    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Perform the operation; return whether it succeeded."""
        return True

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Base class does not add anything to filename."""
        return new_filename
class CopyFile(SingleFileNode):
    # Node documentation is the shared DOCS text.
    __doc__ = DOCS
    name = 'Copy file'
    description = (
        'Copy a file to a destination specified in configuration or in '
        'the optional input port.')
    nodeid = 'org.sysess.sympathy.files.copyfile'
    icon = 'copy.svg'
    # The multi-file variant first, then the module-level related ids.
    related = ['org.sysess.sympathy.files.copyfiles', *related]

    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Copy the file; return whether the copy succeeded."""
        succeeded = copy_file(in_filepath, new_filepath, no_error)
        return succeeded

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Avoid copying a file onto itself by adding " - Copy" if needed."""
        adjusted = add_copy_to_filename(filename, new_filename)
        return adjusted
class CopyFiles(MultiFileNode):
    # Node documentation is the shared DOCS text.
    __doc__ = DOCS
    name = 'Copy files'
    description = (
        'Copy files to a destination specified in configuration or in '
        'the optional input port.')
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.copyfiles'
    icon = 'copy.svg'

    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Copy one file; return whether the copy succeeded."""
        succeeded = copy_file(in_filename, new_filename, no_error)
        return succeeded

    def _add_copy_to_filename(self, filename: str, new_filename: str) -> str:
        """Avoid copying a file onto itself by adding " - Copy" if needed."""
        adjusted = add_copy_to_filename(filename, new_filename)
        return adjusted
@deprecated_node('8.0.0', 'Copy Files')
class CopyFilesWithDatasources(synode.Node):
    """
    Copies the input file datasources, to the locations designated in the
    second datasources input, element by element. Missing directories will be
    created if possible.
    """
    name = 'Copy files with Datasources'
    description = 'Copy files to another location using a table with paths'
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.copyfileswithdsrc'
    icon = 'copy.svg'
    tags = Tags(Tag.Disk.File)
    inputs = Ports([
        Port.Datasources('Files to be copied', name='port1'),
        Port.Datasources('File destinations to copy to', name='port2')])
    outputs = Ports([Port.Datasources('Copied files', name='port1')])
    parameters = synode.parameters()
    parameters = exception_parameter(parameters)

    def execute(self, node_context):
        # Copy each source to the destination at the same position and
        # append every non-empty destination path to output port1.
        in_dss = node_context.input['port1']
        out_dss = node_context.input['port2']
        no_error = node_context.parameters['error'].value
        if no_error:
            # Lenient mode: check each pair together. Filtering the two
            # lists independently would shift later entries and copy a file
            # to the destination that was meant for a different file.
            pairs = []
            for in_ds, out_ds in zip(in_dss, out_dss):
                in_filename = check_file(in_ds, no_error)
                new_filename = check_file(out_ds, no_error)
                if in_filename is None or new_filename is None:
                    continue
                pairs.append((in_filename, new_filename))
        else:
            # Strict mode: any invalid datasource raises, so nothing can be
            # dropped and positional pairing of the two lists is correct.
            pairs = list(zip(check_files(in_dss, no_error),
                             check_files(out_dss, no_error)))
        for in_filename, new_filename in pairs:
            copy_file(in_filename, new_filename, no_error=no_error)
            # The destination is output regardless of the copy result.
            if new_filename:
                out_file = dsrc.File()
                out_file.encode_path(new_filename)
                node_context.output['port1'].append(out_file)
class MoveFile(SingleFileNode):
    # Node documentation is the shared DOCS text.
    __doc__ = DOCS
    name = 'Move File'
    description = (
        'Move a file to a destination specified in configuration or in '
        'the optional input port.')
    nodeid = 'org.sysess.sympathy.files.movefile'
    icon = 'move.svg'
    # The multi-file variant first, then the module-level related ids.
    related = ['org.sysess.sympathy.files.movefiles', *related]

    def _file_operation(
            self, in_filepath: str, new_filepath: str, no_error: bool) -> bool:
        """Move the file; return whether the move succeeded."""
        succeeded = move_file(in_filepath, new_filepath, no_error)
        return succeeded
class MoveFiles(MultiFileNode):
    # Node documentation is the shared DOCS text.
    __doc__ = DOCS
    name = 'Move Files'
    description = (
        'Move files to a destination specified in configuration or in '
        'the optional input port.')
    author = 'Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.movefiles'
    icon = 'move.svg'

    def _file_operation(
            self, in_filename: str, new_filename: str, no_error: bool) -> bool:
        """Move one file; return whether the move succeeded."""
        succeeded = move_file(in_filename, new_filename, no_error)
        return succeeded
class DeleteFile(synode.Node):
    """Deletes one file."""
    name = 'Delete file'
    description = 'Delete a file'
    author = 'Magnus Sandén & Andreas Tågerud'
    nodeid = 'org.sysess.sympathy.files.deletefile'
    icon = 'delete.svg'
    tags = Tags(Tag.Disk.File)
    inputs = Ports([Port.Datasource('File to delete', name='port1')])
    outputs = Ports([Port.Datasource('Path to deleted file', name='port1')])
    parameters = synode.parameters()
    parameters.set_boolean(
        'delete_folder', label='Delete enclosing folder if empty',
        description=(
            'If a file that is removed is the last in that folder, '
            'the folder is removed. If this operation fails, '
            'no exception is raised.'))
    parameters = exception_parameter(parameters)

    def execute(self, node_context):
        # Delete the input file and write its path to output port1 when the
        # deletion succeeded.
        if not node_context.input['port1'].decode_path():
            # Legacy support.
            return
        parameters = node_context.parameters
        no_error = parameters['error'].value
        # check_files yields nothing when the datasource is rejected and
        # no_error is set, so the loop body is then skipped.
        for filename in check_files([node_context.input['port1']], no_error):
            del_file = delete_file(
                filename, parameters['delete_folder'].value,
                no_error=no_error)
            if del_file:
                node_context.output['port1'].encode_path(del_file)
# List variant of DeleteFile: the decorator is applied with input 'port1' and
# output 'port1'; only the display name and node id differ from the base.
@node_helper.list_node_decorator(['port1'], ['port1'])
class DeleteFiles(DeleteFile):
    name = 'Delete files'
    nodeid = 'org.sysess.sympathy.files.deletefiles'
class DownloadFile(synode.Node):
    """
    Download the data at URL to specified local file.
    If URL resource contains credential variables for login or token
    credentials these will be entered as part of the URL.
    See :ref:`Credentials Preferences<preferences_credentials>` for more info.
    """
    name = 'Download URL to file'
    description = 'Download file from a URL to specified filename.'
    author = 'Erik der Hagopian, '
    nodeid = 'org.sysess.sympathy.files.downloadfile'
    icon = 'copy.svg'
    tags = Tags(Tag.Disk.File)
    inputs = Ports([Port.Datasource(
        'Datasource pointing to data to read', name='port1', scheme='text')])
    outputs = Ports([Port.Datasource(
        'Datasource of resulting file', name='port1', scheme='text')])

    parameters = synode.parameters()
    parameters.set_string(
        'filename', label='Filename',
        editor=synode.editors.savename_editor(['Any files (*)']),
        description=('Manually enter a filename, if not using a regular '
                     'expression'))
    parameters.set_string(
        'if_exists', label='If file already exists', value='Overwrite',
        description=('What to do if the file already exists'),
        editor=synode.editors.combo_editor(
            options=['Skip file', 'Overwrite', 'Raise exception']))

    def execute(self, node_context):
        # Download the URL datasource on input port1 to the configured
        # filename and point output port1 at the resulting file.
        params = node_context.parameters
        source = node_context.input['port1']
        target = node_context.output['port1']
        filename = params['filename'].value
        if_exists = params['if_exists'].value

        # Existing-file policy is applied before the input type is checked.
        if os.path.isfile(filename):
            if if_exists == 'Skip file':
                target.encode_path(filename)
                return
            if if_exists == 'Raise exception':
                raise SyDataError(
                    'File {} already exists'.format(filename))
            # Any other choice falls through and overwrites the file.

        if source.decode_type() != target.modes.url:
            raise SyDataError('Only URL datasources can be downloaded')
        if not filename:
            raise exceptions.filename_not_empty()

        try:
            downloaded = sylib.url.download_url_with_credentials(
                self,
                source.connection(), source['env'],
                filename=filename)
        except sylib.url.RequestError as e:
            raise SyDataError(f"Download failed due to {e}") from e
        target.source(dsrc.File.from_filename(downloaded))