import logging
import logging.config
import os
import platform
from datetime import datetime
from tempfile import gettempdir
from transitions import Machine
from .common import FileType, HandlerResult, PipelineFilePublishType, PipelineFileCheckType, validate_publishtype
from .configlib import validate_lazyconfigmanager
from .destpath import get_path_function
from .exceptions import (PipelineProcessingError, HandlerAlreadyRunError, InvalidConfigError, InvalidInputFileError,
InvalidFileFormatError, MissingConfigParameterError, UnmatchedFilesError)
from .files import PipelineFile, PipelineFileCollection
from .log import SYSINFO, get_pipeline_logger
from .schema import (validate_check_params, validate_custom_params, validate_harvest_params, validate_notify_params,
validate_resolve_params)
from .statequery import StateQuery
from .steps import (get_check_runner, get_harvester_runner, get_notify_runner, get_resolve_runner, get_store_runner)
from ..util import (ensure_regex_list, ensure_writeonceordereddict, format_exception,
get_file_checksum, iter_public_attributes, lazyproperty, matches_regexes, merge_dicts,
validate_relative_path_attr, TemporaryDirectory, WfsBroker, DEFAULT_WFS_VERSION)
from aodncore import __version__ as _aodncore_version
__all__ = [
'HandlerBase'
]
FALLBACK_LOG_FORMAT = "%(asctime)s %(levelname)s [%(name)s] %(message)s"
FALLBACK_LOG_LEVEL = SYSINFO
class HandlerBase(object):
"""Base class for pipeline handler sub-classes.
Implements common handler methods and defines state machine states and transitions.
:param input_file: Path to the file being handled. A non-existent file will cause the handler to exit with an error
during the initialise step.
.. note:: :py:attr:`input_file` is the only positional argument. Other arguments may be provided in any order.
:type input_file: str
:param allowed_archive_path_regexes: List of allowed regular expressions, at least one of which each
    :py:attr:`PipelineFile.archive_path` must match. If any non-matching values are found, the handler
    will exit with an error during the publish step *before* publishing anything.
:type allowed_archive_path_regexes: list
:param allowed_dest_path_regexes: List of allowed regular expressions, at least one of which each
    :py:attr:`PipelineFile.dest_path` must match. If any non-matching values are found, the handler will exit with
    an error during the publish step *before* publishing anything.
:type allowed_dest_path_regexes: list
:param allowed_extensions: List of allowed extensions for :py:attr:`input_file`. Non-matching input files will cause
the handler to exit with an error during the initialise step.
:type allowed_extensions: list
:param allowed_regexes: List of allowed regular expressions for :py:attr:`input_file`. Non-matching input files will
cause the handler to exit with an error during the initialise step.
.. note:: :py:attr:`allowed_regexes` are checked *after* :py:attr:`allowed_extensions`
:type allowed_regexes: list
:param archive_input_file: Flags whether the original input file should be uploaded to the archive, the location of
which is configured by the environment configuration. The file will be archived at
ARCHIVE_URI/PIPELINE_NAME/BASENAME.
:type archive_input_file: bool
:param archive_path_function: See :py:attr:`dest_path_function`. This operates identically, except that it is used
to calculate the :py:attr:`PipelineFile.archive_path` attribute and that the path is relative to the
ARCHIVE_URI.
:type archive_path_function: str, function
:param celery_task: A Celery task object, in order for the handler instance to derive runtime information such as
the current task name and UUID.
.. note:: If absent (e.g. when unit testing), the handler will revert to having no task information available,
and will log output to standard output.
:type celery_task: :py:class:`celery.Task`
:param check_params: A dict containing parameters passed directly to the check step (e.g. compliance checker
suites). The structure of the dict is defined by the :const:`CHECK_PARAMS_SCHEMA` object in the
:py:mod:`aodncore.pipeline.schema` module.
:type check_params: :py:class:`dict`
:param config: A configuration object which the handler uses to retrieve configuration from its environment. If
absent, the handler will exit with an error during the :py:meth:`__init__` method (i.e. will not
instantiate).
.. note:: While this attribute is mandatory, it is not generally required to supply it directly in normal use
cases, unless instantiating the handler class manually.
When deployed, the parameter is automatically included by the worker service configuration.
When testing, unit tests inheriting from :py:class:`HandlerTestCase` contain a pre-prepared config object
available as :attr:`self.config`. The :py:meth:`HandlerTestCase.run_handler` and
:py:meth:`HandlerTestCase.run_handler_with_exception` helper methods automatically assign the test config to
the handler being tested.
:type config: :py:class:`aodncore.pipeline.config.LazyConfigManager`
:param custom_params: A dict containing parameters which are ignored by the base class, but allow passing arbitrary
custom values to subclasses. The structure of the dict is defined by the :const:`CUSTOM_PARAMS_SCHEMA` object in
the :py:mod:`aodncore.pipeline.schema` module.
:type custom_params: :py:class:`dict`
:param dest_path_function: The function used to determine the :py:attr:`PipelineFile.dest_path` attribute, relative
to the UPLOAD_URI configuration item. If absent, the handler will attempt to use the :py:meth:`dest_path` method
in the handler itself. If a function is not found by either mechanism, the handler will exit with an error
during the initialise step.
.. note:: When the value is a string, it is assumed that it refers to the name of a function advertised in the
*pipeline.handlers* entry point group.
:type dest_path_function: :py:class:`str`, :py:class:`callable`
:param error_cleanup_regexes: A list of regular expressions which, when a cleanup policy of
DELETE_CUSTOM_REGEXES_FROM_ERROR_STORE is set, controls which files are deleted from the error store upon
successful execution of the handler instance
:type error_cleanup_regexes: :py:class:`list`
:param exclude_regexes: See :py:attr:`include_regexes`.
:type exclude_regexes: :py:class:`list`
:param harvest_params: A dict containing parameters passed directly to the harvest step (e.g. slice size,
undo behaviour). The structure of the dict is defined by the :py:const:`HARVEST_PARAMS_SCHEMA` object in the
:py:mod:`aodncore.pipeline.schema` module.
:type harvest_params: :py:class:`dict`
:param harvest_type: String to inform the :py:mod:`aodncore.pipeline.steps.harvest` step factory function which
HarvesterRunner implementation to use during the publish step.
.. note:: Currently the only valid value is 'talend', which is the default.
:type harvest_type: :py:class:`str`
:param include_regexes: A list of regexes which, when combined with :py:attr:`exclude_regexes`, determines which
files in the collection are assigned with the :py:attr:`default_addition_publish_type` or
:py:attr:`default_deletion_publish_type` types (depending on whether the file is an addition or a deletion). If
set, to be considered included, file paths must match one of the regexes in :attr:`include_regexes` but *not*
any of the regexes in :py:attr:`exclude_regexes`.
Files not matching the inclusion criteria will remain with a :attr:`publish_type` attribute of
:py:attr:`PipelineFilePublishType.NO_ACTION`, meaning they will be ignored by the publish step.
.. note:: If omitted, the default is to select *all* files in :py:attr:`file_collection` for publication.
.. note:: This relates only to the files in :py:attr:`file_collection`, and has no relation to the
:py:attr:`input_file` path, unless the input file is itself in the collection (e.g. when handling a single
file).
For example, a single '.nc' file could feasibly match the :py:attr:`allowed_extensions` for the handler, but
still be excluded by this mechanism once it is added to :py:attr:`file_collection` during the
:py:mod:`aodncore.pipeline.steps.resolve` step.
:type include_regexes: list
:param notify_params: A dict containing parameters passed directly to the :py:mod:`aodncore.pipeline.steps.notify`
step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the
:py:const:`NOTIFY_PARAMS_SCHEMA` object in the :py:mod:`aodncore.pipeline.schema` module.
:type notify_params: :py:class:`dict`
:param upload_path: A string attribute to hold the original upload path of the :py:attr:`input_file`.
.. note:: This is intended for information purposes only (e.g. to appear in notification templates), since there
is a distinction between the original path, and the :py:attr:`input_file` as provided to the handler, which
represents where the file was moved to for processing.
:type upload_path: :py:class:`str`
:param resolve_params: A dict containing parameters passed directly to the resolve step (e.g. the root path
prepended to relative paths in manifest files). The structure of the dict is defined by the
:py:const:`RESOLVE_PARAMS_SCHEMA` object in the :py:mod:`aodncore.pipeline.schema` module.
:type resolve_params: :py:class:`dict`
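A minimal usage sketch is shown below. The subclass name, input path and destination layout are purely
illustrative, the ``dest_path`` method is assumed to take the source file path and return a path relative to
UPLOAD_URI, and ``config`` is assumed to be an existing
:py:class:`aodncore.pipeline.config.LazyConfigManager` instance (e.g. :attr:`self.config` in a
:py:class:`HandlerTestCase`)::

    class MyHandler(HandlerBase):
        def dest_path(self, file_path):
            # hypothetical destination layout
            return os.path.join('MY_FACILITY', os.path.basename(file_path))

    handler = MyHandler('/tmp/incoming/sample.nc',
                        config=config,
                        allowed_extensions=['.nc'],
                        archive_input_file=True)
    handler.run()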
"""
ordered_states = [
'HANDLER_INITIAL',
'HANDLER_INITIALISED',
'HANDLER_RESOLVED',
'HANDLER_PREPROCESSED',
'HANDLER_CHECKED',
'HANDLER_PROCESSED',
'HANDLER_PUBLISHED',
'HANDLER_POSTPROCESSED'
]
other_states = [
'HANDLER_NOTIFIED_SUCCESS',
'HANDLER_NOTIFIED_ERROR',
'HANDLER_COMPLETED_SUCCESS',
'HANDLER_COMPLETED_ERROR'
]
all_states = ordered_states[:]
all_states.extend(other_states)
ordered_transitions = [
{
'trigger': '_trigger_initialise',
'source': 'HANDLER_INITIAL',
'dest': 'HANDLER_INITIALISED',
'before': '_initialise'
},
{
'trigger': '_trigger_resolve',
'source': 'HANDLER_INITIALISED',
'dest': 'HANDLER_RESOLVED',
'before': '_resolve'
},
{
'trigger': '_trigger_preprocess',
'source': 'HANDLER_RESOLVED',
'dest': 'HANDLER_PREPROCESSED',
'before': 'preprocess'
},
{
'trigger': '_trigger_check',
'source': 'HANDLER_PREPROCESSED',
'dest': 'HANDLER_CHECKED',
'before': '_check'
},
{
'trigger': '_trigger_process',
'source': 'HANDLER_CHECKED',
'dest': 'HANDLER_PROCESSED',
'before': 'process'
},
{
'trigger': '_trigger_publish',
'source': 'HANDLER_PROCESSED',
'dest': 'HANDLER_PUBLISHED',
'before': '_publish'
},
{
'trigger': '_trigger_postprocess',
'source': 'HANDLER_PUBLISHED',
'dest': 'HANDLER_POSTPROCESSED',
'before': 'postprocess'
}
]
other_transitions = [
{
'trigger': '_trigger_notify_success',
'source': 'HANDLER_POSTPROCESSED',
'dest': 'HANDLER_NOTIFIED_SUCCESS',
'before': '_notify_success'
},
{
'trigger': '_trigger_notify_error',
'source': ordered_states, # note: reference to ordered_states list, not a string
'dest': 'HANDLER_NOTIFIED_ERROR',
'before': '_notify_error'
},
{
'trigger': '_trigger_complete_success',
'source': 'HANDLER_NOTIFIED_SUCCESS',
'dest': 'HANDLER_COMPLETED_SUCCESS',
'before': '_complete_success'
},
{
'trigger': '_trigger_complete_with_errors',
'source': 'HANDLER_NOTIFIED_ERROR',
'dest': 'HANDLER_COMPLETED_ERROR',
'before': '_complete_with_errors'
}
]
all_transitions = ordered_transitions[:]
all_transitions.extend(other_transitions)
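# Together these define the linear "happy path" through the handler (initialise -> resolve -> preprocess ->
# check -> process -> publish -> postprocess -> notify success -> complete), plus the error path, which may be
# entered from any of the ordered states and ends in HANDLER_COMPLETED_ERROR.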
def __init__(self, input_file,
allowed_archive_path_regexes=None,
allowed_dest_path_regexes=None,
allowed_extensions=None,
allowed_regexes=None,
archive_input_file=False,
archive_path_function=None,
celery_task=None,
check_params=None,
config=None,
custom_params=None,
dest_path_function=None,
error_cleanup_regexes=None,
exclude_regexes=None,
harvest_params=None,
harvest_type='talend',
include_regexes=None,
notify_params=None,
upload_path=None,
resolve_params=None
):
# property backing variables
self._config = None
self._default_addition_publish_type = PipelineFilePublishType.HARVEST_UPLOAD
self._default_deletion_publish_type = PipelineFilePublishType.DELETE_UNHARVEST
self._error = None
self._error_details = None
self._exclude_regexes = None
self._file_basename = None
self._file_checksum = None
self._file_collection = None
self._file_extension = None
self._file_type = None
self._include_regexes = None
self._input_file_archive_path = None
self._instance_working_directory = None
self._notification_results = None
self._is_archived = False
self._logger = None
self._result = HandlerResult.UNKNOWN
self._should_notify = None
self._start_time = datetime.now()
# public attributes
self.input_file = input_file
self.allowed_archive_path_regexes = allowed_archive_path_regexes
self.allowed_dest_path_regexes = allowed_dest_path_regexes
self.allowed_extensions = allowed_extensions
self.allowed_regexes = allowed_regexes
self.archive_input_file = archive_input_file
self.archive_path_function = archive_path_function
self.celery_task = celery_task
self.check_params = check_params
self.custom_params = custom_params
self.config = config
self.dest_path_function = dest_path_function
self.error_cleanup_regexes = error_cleanup_regexes
self.exclude_regexes = exclude_regexes
self.harvest_params = harvest_params
self.harvest_type = harvest_type
self.include_regexes = include_regexes
self.notify_params = notify_params
self.upload_path = upload_path
self.resolve_params = resolve_params
# private attributes
self._archive_path_function_ref = None
self._archive_path_function_name = None
self._dest_path_function_ref = None
self._dest_path_function_name = None
self._handler_run = False
self._machine = Machine(model=self, states=HandlerBase.all_states, initial='HANDLER_INITIAL',
auto_transitions=False, transitions=HandlerBase.all_transitions,
after_state_change='_after_state_change')
def __iter__(self):
ignored_attributes = {'celery_task', 'config', 'default_addition_publish_type', 'default_deletion_publish_type',
'input_file_object', 'logger', 'state', 'state_query', 'trigger'}
ignored_attributes.update("is_{state}".format(state=s) for s in self.all_states)
return iter_public_attributes(self, ignored_attributes)
def __str__(self):
return "{name}({attrs})".format(name=self.__class__.__name__, attrs=dict(self))
#
# public properties
#
@property
def celery_task_id(self):
"""Read-only property to access Celery task ID
:return: Celery task ID (if applicable)
:rtype: :class:`str`, :class:`None`
"""
return self._celery_task_id
@property
def celery_task_name(self):
"""Read-only property to access Celery task name
:return: Celery task name (if applicable)
:rtype: :class:`str`, :class:`None`
"""
return self._celery_task_name
@property
def config(self):
"""Property to access the :attr:`config` attribute
:return: configuration object
:rtype: :class:`aodncore.pipeline.config.LazyConfigManager`
"""
return self._config
@config.setter
def config(self, config):
validate_lazyconfigmanager(config)
self._config = config
@property
def error(self):
"""Read-only property to access :py:class:`Exception` object from handler instance
:return: the exception object which caused the handler to fail (if applicable)
:rtype: :class:`Exception`, :class:`None`
"""
return self._error
@property
def error_details(self):
"""Read-only property to retrieve string description of error from handler instance
:return: error description string (if applicable)
:rtype: :class:`str`, :class:`None`
"""
return self._error_details
@property
def exclude_regexes(self):
"""Property to manage exclude_regexes attribute
:return:
:rtype: :py:class:`list`
"""
return self._exclude_regexes
@exclude_regexes.setter
def exclude_regexes(self, regexes):
self._exclude_regexes = ensure_regex_list(regexes)
@property
def file_basename(self):
"""Read-only property to access the :py:attr:`input_file` basename
:return: :attr:`input_file` basename
:rtype: :class:`str`
"""
return self._file_basename
@property
def file_collection(self):
"""Read-only property to access the handler's primary PipelineFileCollection instance
:return: handler file collection
:rtype: :class:`PipelineFileCollection`
"""
return self._file_collection
@property
def file_checksum(self):
"""Read-only property to access the :py:attr:`input_file` checksum
:return: :attr:`input_file` checksum string
:rtype: :class:`str`
"""
return self._file_checksum
@property
def file_extension(self):
"""Read-only property to access the :py:attr:`input_file` extension
:return: :attr:`input_file` extension string
:rtype: :class:`str`
"""
return self._file_extension
@property
def file_type(self):
"""Read-only property to access the :py:attr:`input_file` type
:return: :attr:`input_file` type
:rtype: :class:`FileType`
"""
return self._file_type
@property
def include_regexes(self):
"""Property to manage include_regexes attribute
:return:
:rtype: :py:class:`list`
"""
return self._include_regexes
@include_regexes.setter
def include_regexes(self, regexes):
self._include_regexes = ensure_regex_list(regexes)
@property
def instance_working_directory(self):
"""Read-only property to retrieve the instance working directory
:return: string containing path to top level working directory for this instance
:rtype: :class:`str`, :class:`None`
"""
return self._instance_working_directory
@property
def input_file_archive_path(self):
"""Property used to determine the archive path for the original input file
:return: string containing the archive path
:rtype: :class:`str`
"""
if not self._input_file_archive_path:
self.input_file_archive_path = os.path.join(self._pipeline_name, os.path.basename(self.input_file))
return self._input_file_archive_path
@input_file_archive_path.setter
def input_file_archive_path(self, path):
validate_relative_path_attr(path, 'input_file_archive_path')
self._input_file_archive_path = path
@lazyproperty
def input_file_object(self):
"""Read-only property to access the original input file represented as a PipelineFile object
:return: input file object
:rtype: :py:class:`PipelineFile`
"""
input_file_object = PipelineFile(self.input_file, file_update_callback=self._file_update_callback)
return input_file_object
@property
def logger(self):
"""Read-only property to access the instance Logger
:return: :py:class:`Logger`
"""
if self._logger is None:
self._init_logger()
return self._logger
@lazyproperty
def versions(self):
"""Read-only property to access module versions
:return: version strings for Python, aodncore, and any other discovered modules (e.g. aodndata, compliance checker)
:rtype: :class:`dict`
"""
versions = {'python': platform.python_version(),
'aodncore': _aodncore_version}
loaded_versions, failed_versions = self.config.discovered_module_versions
failed_version_dict = dict.fromkeys(failed_versions, 'LOAD_FAILED')
versions.update(loaded_versions)
versions.update(failed_version_dict)
return versions
@property
def notification_results(self):
"""Read-only property to retrieve the notification results, including the sent status of each recipient
:return: list of :class:`aodncore.pipeline.steps.notify.NotifyRecipient` instances
:rtype: :class:`aodncore.pipeline.steps.notify.NotifyList`
"""
return self._notification_results
@property
def result(self):
"""Read-only property to retrieve the overall end result of the handler instance
:return: handler result
:rtype: :class:`aodncore.pipeline.common.HandlerResult`
"""
return self._result
@property
def should_notify(self):
"""Read-only property to retrieve the list of intended recipients *after* being assembled based on
:py:attr:`notify_params`
:return: list of intended recipients
:rtype: :py:class:`list`
"""
return self._should_notify
@property
def start_time(self):
"""Read-only property containing the timestamp of when this instance was created
:return: timestamp of handler starting time
:rtype: :py:class:`datetime.datetime`
"""
return self._start_time
@lazyproperty
def state_query(self):
"""Read-only property containing an initialised StateQuery instance, for querying existing pipeline state
:return: StateQuery instance
:rtype: :py:class:`StateQuery`
"""
wfs_broker = WfsBroker(self.config.pipeline_config['global'].get('wfs_url'),
version=self.config.pipeline_config['global'].get('wfs_version', DEFAULT_WFS_VERSION))
return StateQuery(storage_broker=self._upload_store_runner.broker, wfs_broker=wfs_broker)
@property
def default_addition_publish_type(self):
"""Property to manage attribute which determines the default publish type assigned to 'addition'
:py:class:`PipelineFile` instances
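For example, a subclass which wants newly added files to be archived but not published could set the following
(before the resolve step runs, e.g. in its ``__init__``; the chosen type is illustrative only)::

    self.default_addition_publish_type = PipelineFilePublishType.ARCHIVE_ONLY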
:return: default addition publish type
:rtype: :py:class:`aodncore.pipeline.common.PipelineFilePublishType`
"""
return self._default_addition_publish_type
@default_addition_publish_type.setter
def default_addition_publish_type(self, publish_type):
validate_publishtype(publish_type)
self._default_addition_publish_type = publish_type
@property
def default_deletion_publish_type(self):
"""Property to manage attribute which determines the default publish type assigned to 'deletion'
:py:class:`PipelineFile` instances
:return: default deletion publish type
:rtype: :class:`aodncore.pipeline.common.PipelineFilePublishType`
"""
return self._default_deletion_publish_type
@default_deletion_publish_type.setter
def default_deletion_publish_type(self, publish_type):
validate_publishtype(publish_type)
self._default_deletion_publish_type = publish_type
@property
def collection_dir(self):
"""Temporary subdirectory where the *initial* input file collection will be unpacked
.. warning:: Any new files created during handler execution (i.e. files which were not in the original input file)
should be created in :py:attr:`self.products_dir` rather than here.
:return: collection subdirectory of instance working directory (as populated by
:py:mod:`aodncore.pipeline.steps.resolve` step)
:rtype: :class:`str`, :class:`None`
"""
if self._instance_working_directory:
return os.path.join(self._instance_working_directory, 'collection')
@property
def products_dir(self):
"""Temporary subdirectory in which products may be created
:return: products subdirectory of instance working directory
:rtype: :class:`str`, :class:`None`
"""
if self._instance_working_directory:
return os.path.join(self._instance_working_directory, 'products')
@property
def temp_dir(self):
"""Temporary subdirectory where any other arbitrary temporary files may be created by handler sub-classes
:return: temporary subdirectory of instance working directory
:rtype: :class:`str`, :class:`None`
"""
if self._instance_working_directory:
return os.path.join(self._instance_working_directory, 'temp')
#
# private properties
#
@lazyproperty
def _archive_store_runner(self):
"""Private read-only property for accessing the instance's 'archive' store runner (for internal use only)
:return: :py:class:`StoreRunner`
"""
archive_store_runner_object = get_store_runner(self._config.pipeline_config['global']['archive_uri'],
self._config, self.logger, archive_mode=True)
self.logger.sysinfo("get_store_runner (archive) -> {archive_store_runner_object}".format(
archive_store_runner_object=archive_store_runner_object))
return archive_store_runner_object
@lazyproperty
def _upload_store_runner(self):
"""Private read-only property for accessing the instance 'upload' store runner (for internal use only)
:return: :py:class:`StoreRunner`
"""
upload_store_runner_object = get_store_runner(self._config.pipeline_config['global']['upload_uri'],
self._config, self.logger)
self.logger.sysinfo("get_store_runner (upload) -> {upload_store_runner_object}".format(
upload_store_runner_object=upload_store_runner_object))
return upload_store_runner_object
#
# 'before' methods for ordered state machine transitions
#
def _initialise(self):
self.logger.info("running handler -> {self}".format(self=self))
self._file_collection = PipelineFileCollection()
self._validate_and_freeze_params()
self._set_input_file_attributes()
self._check_input_file_name()
self._set_path_functions()
self._init_working_directory()
def _resolve(self):
resolve_runner = get_resolve_runner(self.input_file, self.collection_dir, self.config, self.logger,
self.resolve_params)
self.logger.sysinfo("get_resolve_runner -> {resolve_runner}".format(resolve_runner=resolve_runner))
resolved_files = resolve_runner.run()
resolved_files.set_file_update_callback(self._file_update_callback)
# if include_regexes is not defined, default to including all files when setting publish types
include_regexes = self.include_regexes if self.include_regexes else ensure_regex_list([r'.*'])
resolved_files.set_publish_types_from_regexes(include_regexes, self.exclude_regexes,
self.default_addition_publish_type,
self.default_deletion_publish_type)
self.file_collection.update(resolved_files)
def _check(self):
check_runner = get_check_runner(self.config, self.logger, self.check_params)
self.logger.sysinfo("get_check_runner -> {check_runner}".format(check_runner=check_runner))
self.file_collection \
.filter_by_attribute_id('check_type', PipelineFileCheckType.UNSET) \
.set_default_check_types(self.check_params)
files_to_check = self.file_collection.filter_by_attribute_id_not('check_type', PipelineFileCheckType.NO_ACTION)
if files_to_check:
check_runner.run(files_to_check)
def _archive(self):
files_to_archive = self.file_collection.filter_by_bool_attribute('pending_archive')
if files_to_archive:
self._archive_store_runner.run(files_to_archive)
if self.archive_input_file:
if self.input_file_object.publish_type is PipelineFilePublishType.UNSET:
self.input_file_object.publish_type = PipelineFilePublishType.ARCHIVE_ONLY
self.input_file_object.archive_path = self.input_file_archive_path
self._archive_store_runner.run(self.input_file_object)
def _harvest(self):
harvest_runner = get_harvester_runner(self.harvest_type, self._upload_store_runner.broker, self.harvest_params,
self.temp_dir, self.config, self.logger)
self.logger.sysinfo("get_harvester_runner -> {harvest_runner}".format(harvest_runner=harvest_runner))
files_to_harvest = self.file_collection.filter_by_bool_attribute('pending_harvest')
if files_to_harvest:
harvest_runner.run(files_to_harvest)
def _store_unharvested(self):
files_to_store = self.file_collection.filter_by_bool_attribute('pending_store')
if files_to_store:
self._upload_store_runner.run(files_to_store)
def _pre_publish(self):
unset = self.file_collection \
.filter_by_bool_attribute_not('is_deletion') \
.filter_by_attribute_id('publish_type', PipelineFilePublishType.UNSET) \
.get_attribute_list('src_path')
if unset:
raise UnmatchedFilesError("files with UNSET publish_type found: '{unset}'".format(unset=unset))
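# resolve and validate archive paths, then dest paths, so that any invalid or duplicate path aborts the
# publish step before anything has been uploaded or harvested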
self.file_collection.set_archive_paths(self._archive_path_function_ref)
self.file_collection.validate_attribute_uniqueness('archive_path')
if self.allowed_archive_path_regexes:
files_to_archive = self.file_collection.filter_by_bool_attribute('pending_archive')
files_to_archive.validate_attribute_value_matches_regexes('archive_path', self.allowed_archive_path_regexes)
self.file_collection.set_dest_paths(self._dest_path_function_ref)
self.file_collection.validate_attribute_uniqueness('dest_path')
if self.allowed_dest_path_regexes:
files_to_store = self.file_collection.filter_by_bool_attributes_or('pending_store', 'pending_harvest')
files_to_store.validate_attribute_value_matches_regexes('dest_path', self.allowed_dest_path_regexes)
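# mark files which will overwrite an existing object at their destination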
self._upload_store_runner.set_is_overwrite(self.file_collection)
def _publish(self):
self._pre_publish()
self._archive()
self._harvest()
self._store_unharvested()
#
# 'before' methods for non-ordered state machine transitions
#
def _notify_common(self):
collection_headers, collection_data = self.file_collection.get_table_data()
checks = () if self.check_params is None else self.check_params.get('checks', ())
class_dict = dict(self)
extra = {
'input_file': os.path.basename(self.input_file),
'processing_result': self.result.name,
'handler_start_time': self.start_time.strftime("%Y-%m-%d %H:%M"),
'checks': ','.join(checks) or 'None',
'collection_headers': collection_headers,
'collection_data': collection_data,
'error_details': self.error_details or False,
'upload_dir': os.path.dirname(self.upload_path) if self.upload_path else None
}
notification_data = merge_dicts(class_dict, extra)
notify_runner = get_notify_runner(notification_data, self.config, self.logger, self.notify_params)
self.logger.sysinfo("get_notify_runner -> {notify_runner}".format(notify_runner=notify_runner))
if self._should_notify:
self._notification_results = notify_runner.run(self._should_notify)
def _notify_success(self):
self._notify_common()
def _notify_error(self):
self._notify_common()
def _complete_common(self):
self.logger.info("handler result for input_file '{self.input_file}': {self._result.name}".format(self=self))
def _complete_success(self):
self._complete_common()
def _complete_with_errors(self):
self._complete_common()
#
# callbacks
#
def _after_state_change(self):
self.logger.sysinfo(
"{self.__class__.__name__} transitioned to state: {self.state}".format(self=self))
if self.celery_task is not None:
self.celery_task.update_state(state=self.state)
def _file_update_callback(self, **kwargs):
raw_name = kwargs.get('name')
name = "{name} (deletion)".format(name=raw_name) if kwargs.get('is_deletion') else raw_name
self.logger.info("updated file '{name}': {message}".format(name=name, message=kwargs.get('message', '')))
#
# "internal" helper methods
#
def _check_input_file_name(self):
if self.allowed_extensions and self.file_extension not in self.allowed_extensions:
raise InvalidFileFormatError("input file extension '{self.file_extension}' "
"not in allowed_extensions list: {self.allowed_extensions}".format(self=self))
if self.allowed_regexes and not matches_regexes(self.file_basename, include_regexes=self.allowed_regexes):
raise InvalidInputFileError("input file '{self.file_basename}' does not match any patterns "
"in the allowed_regexes list: {self.allowed_regexes}".format(self=self))
def _init_logger(self):
try:
celery_task_id = self.celery_task.request.id
celery_task_name = self.celery_task.name
pipeline_name = self.celery_task.pipeline_name
self._logger = self.celery_task.logger
except AttributeError as e:
# the absence of a celery task indicates we're in a unittest or IDE, so fall back to a basic logging config
celery_task_id = None
celery_task_name = 'NO_TASK'
pipeline_name = 'NO_PIPELINE'
logging.basicConfig(level=FALLBACK_LOG_LEVEL, format=FALLBACK_LOG_FORMAT)
logging_extra = {
'celery_task_id': celery_task_id,
'celery_task_name': celery_task_name,
'pipeline_name': pipeline_name
}
logger = get_pipeline_logger('', logging_extra)
# turn down logging for noisy libraries to WARN, unless overridden in pipeline config 'liblevel' key
liblevel = getattr(self.config, 'pipeline_config', {}).get('logging', {}).get('liblevel', 'WARN')
for lib in ('botocore', 'paramiko', 's3transfer', 'transitions'):
logging.getLogger(lib).setLevel(liblevel)
logger.warning('no logger parameter or celery task found, falling back to root logger')
logger.debug('_init_logging exception: {e}'.format(e=e))
self._logger = logger
self._celery_task_id = celery_task_id
self._celery_task_name = celery_task_name
self._pipeline_name = pipeline_name
def _init_working_directory(self):
for subdirectory in ('collection', 'products', 'temp'):
os.mkdir(os.path.join(self._instance_working_directory, subdirectory))
def _handle_error(self, exception, system_error=False):
self._error = exception
self._result = HandlerResult.ERROR
should_notify = []
notify_params_dict = self.notify_params or {}
try:
if system_error:
self.logger.exception(format_exception(exception))
import traceback
self._error_details = traceback.format_exc()
# invalid configuration means notification is not possible
if not isinstance(exception, (InvalidConfigError, MissingConfigParameterError)):
should_notify.extend(notify_params_dict.get('owner_notify_list', []))
if notify_params_dict.get('error_notify_list'):
self.logger.warning("exception is not a user-correctable problem, "
"excluding 'error_notify_list' from notification")
else:
self.logger.error(format_exception(exception))
self._error_details = str(exception)
should_notify.extend(notify_params_dict.get('error_notify_list', []))
if notify_params_dict.get('notify_owner_error', False):
should_notify.extend(notify_params_dict.get('owner_notify_list', []))
self._should_notify = should_notify
self._trigger_notify_error()
self._trigger_complete_with_errors()
except Exception as e:
self.logger.exception('error during _handle_error method: {e}'.format(e=format_exception(e)))
def _handle_success(self):
self._result = HandlerResult.SUCCESS
should_notify = []
notify_params_dict = self.notify_params or {}
should_notify.extend(notify_params_dict.get('success_notify_list', []))
if notify_params_dict.get('notify_owner_success', False):
should_notify.extend(notify_params_dict.get('owner_notify_list', []))
self._should_notify = should_notify
try:
self._trigger_notify_success()
self._trigger_complete_success()
except Exception as e:
self.logger.exception('error during _handle_success method: {e}'.format(e=format_exception(e)))
def _set_input_file_attributes(self):
try:
self._file_checksum = get_file_checksum(self.input_file)
except (IOError, OSError) as e:
self.logger.exception(e)
raise InvalidInputFileError(e)
self.logger.sysinfo("get_file_checksum -> '{self.file_checksum}'".format(self=self))
self._file_basename = os.path.basename(self.input_file)
self.logger.sysinfo("file_basename -> '{self._file_basename}'".format(self=self))
_, self._file_extension = os.path.splitext(self.input_file)
self.logger.sysinfo("file_extension -> '{self._file_extension}'".format(self=self))
self._file_type = FileType.get_type_from_extension(self.file_extension)
self.logger.sysinfo("file_type -> {self._file_type}".format(self=self))
def _set_path_functions(self):
dest_path_function_ref, dest_path_function_name = get_path_function(self, self.config.pipeline_config[
'pluggable']['path_function_group'])
self._dest_path_function_ref = dest_path_function_ref
self._dest_path_function_name = dest_path_function_name
self.logger.sysinfo("get_path_function (upload) -> {dest_path_function_name}".format(
dest_path_function_name=dest_path_function_name))
archive_path_function_ref, archive_path_function_name = get_path_function(self, self.config.pipeline_config[
'pluggable']['path_function_group'], archive_mode=True)
self._archive_path_function_ref = archive_path_function_ref
self._archive_path_function_name = archive_path_function_name
self.logger.sysinfo("get_path_function (archive) -> {archive_path_function_name}".format(
archive_path_function_name=archive_path_function_name))
def _validate_and_freeze_params(self):
if self.check_params is not None:
validate_check_params(self.check_params)
self.check_params = ensure_writeonceordereddict(self.check_params)
if self.custom_params is not None:
validate_custom_params(self.custom_params)
self.custom_params = ensure_writeonceordereddict(self.custom_params)
if self.harvest_params is not None:
validate_harvest_params(self.harvest_params)
self.harvest_params = ensure_writeonceordereddict(self.harvest_params)
if self.notify_params is not None:
validate_notify_params(self.notify_params)
self.notify_params = ensure_writeonceordereddict(self.notify_params)
if self.resolve_params is not None:
validate_resolve_params(self.resolve_params)
self.resolve_params = ensure_writeonceordereddict(self.resolve_params)
#
# process methods - to be overridden by child class as required
#
def preprocess(self):  # pragma: no cover
"""Method designed to be overridden by child handlers in order to execute code between resolve and check steps
:return: None
"""
self.logger.sysinfo("`preprocess` not overridden by child class, skipping step")
def process(self):  # pragma: no cover
"""Method designed to be overridden by child handlers in order to execute code between check and publish steps
:return: None
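A hedged sketch of a typical override, which creates a derived product in :py:attr:`products_dir` and queues it
for publication (the product name and chosen publish type are illustrative only)::

    def process(self):
        product_path = os.path.join(self.products_dir, 'summary.csv')
        with open(product_path, 'w') as f:
            f.write('id,value')
        product_file = PipelineFile(product_path)
        product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
        self.add_to_collection(product_file)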
"""
self.logger.sysinfo("`process` not overridden by child class, skipping step")
def postprocess(self):  # pragma: no cover
"""Method designed to be overridden by child handlers in order to execute code between publish and notify steps
:return: None
"""
self.logger.sysinfo("`postprocess` not overridden by child class, skipping step")
#
# "public" methods
#
def add_to_collection(self, pipeline_file, **kwargs):
"""Helper method to add a PipelineFile object or path to the handler's file_collection, with the handler
instance's file_update_callback method assigned
Note: as this is a wrapper around the PipelineFileCollection.add method, kwargs are *only* applied when
pipeline_file is a path (string). When adding an existing PipelineFile object, the object is added "as-is",
and attributes must be set explicitly on the object.
:param pipeline_file: :py:class:`PipelineFile` or file path
:param kwargs: keyword arguments passed through to the PipelineFileCollection.add method
:return: None
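For example (paths illustrative only)::

    # path form: keyword arguments are forwarded to PipelineFileCollection.add
    self.add_to_collection(os.path.join(self.products_dir, 'product.nc'))

    # object form: attributes must already be set on the PipelineFile itself
    product_file = PipelineFile(os.path.join(self.products_dir, 'product.nc'))
    product_file.publish_type = PipelineFilePublishType.HARVEST_UPLOAD
    self.add_to_collection(product_file)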
"""
if isinstance(pipeline_file, PipelineFile):
pipeline_file.file_update_callback = self._file_update_callback
self.file_collection.add(pipeline_file, file_update_callback=self._file_update_callback, **kwargs)
def run(self):
"""The entry point to the handler instance. Executes the automatic state machine transitions, and populates the
:attr:`result` attribute to signal success or failure of the handler instance.
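Processing errors are caught internally and reflected in :py:attr:`result`, :py:attr:`error` and
:py:attr:`error_details` rather than being raised to the caller. A hedged usage sketch (the subclass and
input path are hypothetical)::

    handler = MyHandler('/tmp/incoming/sample.nc', config=config)
    handler.run()
    if handler.result is not HandlerResult.SUCCESS:
        print(handler.error_details)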
"""
if self._handler_run:
raise HandlerAlreadyRunError('handler instance has already been run')
self._handler_run = True
base_temp_directory = self.config.pipeline_config['global'].get('tmp_dir', gettempdir())
with TemporaryDirectory(prefix=self.__class__.__name__, dir=base_temp_directory) as instance_working_directory:
self._instance_working_directory = instance_working_directory
try:
for transition in HandlerBase.ordered_transitions:
self.trigger(transition['trigger'])
except PipelineProcessingError as e:
self._handle_error(e)
except (Exception, KeyboardInterrupt, SystemExit) as e:
self._handle_error(e, system_error=True)
else:
self._handle_success()