import abc
import mimetypes
import os
import warnings
from collections import Counter, OrderedDict
from collections.abc import MutableSet
from .common import (FileType, PipelineFilePublishType, PipelineFileCheckType, validate_addition_publishtype,
validate_checkresult, validate_deletion_publishtype, validate_publishtype,
validate_settable_checktype)
from .exceptions import AttributeValidationError, DuplicatePipelineFileError, MissingFileError
from .schema import validate_check_params
from ..util import (IndexedSet, classproperty, ensure_regex_list, format_exception, get_file_checksum,
iter_public_attributes, matches_regexes, rm_f, slice_sequence, validate_bool, validate_callable,
validate_int, validate_mapping, validate_nonstring_iterable, validate_regexes,
validate_relative_path_attr, validate_string, validate_type)
__all__ = [
'PipelineFileCollection',
'PipelineFile',
'RemotePipelineFile',
'RemotePipelineFileCollection',
'ensure_pipelinefilecollection',
'ensure_remotepipelinefilecollection',
'validate_pipelinefilecollection',
'validate_pipelinefile_or_pipelinefilecollection',
'validate_pipelinefile_or_string'
]
def ensure_pipelinefilecollection(o):
"""Function to accept either a single PipelineFile OR a PipelineFileCollection and ensure that a
PipelineFileCollection object is returned in either case
:param o: PipelineFile or PipelineFileCollection object
:return: PipelineFileCollection object
"""
validate_pipelinefile_or_pipelinefilecollection(o)
return o if isinstance(o, PipelineFileCollection) else PipelineFileCollection(o)
def ensure_remotepipelinefilecollection(o):
"""Function to accept either a single RemotePipelineFile OR a RemotePipelineFileCollection and ensure that a
RemotePipelineFileCollection object is returned in either case
:param o: RemotePipelineFile or RemotePipelineFileCollection object
:return: RemotePipelineFileCollection object
"""
validate_remotepipelinefile_or_remotepipelinefilecollection(o)
return o if isinstance(o, RemotePipelineFileCollection) else RemotePipelineFileCollection(o)
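# A minimal usage sketch for the "ensure" helpers (illustrative only; the path is
# hypothetical and must refer to an existing file, since adding a PipelineFile to a
# collection hashes it via its checksum):
#
#     >>> single = PipelineFile('/tmp/file.nc')
#     >>> result = ensure_pipelinefilecollection(single)
#     >>> isinstance(result, PipelineFileCollection)
#     True
#     >>> collection = PipelineFileCollection(single)
#     >>> ensure_pipelinefilecollection(collection) is collection
#     True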
class PipelineFileBase(metaclass=abc.ABCMeta):
"""A base class to represent a "pipeline file", which consists of a local path and a remote "destination path"
"""
__slots__ = ['_file_checksum', '_dest_path', '_local_path', '_extension', '_name', 'file_type']
def __init__(self, local_path, dest_path=None):
self._local_path = local_path
self._dest_path = dest_path
self._name = None
self._file_checksum = None
self._set_local_file_attributes()
def _set_local_file_attributes(self):
if self.local_path:
_, self._extension = os.path.splitext(self.local_path)
self.file_type = FileType.get_type_from_extension(self._extension)
else:
self._extension = None
self.file_type = FileType.UNKNOWN
def __eq__(self, other):
if isinstance(other, type(self)):
return self._key() == other._key()
return False
def __hash__(self):
return hash(self._key())
@abc.abstractmethod
def _key(self):
raise NotImplementedError
def __iter__(self):
return iter_public_attributes(self)
def __repr__(self): # pragma: no cover
return "{name}({repr})".format(name=self.__class__.__name__, repr=repr(dict(self)))
def __str__(self):
return "{name}({str})".format(name=self.__class__.__name__, str=dict(self))
#
# Static properties (read-only, should never change during the lifecycle of the object)
#
@property
def extension(self):
return self._extension
@property
def file_checksum(self):
if self._file_checksum is None:
try:
self._file_checksum = get_file_checksum(self._local_path)
except (IOError, OSError) as e:
                raise MissingFileError(
                    "failed to determine checksum for file '{local_path}'. {e}".format(
                        local_path=self._local_path,
                        e=format_exception(e)))
return self._file_checksum
@property
def name(self):
return self._name
@property
def local_path(self):
return self._local_path
@property
def dest_path(self):
return self._dest_path
class RemotePipelineFile(PipelineFileBase):
    """Implementation of PipelineFileBase to represent a single *remote* file. This is used to provide a common
    interface for a remote file, to facilitate querying and downloading operations where the *current* state
    of the storage is the relevant information
    """
__slots__ = ['_last_modified', '_size']
def __init__(self, dest_path, local_path=None, name=None, last_modified=None, size=None):
super().__init__(local_path, dest_path)
self._name = name if name is not None else os.path.basename(dest_path)
self._last_modified = last_modified
self._size = size
    def __getattr__(self, item):
        # backwards compatibility for code expecting the element to behave like a plain "dest_path" string
        # (e.g. calling string methods on it)
        return getattr(self.dest_path, item)
    def __getitem__(self, item):
        # backwards compatibility for code expecting the element to behave like a plain "dest_path" string
        # (e.g. indexing or slicing it)
        return self.dest_path.__getitem__(item)
    @classmethod
def from_pipelinefile(cls, pipeline_file):
"""Construct a RemotePipelineFile instance from an existing PipelineFile instance
        Note: the local_path is deliberately *not* set in the new instance, primarily to avoid accidentally
        overwriting local files if the instance is subsequently downloaded. The local_path attribute may still
        be set, but only as an explicit operation.
:param pipeline_file: PipelineFile instance used to instantiate a RemotePipelineFile instance
:return: RemotePipelineFile instance
"""
return cls(dest_path=pipeline_file.dest_path, local_path=None, name=pipeline_file.name)
def _key(self):
return self.name, self.dest_path
@property
def file_checksum(self):
# override superclass property to not attempt to checksum the file if it has no local path
# (i.e. has not been downloaded)
if self.local_path is None:
return None
return super().file_checksum
@property
def last_modified(self):
return self._last_modified
@property
def size(self):
return self._size
@PipelineFileBase.local_path.setter
def local_path(self, local_path):
self._local_path = local_path
# reset file_checksum to None, so that it will be re-evaluated lazily if required
self._file_checksum = None
self._set_local_file_attributes()
    def remove_local(self):
_local_path = self.local_path
self.local_path = None
rm_f(_local_path)
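# A minimal usage sketch for RemotePipelineFile (illustrative; paths are hypothetical):
#
#     >>> remote_file = RemotePipelineFile('path/to/remote/file.nc', size=1024)
#     >>> remote_file.name
#     'file.nc'
#     >>> remote_file.file_checksum is None  # no local_path, so no checksum is attempted
#     True
#     >>> remote_file.local_path = '/tmp/file.nc'  # explicit opt-in; checksum re-evaluated lazily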
class PipelineFile(PipelineFileBase):
"""Represents a single file in order to store state information relating to the intended actions to be performed
on the file, and the actions that *were* performed on the file
:param local_path: absolute source path to the file being represented
:type local_path: :py:class:`str`
:param name: arbitrary name (defaults to the output of :py:func:`os.path.basename` on local_path)
:type name: :py:class:`str`
:param archive_path: relative path used when archiving the file
:type archive_path: :py:class:`str`
:param dest_path: relative path used when publishing the file
:type dest_path: :py:class:`str`
:param is_deletion: flag designating whether this is a deletion
:type is_deletion: :py:class:`bool`
:param late_deletion: flag to indicate that this file should be deleted *after* additions are performed (note: ignored if `is_deletion=False`)
:type late_deletion: :py:class:`bool`
:param file_update_callback: optional callback to call when a file property is updated
:type file_update_callback: :py:class:`callable`
:param check_type: check type assigned to the file
:type check_type: PipelineFileCheckType
:param publish_type: publish type assigned to the file
:type publish_type: PipelineFilePublishType
"""
__slots__ = ['_archive_path', '_file_update_callback', '_check_type', '_is_deletion', '_late_deletion',
'_publish_type', '_should_archive', '_should_harvest', '_should_store', '_should_undo', '_is_checked',
'_is_archived', '_is_harvested', '_is_overwrite', '_is_stored', '_is_harvest_undone',
'_is_upload_undone', '_check_result', '_mime_type']
def __init__(self, local_path, name=None, archive_path=None, dest_path=None, is_deletion=False,
late_deletion=False, file_update_callback=None, check_type=None, publish_type=None):
super().__init__(local_path, dest_path)
# general file attributes, set from parameters
self._archive_path = archive_path
self._is_deletion = is_deletion
self._late_deletion = late_deletion
self._name = name if name is not None else os.path.basename(local_path)
# general file attributes, *not* set from parameters
self._check_result = None
self._mime_type = None
# processing flags - these express the *intended actions* for the file
self._should_archive = False
self._should_harvest = False
self._should_store = False
self._should_undo = False
# status flags - these express the *current state* of the file
self._is_checked = False
self._is_archived = False
self._is_harvested = False
self._is_overwrite = None
self._is_stored = False
self._is_harvest_undone = False
self._is_upload_undone = False
        # attributes which must be assigned via their property setters for validation. Each backing variable is
        # intentionally initialised to a safe default, and the setter is only called if the calling code has
        # supplied a value for the corresponding parameter
self._file_update_callback = None
if file_update_callback is not None:
self.file_update_callback = file_update_callback
self._check_type = PipelineFileCheckType.UNSET
if check_type is not None:
self.check_type = check_type
self._publish_type = PipelineFilePublishType.UNSET
if publish_type is not None:
self.publish_type = publish_type
    @classmethod
def from_remotepipelinefile(cls, remotepipelinefile, name=None, is_deletion=False, late_deletion=False,
file_update_callback=None, check_type=None, publish_type=None):
"""Construct a PipelineFile instance from an existing RemotePipelineFile instance
        :param remotepipelinefile: RemotePipelineFile instance used to instantiate a PipelineFile instance
        :param name: name passed to __init__ (defaults to remotepipelinefile.name)
        :param is_deletion: is_deletion flag passed to __init__
        :param late_deletion: late_deletion flag passed to __init__
        :param file_update_callback: file_update_callback passed to __init__
        :param check_type: check_type passed to __init__
        :param publish_type: publish_type passed to __init__
:return: PipelineFile instance
"""
name = name or remotepipelinefile.name
return cls(local_path=remotepipelinefile.local_path, dest_path=remotepipelinefile.dest_path,
name=name, is_deletion=is_deletion, late_deletion=late_deletion,
file_update_callback=file_update_callback, check_type=check_type, publish_type=publish_type)
def _key(self):
return self.name, self.local_path, self.file_checksum
@property
def src_path(self):
return self._local_path
@property
def file_checksum(self):
# override superclass property to handle deletions (which have no local_path and therefore can't be summed)
if self.is_deletion:
return None
return super().file_checksum
#
# State properties (may change during the lifecycle of the object to reflect the current state)
#
@property
def archive_path(self):
return self._archive_path
@archive_path.setter
def archive_path(self, archive_path):
validate_relative_path_attr(archive_path, 'archive_path')
self._archive_path = archive_path
self._post_property_update({'archive_path': archive_path})
@property
def check_log(self):
return '' if self._check_result is None else os.linesep.join(self._check_result.log)
@property
def check_passed(self):
return 'N/A' if self._check_result is None else str(self._check_result.compliant)
@property
def check_result(self):
return self._check_result
@check_result.setter
def check_result(self, check_result):
validate_checkresult(check_result)
self._is_checked = True
self._check_result = check_result
self._post_property_update({'is_checked': True})
@property
def check_type(self):
return self._check_type
@check_type.setter
def check_type(self, check_type):
if self.is_deletion:
raise ValueError('deletions cannot be assigned a check_type')
validate_settable_checktype(check_type)
self._check_type = check_type
self._post_property_update({'check_type': check_type.name})
@property
def dest_path(self):
return self._dest_path
@dest_path.setter
def dest_path(self, dest_path):
validate_relative_path_attr(dest_path, 'dest_path')
self._dest_path = dest_path
self._post_property_update({'dest_path': dest_path})
@property
def file_update_callback(self):
return self._file_update_callback
@file_update_callback.setter
def file_update_callback(self, callback):
validate_callable(callback)
self._file_update_callback = callback
@property
def is_harvested(self):
return self._is_harvested
@is_harvested.setter
def is_harvested(self, is_harvested):
validate_bool(is_harvested)
self._is_harvested = is_harvested
self._post_property_update({'is_harvested': is_harvested})
@property
def is_archived(self):
return self._is_archived
@is_archived.setter
def is_archived(self, is_archived):
validate_bool(is_archived)
self._is_archived = is_archived
self._post_property_update({'is_archived': is_archived})
@property
def is_checked(self):
return self._is_checked
@property
def is_deletion(self):
return self._is_deletion
@property
def late_deletion(self):
return self._late_deletion
@property
def is_deleted(self):
return self.is_deletion and self.is_stored
@property
def is_overwrite(self):
return self._is_overwrite
@is_overwrite.setter
def is_overwrite(self, is_overwrite):
validate_bool(is_overwrite)
self._is_overwrite = is_overwrite
self._post_property_update({'is_overwrite': is_overwrite})
    @property
    def is_stored(self):
        return self._is_stored
    @is_stored.setter
    def is_stored(self, is_stored):
        validate_bool(is_stored)
        self._is_stored = is_stored
        self._post_property_update({'is_stored': is_stored})
    @property
    def is_harvest_undone(self):
        return self._is_harvest_undone
    @is_harvest_undone.setter
    def is_harvest_undone(self, is_harvest_undone):
        validate_bool(is_harvest_undone)
        self._is_harvest_undone = is_harvest_undone
        self._post_property_update({'is_harvest_undone': is_harvest_undone})
    @property
    def is_upload_undone(self):
        return self._is_upload_undone
    @is_upload_undone.setter
    def is_upload_undone(self, is_upload_undone):
        validate_bool(is_upload_undone)
        self._is_upload_undone = is_upload_undone
        self._post_property_update({'is_upload_undone': is_upload_undone})
@property
def is_uploaded(self):
return not self.is_deletion and self.is_stored
@property
def mime_type(self):
if not self._mime_type:
self._mime_type = self.file_type.mime_type or mimetypes.types_map.get(self.extension,
'application/octet-stream')
return self._mime_type
@mime_type.setter
def mime_type(self, mime_type):
validate_string(mime_type)
self._mime_type = mime_type
self._post_property_update({'mime_type': mime_type})
@property
def published(self):
stored = self.is_stored and not self.is_upload_undone
harvested = self.is_harvested and not self.is_harvest_undone
if self.should_store and self.should_harvest:
published = stored and harvested
else:
published = stored or harvested
return 'Yes' if published else 'No'
@property
def pending_archive(self):
return self.should_archive and not self.is_archived
@property
def pending_harvest(self):
return self.should_harvest and not self.is_harvested and not self.should_undo
@property
def pending_harvest_addition(self):
return self.pending_harvest and not self.is_deletion
@property
def pending_harvest_deletion(self):
return self.pending_harvest and self.is_deletion
@property
def pending_harvest_early_deletion(self):
return self.pending_harvest and self.is_deletion and not self.late_deletion
@property
def pending_harvest_late_deletion(self):
return self.pending_harvest and self.is_deletion and self.late_deletion
@property
def pending_harvest_undo(self):
return self.should_undo and self.should_harvest and not self.is_harvest_undone
@property
def pending_store(self):
return self.should_store and not self.is_stored and not self.should_undo
@property
def pending_store_addition(self):
return self.pending_store and not self.is_deletion
@property
def pending_store_deletion(self):
return self.pending_store and self.is_deletion
@property
def pending_store_undo(self):
return self.should_undo and self.should_store and not self.is_upload_undone
@property
def pending_undo(self):
return self.pending_harvest_undo or self.pending_store_undo
@property
def publish_type(self):
return self._publish_type
@publish_type.setter
def publish_type(self, publish_type):
"""Publish type is a special property which allows handler sub-classes to specify in a single line what
"publishing" actions should be performed on a given file. For this reason, the boolean flags are read-only
properties and only intended to be changed via this property.
:param publish_type: an element of the PipelineFilePublishType enum
:return: None
"""
validate_publishtype(publish_type)
validate_value_func = validate_deletion_publishtype if self.is_deletion else validate_addition_publishtype
validate_value_func(publish_type)
self._should_archive = publish_type.is_archive_type
self._should_harvest = publish_type.is_harvest_type
self._should_store = publish_type.is_store_type
self._publish_type = publish_type
self._post_property_update({'publish_type': publish_type.name})
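    # For example (an illustrative sketch; the enum member shown is assumed to be a
    # combined harvest/upload type):
    #
    #     >>> pf = PipelineFile('/tmp/file.nc')
    #     >>> pf.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
    #     >>> pf.should_harvest, pf.should_store, pf.should_archive
    #     (True, True, False)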
@property
def should_archive(self):
return self._should_archive
@property
def should_store(self):
return self._should_store
@property
def should_harvest(self):
return self._should_harvest
@property
def should_undo(self):
return self._should_undo
@should_undo.setter
def should_undo(self, should_undo):
validate_bool(should_undo)
if self.is_deletion:
raise ValueError('undo is not possible for deletions')
self._should_undo = should_undo
self._post_property_update({'should_undo': should_undo})
def _post_property_update(self, properties, include_values=True):
"""Method run after a property is updated in order to perform optional actions such as updating ORM (if enabled)
and running the update callback (if set)
:param properties: dict containing the updated properties and their new values (used to update ORM)
:return: None
"""
validate_mapping(properties)
if self.file_update_callback:
log_output = properties if include_values else list(properties.keys())
self.file_update_callback(name=self.name, is_deletion=self.is_deletion,
message="{properties}".format(properties=log_output))
class PipelineFileCollectionBase(MutableSet, metaclass=abc.ABCMeta):
    """A collection base class which implements the MutableSet abstract base class to allow clean set operations, but
    limited to containing only :py:class:`PipelineFile` or :py:class:`RemotePipelineFile` elements, and providing
    specific functionality for handling a collection of them (e.g. filtering, generating tabular data, etc.)
    :param data: data to add during initialisation of the collection, either a single :py:class:`PipelineFile` or file
        path, or an :py:class:`Iterable` whose elements are :py:class:`PipelineFile` instances or file paths
    :type data: :py:class:`PipelineFile`, :py:class:`RemotePipelineFile`, :py:class:`str`, :py:class:`Iterable`
    :param validate_unique: :py:class:`bool` passed to the `add` method
    """
__slots__ = ['_s']
def __init__(self, data=None, validate_unique=True):
super().__init__()
self._s = IndexedSet()
if data is not None:
if isinstance(data, (self.member_class, str)):
data = [data]
for f in data:
self.add(f, validate_unique=validate_unique)
@property
@abc.abstractmethod
def member_class(cls):
raise NotImplementedError
@property
@abc.abstractmethod
def member_from_string_method(self):
raise NotImplementedError
@property
@abc.abstractmethod
def member_validator(cls):
raise NotImplementedError
@property
@abc.abstractmethod
def unique_attributes(cls):
raise NotImplementedError
def __bool__(self):
return bool(self._s)
def __contains__(self, v):
element = v if isinstance(v, self.member_class) else self.member_from_string_method(v)
return element in self._s
def __getitem__(self, index):
result = self._s[index]
return self.__class__(result) if isinstance(result, IndexedSet) else result
def __iter__(self):
return iter(self._s)
def __len__(self):
return len(self._s)
def __repr__(self): # pragma: no cover
return "{name}({repr})".format(name=self.__class__.__name__, repr=repr(list(self._s)))
def add(self, pipeline_file, overwrite=False, validate_unique=True, **kwargs):
"""Add a file to the collection
:param pipeline_file: :py:class:`PipelineFile` or file path
:param kwargs: :py:class:`dict` additional keywords passed to to PipelineFileBase __init__ method
:param overwrite: :py:class:`bool` which, if True, will overwrite an existing matching file in the collection
:param validate_unique: :py:class:`bool` which, if True, will validate unique attributes when adding the file
:return: :py:class:`bool` which indicates whether the file was successfully added
"""
self.member_validator(pipeline_file)
validate_bool(overwrite)
if isinstance(pipeline_file, self.member_class):
fileobj = pipeline_file
else:
fileobj = self.member_class(pipeline_file, **kwargs)
result = fileobj not in self._s
if not result and not overwrite:
raise DuplicatePipelineFileError("{f.name} already in collection".format(f=fileobj))
if overwrite:
self._s.discard(fileobj)
result = True
if validate_unique:
for attribute in self.unique_attributes:
value = getattr(fileobj, attribute)
if value is not None:
self.validate_unique_attribute_value(attribute, value)
self._s.add(fileobj)
return result
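    # For example (illustrative; in PipelineFileCollection the path must refer to an
    # existing local file):
    #
    #     >>> collection = PipelineFileCollection()
    #     >>> collection.add('/tmp/file.nc')
    #     True
    #     >>> collection.add('/tmp/file.nc')  # raises DuplicatePipelineFileError
    #     >>> collection.add('/tmp/file.nc', overwrite=True)
    #     True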
# alias append to the add method
append = add
def discard(self, pipeline_file):
"""Remove an element from the collection. Do not raise an exception if absent.
:param pipeline_file: :py:class:`PipelineFile` or file path
:return: :py:class:`bool` which indicates whether the file was in the collection AND was successfully discarded
"""
self.member_validator(pipeline_file)
if isinstance(pipeline_file, self.member_class):
fileobj = pipeline_file
else:
fileobj = self.member_from_string_method(pipeline_file)
result = fileobj in self._s
self._s.discard(fileobj)
return result
def difference(self, sequence):
return self.__class__(self._s.difference(sequence))
def issubset(self, sequence):
return self._s.issubset(sequence)
def issuperset(self, sequence):
return self._s.issuperset(sequence)
def union(self, sequence):
        if not all(isinstance(f, self.member_class) for f in sequence):
            raise TypeError('invalid sequence, all elements must be {} objects'.format(self.member_class.__name__))
return self.__class__(self._s.union(sequence))
def update(self, sequence, overwrite=False, validate_unique=True):
"""Add the elements of an existing :py:class:`Sequence` to this collection
:param sequence: :py:class:`Sequence` containing :py:class:`PipelineFile` or file path elements to be added to
the collection
        :param overwrite: :py:class:`bool` which, if True, will overwrite any existing matching files
            in the collection
:param validate_unique: :py:class:`bool` which, if True, will validate unique attributes when adding the files
:return: :py:class:`bool` which indicates whether any files were successfully added
"""
validate_nonstring_iterable(sequence)
results = []
for item in sequence:
results.append(self.add(item, overwrite=overwrite, validate_unique=validate_unique))
return any(results)
    def get_pipelinefile_from_dest_path(self, dest_path):
        """Get the member file for a given dest_path
        :param dest_path: destination path string for which to retrieve the corresponding
            :py:class:`RemotePipelineFile` instance
        :return: matching :py:class:`RemotePipelineFile` instance or :py:const:`None` if it is not in the collection
        """
pipeline_file = next((f for f in self._s if f.dest_path == dest_path), None)
return pipeline_file
def get_pipelinefile_from_src_path(self, src_path):
"""Get PipelineFile for a given src_path
        :param src_path: source path string for which to retrieve the corresponding :py:class:`PipelineFile` instance
:return: matching :py:class:`PipelineFile` instance or :py:const:`None` if it is not in the collection
"""
pipeline_file = next((f for f in self._s if f.local_path == src_path), None)
return pipeline_file
def get_slices(self, slice_size):
"""Slice this collection into a list of :py:class:`PipelineFileCollections` with maximum length of slice_size
:param slice_size: maximum length of each slice
:return: list containing the current object sliced into new :py:class:`PipelineFileCollection` instances of max
length slice_size
"""
validate_int(slice_size)
return slice_sequence(self, slice_size)
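    # For example (illustrative; assumes the listed local files exist):
    #
    #     >>> collection = PipelineFileCollection(['/tmp/a.nc', '/tmp/b.nc', '/tmp/c.nc'])
    #     >>> [len(s) for s in collection.get_slices(2)]
    #     [2, 1]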
def filter_by_attribute_id(self, attribute, value):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the id of the given attribute
*is* the given id (i.e. refers to the same object)
:param attribute: attribute by which to filter :py:class:`PipelineFile` instances
:param value: attribute id to filter on
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with the given
attribute matching the given value
"""
collection = self.__class__((f for f in self._s if getattr(f, attribute) is value), validate_unique=False)
return collection
def filter_by_attribute_id_not(self, attribute, value):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the id of the given attribute
is *not* the given id (i.e. refers to the same object)
:param attribute: attribute by which to filter :py:class:`PipelineFile` instances
:param value: attribute id to filter on
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with the given
attribute not matching the given value
"""
collection = self.__class__((f for f in self._s if getattr(f, attribute) is not value), validate_unique=False)
return collection
def filter_by_attribute_value(self, attribute, value):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the value of the given
attribute is equal to the given value
:param attribute: attribute by which to filter :py:class:`PipelineFile` instances
:param value: attribute value to filter on
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile`instances with the given
attribute matching the given value
"""
collection = self.__class__((f for f in self._s if getattr(f, attribute) == value), validate_unique=False)
return collection
def filter_by_attribute_regexes(self, attribute, regexes):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the value of the named
attribute matches a given regex pattern
:param attribute: attribute to filter on
:param regexes: regex pattern(s) by which to filter PipelineFiles
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with the
attribute matching the given pattern
"""
regexes = ensure_regex_list(regexes)
collection = self.__class__(
(f for f in self._s if matches_regexes(getattr(f, attribute), include_regexes=regexes)),
validate_unique=False
)
return collection
# add method alias for backwards compatibility
filter_by_attribute_regex = filter_by_attribute_regexes
def filter_by_bool_attribute(self, attribute):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the named attribute resolves
to True
:param attribute: attribute by which to filter :py:class:`PipelineFile` instances
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a True value
for the given attribute
"""
collection = self.__class__((f for f in self._s if getattr(f, attribute)), validate_unique=False)
return collection
def filter_by_bool_attribute_not(self, attribute):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where the named attribute resolves
to False
:param attribute: attribute by which to filter :py:class:`PipelineFile` instances
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a False
value for the given attribute
"""
collection = self.__class__((f for f in self._s if not getattr(f, attribute)), validate_unique=False)
return collection
def filter_by_bool_attributes_and(self, *attributes):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where *all* of the named attributes
resolve to True
:param attributes: attributes by which to filter :py:class:`PipelineFile` instances
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a True value
for all of the given attributes
"""
attributes_set = set(attributes)
def all_attributes_true(pf):
return all(getattr(pf, a) for a in attributes_set)
collection = self.__class__((f for f in self._s if all_attributes_true(f)), validate_unique=False)
return collection
def filter_by_bool_attributes_and_not(self, true_attributes, false_attributes):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where *all* of the named
true_attributes have a value of True and all of the false_attributes have a value of False
:param true_attributes: attributes which *must* be True
:param false_attributes: attributes which *must* be False
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a True value
for all attributes named in true_attributes and a False value for all attributes named in false_attributes
"""
if isinstance(true_attributes, str):
true_attributes = [true_attributes]
if isinstance(false_attributes, str):
false_attributes = [false_attributes]
true_attributes_set = set(true_attributes)
false_attributes_set = set(false_attributes)
def check_true_attributes(pf):
return all(getattr(pf, a) for a in true_attributes_set)
def check_false_attributes(pf):
return not any(getattr(pf, a) for a in false_attributes_set)
collection = self.__class__(
(f for f in self._s if check_true_attributes(f) and check_false_attributes(f)),
validate_unique=False
)
return collection
def filter_by_bool_attributes_not(self, *attributes):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where *all* of the named attributes
resolve to False
:param attributes: attributes by which to filter :py:class:`PipelineFile` instances
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a False
value for all of the given attributes
"""
attributes_set = set(attributes)
def no_attributes_true(pf):
return not any(getattr(pf, a) for a in attributes_set)
collection = self.__class__((f for f in self._s if no_attributes_true(f)), validate_unique=False)
return collection
def filter_by_bool_attributes_or(self, *attributes):
"""Return a new :py:class:`PipelineFileCollection` containing only elements where *any* of the named attributes
resolve to True
:param attributes: attributes by which to filter :py:class:`PipelineFile` instances
:return: :py:class:`PipelineFileCollection` containing only :py:class:`PipelineFile` instances with a True value
for any of the given attributes
"""
attributes_set = set(attributes)
def any_attributes_true(pf):
return any(getattr(pf, a) for a in attributes_set)
collection = self.__class__((f for f in self._s if any_attributes_true(f)), validate_unique=False)
return collection
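    # Combined filtering examples (illustrative; the attributes shown are real
    # PipelineFile properties, while the publish type member is an assumption):
    #
    #     >>> deletions = collection.filter_by_bool_attribute('is_deletion')
    #     >>> netcdf = collection.filter_by_attribute_regexes('name', r'.*\.nc$')
    #     >>> uploads = collection.filter_by_attribute_value('publish_type',
    #     ...                                                PipelineFilePublishType.HARVEST_UPLOAD)
    #     >>> pending = collection.filter_by_bool_attributes_and_not('should_store', 'is_stored')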
def get_attribute_list(self, attribute):
"""Return a :py:class:`list` containing the given attribute from each PipelineFile in the collection
:param attribute: the attribute name to retrieve from the objects
:return: :py:class:`list` containing the value of the given attribute for each file in the collection
"""
return [getattr(f, attribute) for f in self._s]
def get_table_data(self):
"""Return :py:class:`PipelineFile` members in a simple tabular data format suitable for rendering into formatted
tables
        :return: a :py:class:`tuple` with the first element being a list of column names, and the second being a list
            of :py:class:`OrderedDict` instances containing the row data
"""
data = [OrderedDict(e) for e in self._s]
try:
columns = list(data[0].keys())
except IndexError:
columns = []
return columns, data
def validate_unique_attribute_value(self, attribute, value):
"""Check that a given value is not already in the collection for the given :py:class:`PipelineFile` attribute,
and raise an exception if it is
This is intended for the use case of when an *intended* value is known for a particular attribute, and it is
desirable to check uniqueness before setting it (e.g. when adding new files to the collection).
:param attribute: the attribute to check
:param value: the value being tested for uniqueness for the given attribute
:return: None
"""
duplicates = [f for f in self._s if getattr(f, attribute) == value]
if duplicates:
raise AttributeValidationError(
"{attribute} value '{value}' already set for file(s) '{duplicates}'".format(attribute=attribute,
value=value,
duplicates=duplicates))
def validate_attribute_value_matches_regexes(self, attribute, include_regexes):
"""Check that the given :py:class:`PipelineFile` attribute matches at least one of the given regexes for each
file in the collection and raise an exception if any have a non-matched value
:param attribute: the attribute to compare
:param include_regexes: list of regexes of which the attribute must match at least one
:return: None
"""
validate_regexes(include_regexes)
unmatched = {f.name: getattr(f, attribute)
for f in self._s
if not matches_regexes(getattr(f, attribute), include_regexes=include_regexes)}
if unmatched:
raise AttributeValidationError(
"invalid '{attribute}' values found for files: {unmatched}. Must match one of: {regexes}".format(
attribute=attribute, unmatched=unmatched, regexes=include_regexes))
def validate_attribute_uniqueness(self, attribute):
"""Check that the given :py:class:`PipelineFile` attribute is unique amongst all :py:class:`PipelineFile`
instances currently in the collection, and raise an exception if any duplicates are found
This is intended for the use case of a final sanity check of the collection before using it (e.g. before
progressing to the :ref:`publish` step).
:param attribute: the attribute to compare
:return: None
"""
counter = Counter(getattr(f, attribute) for f in self._s if getattr(f, attribute) is not None)
duplicate_values = [k for k, v in counter.items() if v > 1]
if duplicate_values:
duplicates = []
for value in duplicate_values:
duplicates.extend(f for f in self._s if getattr(f, attribute) == value)
raise AttributeValidationError(
"duplicate attribute '{attribute}' found for files '{duplicates}'".format(attribute=attribute,
duplicates=duplicates))
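# Validation examples (illustrative; each call raises AttributeValidationError on failure):
#
#     >>> collection.validate_unique_attribute_value('dest_path', 'data/file.nc')
#     >>> collection.validate_attribute_value_matches_regexes('dest_path', [r'^data/'])
#     >>> collection.validate_attribute_uniqueness('dest_path')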
validate_remotepipelinefile_or_string = validate_type((RemotePipelineFile, str))
class RemotePipelineFileCollection(PipelineFileCollectionBase):
"""A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances
"""
    @classproperty
def member_class(cls):
return RemotePipelineFile
@classproperty
def member_validator(cls):
return validate_remotepipelinefile_or_string
@property
def member_from_string_method(self):
return self.get_pipelinefile_from_dest_path
@classproperty
def unique_attributes(cls):
return 'local_path', 'dest_path'
    @classmethod
def from_pipelinefilecollection(cls, pipelinefilecollection):
return cls(RemotePipelineFile.from_pipelinefile(f) for f in pipelinefilecollection)
    def download(self, broker, local_path):
"""Helper method to download the current collection from a given broker to a given local path
:param broker: BaseStorageBroker subclass to download from
:param local_path: local path into which files are downloaded
:return: None
"""
warnings.warn("This method will be removed in a future version. Please update code to use "
"`StateQuery.download` instead.", DeprecationWarning)
broker.download(self, local_path)
    def keys(self):
# backwards compatibility for code expecting broker query method to return a dict with keys being "dest_path"
return self.get_attribute_list('dest_path')
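# Conversion sketch (illustrative):
#
#     >>> remote_collection = RemotePipelineFileCollection.from_pipelinefilecollection(collection)
#     >>> remote_collection.keys()  # dest_path values, for dict-style backwards compatibility
#     ['data/file.nc']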
validate_pipelinefile_or_string = validate_type((PipelineFile, str))
class PipelineFileCollection(PipelineFileCollectionBase):
"""A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances
"""
    @classproperty
def member_class(cls):
return PipelineFile
@classproperty
def member_validator(cls):
return validate_pipelinefile_or_string
@property
def member_from_string_method(self):
return self.get_pipelinefile_from_src_path
@classproperty
def unique_attributes(cls):
return 'archive_path', 'dest_path'
    @classmethod
def from_remotepipelinefilecollection(cls, remotepipelinefilecollection, are_deletions=False):
return cls(PipelineFile.from_remotepipelinefile(f, is_deletion=are_deletions)
for f in remotepipelinefilecollection)
    def add(self, pipeline_file, is_deletion=False, overwrite=False, validate_unique=True, **kwargs):
self.member_validator(pipeline_file)
validate_bool(is_deletion)
if not isinstance(pipeline_file, self.member_class) and not is_deletion and not os.path.isfile(pipeline_file):
raise MissingFileError("file '{src}' doesn't exist".format(src=pipeline_file))
return super().add(pipeline_file, overwrite=overwrite, validate_unique=validate_unique, is_deletion=is_deletion,
**kwargs)
def _set_attribute(self, attribute, value):
for f in self._s:
setattr(f, attribute, value)
    def set_archive_paths(self, archive_path_function):
"""Set archive_path attributes for each file in the collection
:param archive_path_function: function used to determine archive destination path
:return: None
"""
validate_callable(archive_path_function)
for f in self._s:
if f.archive_path is None and f.should_archive:
candidate_path = archive_path_function(f.src_path)
self.validate_unique_attribute_value('archive_path', candidate_path)
f.archive_path = candidate_path
    def set_check_types(self, check_type):
"""Set check_type attributes for each file in the collection
        :param check_type: :py:class:`PipelineFileCheckType` enum member
:return: None
"""
validate_settable_checktype(check_type)
additions = self.__class__(f for f in self._s if not f.is_deletion)
additions._set_attribute('check_type', check_type)
    def set_dest_paths(self, dest_path_function):
"""Set dest_path attributes for each file in the collection
:param dest_path_function: function used to determine publishing destination path
:return: None
"""
validate_callable(dest_path_function)
for f in self._s:
if f.dest_path is None and any((f.should_store, f.should_harvest)):
candidate_path = dest_path_function(f.src_path)
self.validate_unique_attribute_value('dest_path', candidate_path)
f.dest_path = candidate_path
    def set_bool_attribute(self, attribute, value):
"""Set a :py:class:`bool` attribute for each file in the collection
:param attribute: attribute to set
        :param value: value to which the attribute is set
:return: None
"""
validate_bool(value)
self._set_attribute(attribute, value)
    def set_publish_types(self, publish_type):
"""Set publish_type attributes for each file in the collection
        :param publish_type: :py:class:`PipelineFilePublishType` enum member
:return: None
"""
validate_publishtype(publish_type)
self._set_attribute('publish_type', publish_type)
    def set_string_attribute(self, attribute, value):
"""Set a string attribute for each file in the collection
:param attribute: attribute to set
        :param value: value to which the attribute is set
:return: None
"""
validate_string(value)
self._set_attribute(attribute, value)
    def set_file_update_callback(self, file_update_callback):
"""Set a callback function in each :py:class:`PipelineFile` in this collection
:param file_update_callback: callback (function)
:return: None
"""
for f in self._s:
f.file_update_callback = file_update_callback
    def set_default_check_types(self, check_params=None):
"""Set check_type attribute for each file in the collection to the default value, based on the file type and
presence of compliance checker checks in the check parameters
:param check_params: :py:class:`dict` or None
:return: None
"""
if check_params is None:
check_params = {}
else:
validate_check_params(check_params)
checks = check_params.get('checks', ())
all_additions = self.__class__(f for f in self._s if not f.is_deletion)
netcdf_additions = self.__class__(f for f in all_additions if f.file_type is FileType.NETCDF)
non_netcdf_additions = all_additions.difference(netcdf_additions)
netcdf_check_type = PipelineFileCheckType.NC_COMPLIANCE_CHECK if checks else PipelineFileCheckType.FORMAT_CHECK
netcdf_additions.set_check_types(netcdf_check_type)
non_netcdf_additions.set_check_types(PipelineFileCheckType.FORMAT_CHECK)
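    # For example (illustrative; the 'cf' check name is an assumption about the
    # compliance checker configuration, not a documented value):
    #
    #     >>> collection.set_default_check_types({'checks': ['cf']})
    #     # NetCDF additions    -> PipelineFileCheckType.NC_COMPLIANCE_CHECK
    #     # all other additions -> PipelineFileCheckType.FORMAT_CHECK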
    def set_publish_types_from_regexes(self, include_regexes, exclude_regexes, addition_type, deletion_type):
"""Set publish_type attribute for each file in the collection depending on whether it is considered "included"
according to the regex parameters
:param include_regexes: regex(es) for which a file must match one or more to be included
:param exclude_regexes: regex(es) which will exclude an already included file
        :param addition_type: :py:class:`PipelineFilePublishType` enum member set for included addition files
        :param deletion_type: :py:class:`PipelineFilePublishType` enum member set for included deletion files
:return: None
"""
validate_regexes(include_regexes)
if exclude_regexes:
validate_regexes(exclude_regexes)
for f in self._s:
if matches_regexes(f.name, include_regexes, exclude_regexes):
f.publish_type = deletion_type if f.is_deletion else addition_type
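# A typical end-to-end sketch (illustrative; the path, regex, dest_path function and
# publish type members are assumptions):
#
#     >>> collection = PipelineFileCollection('/tmp/file.nc')
#     >>> collection.set_publish_types_from_regexes([r'.*\.nc$'], None,
#     ...                                           PipelineFilePublishType.HARVEST_UPLOAD,
#     ...                                           PipelineFilePublishType.DELETE_UNHARVEST)
#     >>> collection.set_dest_paths(lambda src: os.path.join('data', os.path.basename(src)))
#     >>> collection.validate_attribute_uniqueness('dest_path')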
validate_pipelinefilecollection = validate_type(PipelineFileCollection)
validate_pipelinefile_or_pipelinefilecollection = validate_type((PipelineFile, PipelineFileCollection))
validate_remotepipelinefilecollection = validate_type(RemotePipelineFileCollection)
validate_remotepipelinefile_or_remotepipelinefilecollection = validate_type((RemotePipelineFile,
RemotePipelineFileCollection))