Source code for aodncore.pipeline.steps.store

"""This module provides the step runner class for the "store" step, which is a sub-step of the :ref:`publish` step.

The step runner delegates the low level storage operations to an internal :py:class:`BaseStorageBroker` instance, and so
it's primary purpose is to abstract the storage operations from the :py:class:`HandlerBase` by providing an interface
similar to the other handler steps.
"""

from .basestep import BaseStepRunner
from ..files import ensure_pipelinefilecollection
from ..storage import get_storage_broker

__all__ = [
    'get_store_runner',
    'StoreRunner'

]


[docs]def get_store_runner(store_base_url, config, logger, archive_mode=False): """Factory function to return store runner class, with it's storage broker based on URL scheme :param store_base_url: URL base for storage location :param config: LazyConfigManager instance :param logger: Logger instance :param archive_mode: flag to indicate archive :return: StoreRunner instance """ broker = get_storage_broker(store_base_url) return StoreRunner(broker, config, logger, archive_mode)
[docs]class StoreRunner(BaseStepRunner): def __init__(self, broker, config, logger, archive_mode=False): super().__init__(config, logger) self.broker = broker self.archive_mode = archive_mode def __repr__(self): return "{self.__class__.__name__}(broker={self.broker})".format(self=self) @property def is_stored_attr(self): """PipelineFile attribute to flag completion of upload operation :return: bool """ return 'is_archived' if self.archive_mode else 'is_stored' @property def pending_addition_attr(self): return 'pending_archive' if self.archive_mode else 'pending_store_addition' @property def dest_path_attr(self): """PipelineFile attribute containing the destination path :return: bool """ return 'archive_path' if self.archive_mode else 'dest_path'
[docs] def set_is_overwrite(self, pipeline_files): """Set the "is_overwrite" attribute for each file in the given collection :param pipeline_files: collection to :return: None """ collection = ensure_pipelinefilecollection(pipeline_files) self.broker.set_is_overwrite(pipeline_files=collection, dest_path_attr=self.dest_path_attr)
[docs] def run(self, pipeline_files): """Execute the pending storage operation(s) for each file in the given collection :param pipeline_files: PipelineFileCollection instance :return: None """ collection = ensure_pipelinefilecollection(pipeline_files) additions = collection.filter_by_bool_attribute(self.pending_addition_attr) if additions: self.broker.upload(pipeline_files=additions, is_stored_attr=self.is_stored_attr, dest_path_attr=self.dest_path_attr) deletions = collection.filter_by_bool_attribute('pending_store_deletion') if deletions: self.broker.delete(pipeline_files=deletions, is_stored_attr='is_stored', dest_path_attr=self.dest_path_attr) undo_deletions = collection.filter_by_bool_attribute('pending_undo') if undo_deletions: self.broker.delete(pipeline_files=undo_deletions, is_stored_attr='is_upload_undone', dest_path_attr=self.dest_path_attr)