aodncore.pipeline package
Subpackages
Submodules
aodncore.pipeline.common module
This module contains common “types” (typically enums or simple data classes) used by more than one module in the project.
- class aodncore.pipeline.common.CheckResult(compliant, log, errors=False)[source]
Bases:
object
Simple container class to hold a check result
- property compliant
- property errors
- property log
- class aodncore.pipeline.common.FileType(value)[source]
Bases:
Enum
Represents each known file type, including extensions, mime type and validator function
Each enum member may have its attributes accessed by name when required for comparisons and filtering, e.g.
lookup the extension and mime-type for PNG file types in general:
>>> FileType.PNG.extensions
('.png',)
>>> FileType.PNG.mime_type
'image/png'
assign a type attribute to an object, and query the type-specific values directly from the object:
>>> class Object(object):
...     pass
>>> o = Object()
>>> o.file_type = FileType.JPEG
>>> o.file_type.extensions
('.jpg', '.jpeg')
>>> o.file_type.mime_type
'image/jpeg'
The function referenced by validator must accept a single parameter, which is the path to the file being checked, and return True if the path represents a valid file of that type. For example, this would typically attempt to open/parse/read the file using a corresponding library (e.g. an NC file can be opened as a valid Dataset by the NetCDF library, a ZIP file can be read using the zipfile module etc.).
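As an illustrative sketch only (not one of the package's built-in validators), a function satisfying this contract could look like:

    import csv

    def is_csv_file(file_path):
        # return True if the file can be sniffed as CSV, False otherwise
        try:
            with open(file_path, newline='') as f:
                csv.Sniffer().sniff(f.read(1024))
            return True
        except (OSError, csv.Error):
            return False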
- CSV = (('.csv',), 'text/csv', <function is_nonempty_file>)
- DELETE_MANIFEST = (('.delete_manifest',), 'text/csv', <function is_nonempty_file>)
- DIR_MANIFEST = (('.dir_manifest',), 'text/plain', <function is_nonempty_file>)
- GZIP = (('.gz',), 'application/gzip', <function is_gzip_file>)
- JPEG = (('.jpg', '.jpeg'), 'image/jpeg', <function is_jpeg_file>)
- JSON_MANIFEST = (('.json_manifest',), 'application/json', <function is_json_file>)
- MAP_MANIFEST = (('.map_manifest',), 'text/csv', <function is_nonempty_file>)
- NETCDF = (('.nc',), 'application/octet-stream', <function is_netcdf_file>)
- PDF = (('.pdf',), 'application/pdf', <function is_pdf_file>)
- PNG = (('.png',), 'image/png', <function is_png_file>)
- RSYNC_MANIFEST = (('.rsync_manifest',), 'text/plain', <function is_nonempty_file>)
- SIMPLE_MANIFEST = (('.manifest',), 'text/plain', <function is_nonempty_file>)
- TIFF = (('.tif', '.tiff'), 'image/tiff', <function is_tiff_file>)
- UNKNOWN = ((), None, <function is_nonempty_file>)
- ZIP = (('.zip',), 'application/zip', <function is_zip_file>)
- extensions
- property is_image_type
Read-only boolean property indicating whether the file type instance is an image type.
- is_type(value)[source]
Check whether the type component of the MIME type (i.e. the part before the slash) equals the given value
- Parameters
value – type to compare against
- Returns
True if this type matches the given value, otherwise False
- mime_type
- validator
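For example, since the type component of FileType.PNG's MIME type ('image/png') is 'image':

    >>> FileType.PNG.is_type('image')
    True
    >>> FileType.PNG.is_type('application')
    False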
- class aodncore.pipeline.common.HandlerResult(value)[source]
Bases:
Enum
Represents the overall end result of a handler instance.
- ERROR = 2
- SUCCESS = 1
- UNKNOWN = 0
- class aodncore.pipeline.common.NotificationRecipientType(value)[source]
Bases:
Enum
Notification recipient type
- EMAIL = ('email', <function is_valid_email_address>, 'invalid email address')
- INVALID = ('invalid', <function NotificationRecipientType.<lambda>>, 'invalid protocol')
- SNS = ('sns', <function NotificationRecipientType.<lambda>>, 'invalid SNS topic')
- property address_validation_function
- property error_string
- property protocol
- class aodncore.pipeline.common.PipelineFileCheckType(value)[source]
Bases:
Enum
Each PipelineFile may individually specify which checks are performed against it.
- FORMAT_CHECK = 3
- NC_COMPLIANCE_CHECK = 4
- NONEMPTY_CHECK = 2
- NO_ACTION = 1
- TABLE_SCHEMA_CHECK = 5
- UNSET = 0
- all_checkable_types = {<PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>}
- all_settable_types = {<PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.NO_ACTION: 1>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>, <PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>}
- class aodncore.pipeline.common.PipelineFilePublishType(value)[source]
Bases:
Enum
Each PipelineFile may individually specify which combination of archive/upload/harvest actions must occur before it is considered "published".
Enum member values are a tuple containing boolean flags used for querying/validating types, which are provided to the __init__() for each member (since confusingly, each Enum member is an instance of this class).
Each member's value is therefore a tuple of bool values representing the following flags:
(is_addition_type, is_deletion_type, is_archive_type, is_store_type, is_harvest_type)
- ARCHIVE_ONLY = (True, False, True, False, False)
- DELETE_ONLY = (False, True, False, True, False)
- DELETE_UNHARVEST = (False, True, False, True, True)
- HARVEST_ARCHIVE = (True, False, True, False, True)
- HARVEST_ARCHIVE_UPLOAD = (True, False, True, True, True)
- HARVEST_ONLY = (True, False, False, False, True)
- HARVEST_UPLOAD = (True, False, False, True, True)
- NO_ACTION = (True, True, False, False, False)
- UNHARVEST_ONLY = (False, True, False, False, True)
- UNSET = (None, None, None, None, None)
- UPLOAD_ONLY = (True, False, False, True, False)
- all_addition_types = {<PipelineFilePublishType.ARCHIVE_ONLY: (True, False, True, False, False)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.HARVEST_UPLOAD: (True, False, False, True, True)>, <PipelineFilePublishType.HARVEST_ONLY: (True, False, False, False, True)>, <PipelineFilePublishType.HARVEST_ARCHIVE: (True, False, True, False, True)>, <PipelineFilePublishType.UPLOAD_ONLY: (True, False, False, True, False)>, <PipelineFilePublishType.HARVEST_ARCHIVE_UPLOAD: (True, False, True, True, True)>}
- all_deletion_types = {<PipelineFilePublishType.DELETE_UNHARVEST: (False, True, False, True, True)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.DELETE_ONLY: (False, True, False, True, False)>, <PipelineFilePublishType.UNHARVEST_ONLY: (False, True, False, False, True)>}
- property is_addition_type
- property is_archive_type
- property is_deletion_type
- property is_harvest_type
- property is_store_type
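For example, the boolean flag properties may be queried directly on a member (the values follow from the HARVEST_UPLOAD tuple above):

    >>> PipelineFilePublishType.HARVEST_UPLOAD.is_harvest_type
    True
    >>> PipelineFilePublishType.HARVEST_UPLOAD.is_deletion_type
    False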
- aodncore.pipeline.common.validate_addition_publishtype(o)
- aodncore.pipeline.common.validate_checkresult(o)
- aodncore.pipeline.common.validate_checktype(o)
- aodncore.pipeline.common.validate_deletion_publishtype(o)
- aodncore.pipeline.common.validate_publishtype(o)
- aodncore.pipeline.common.validate_recipienttype(o)
- aodncore.pipeline.common.validate_settable_checktype(o)
aodncore.pipeline.config module
This module is separate from the aodncore.pipeline.configlib definitions in order for it to act as the shared configuration "singleton" for the package.
aodncore.pipeline.configlib module
This module provides code to support loading and accessing a central configuration object. Other modules should typically access the configuration by importing CONFIG from aodncore.pipeline.config rather than manually creating a new LazyConfigManager instance.
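For example:

    from aodncore.pipeline.config import CONFIG

    # lazy properties are only evaluated on first access
    watch_config = CONFIG.watch_config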
- class aodncore.pipeline.configlib.LazyConfigManager[source]
Bases:
object
Configuration object to consolidate configuration values.
Different configuration sources are represented as lazy properties so that it is still efficient to pass an instance of this class to several modules, even if they only require one particular type of configuration to operate.
- property celery_application
- property celery_routes
- property discovered_dest_path_functions
- property discovered_handlers
- property discovered_module_versions
- get_worker_logging_config(task_name)[source]
Get the logging config for an individual task
- Parameters
task_name – name of the task to retrieve the log config for
- Returns
logging config for use by logging.config.dictConfig
- property pipeline_config
- property trigger_config
- property watch_config
- property watch_directory_map
- property watchservice_logging_config
- aodncore.pipeline.configlib.load_json_file(default_config_file, envvar='', object_pairs_hook=<class 'aodncore.util.misc.WriteOnceOrderedDict'>)[source]
Load a JSON file into a dict
- Parameters
default_config_file –
envvar – environment variable containing path to load
object_pairs_hook – class used for json.load ‘object_pairs_hook’ parameter
- Returns
object containing loaded JSON config
- aodncore.pipeline.configlib.load_pipeline_config(default_config_file='/mnt/ebs/pipeline/etc/watches.conf', envvar='PIPELINE_CONFIG_FILE')[source]
- aodncore.pipeline.configlib.load_trigger_config(default_config_file='/usr/local/talend/etc/trigger.conf')[source]
- Parameters
default_config_file – default path to use if not set in the environment variable
- Returns
dict representation of trigger config parsed from JSON file
- aodncore.pipeline.configlib.load_watch_config(default_config_file='/mnt/ebs/pipeline/etc/watches.conf')[source]
- Parameters
default_config_file – default path to use if not set in the environment variable
- Returns
dict representation of watch config parsed from JSON file
- aodncore.pipeline.configlib.validate_lazyconfigmanager(o)
aodncore.pipeline.db module
- class aodncore.pipeline.db.DatabaseInteractions(config, schema_base_path, logger)[source]
Bases:
object
Database connection object.
This class should be instantiated via a 'with DatabaseInteractions(...) as ...' block, so that the __enter__ and __exit__ methods are correctly invoked.
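A minimal usage sketch (the config, schema_base_path and logger values are assumed to be available from the calling context):

    with DatabaseInteractions(config, schema_base_path, logger) as db:
        db.drop_object({'name': 'my_table', 'type': 'table'})
        db.refresh_materialized_view({'name': 'my_view', 'type': 'materialized view'})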
- compare_schemas()[source]
Placeholder for possible future implementation of schema version checking
- Returns
boolean - True if schemas match, else False
- create_table_from_yaml_file(step)[source]
Function to read a YAML file and use it to build a CREATE TABLE script for execution against the database.
- Parameters
step – A dict containing ‘name’ and ‘type’ (at least) keys
step.name is the name used as part of the match regular expression
step.type is the type of database object. Type should always be table in this context
- drop_object(step)[source]
Drop the specified database object.
The database transaction uses the IF EXISTS parameter, so it will not fail if the database object does not exist. It also uses the CASCADE parameter, since a previous call to this method may have already cascaded to the current database object.
- Parameters
step – A dict containing ‘name’ and ‘type’ (at least) keys
step.name is the name of the database object
step.type is the type of database object
- execute_sql_file(step)[source]
Function to read an SQL file prior to executing it against the database.
- Parameters
step – A dict containing ‘name’ (at least) key
step.name is the name used as part of the match regular expression
- get_spatial_extent(db_schema, table, column, resolution)[source]
Function to retrieve spatial data from the database.
- Parameters
db_schema – string containing name of schema
table – string containing name of table
column – string containing name of column
resolution – int as resolution of polygons
- get_temporal_extent(table, column)[source]
Function to retrieve temporal data from the database.
- Parameters
table – string containing name of table
column – string containing name of column
- get_vertical_extent(table, column)[source]
Function to retrieve vertical data from the database.
- Parameters
table – string containing name of table
column – string containing name of column
- load_data_from_csv(step)[source]
Function to read a CSV file prior to loading into the specified table.
Currently uses the utf-8 encoding to read the CSV file, and reads the headings into the COPY FROM statement - the latter may not be necessary as it is assumed that the file has been validated in a previous handler step.
- Parameters
step – A dict containing 'name' and 'local_path' (at least) keys
step.name is the name of the target table
step.local_path is the full path to the source file (csv)
- refresh_materialized_view(step)[source]
Refresh the specified materialized view.
- Parameters
step – A dict containing ‘name’ and ‘type’ (at least) keys
step.name is the name of the database object
step.type is the type of database object - the database transaction will only be performed if type = 'materialized view'
aodncore.pipeline.destpath module
This module provides code to determine which function/method will be used in order for the handler to generate the destination path (dest_path) and/or archive path (archive_path) for each file being processed. Since the path function may be in the handler class itself, or provided as a runtime parameter, this code is to ensure that the discovery mechanism is consistent.
- aodncore.pipeline.destpath.get_path_function(handler_instance, entry_point_group, archive_mode=False)[source]
Return a tuple containing a reference to a path function, and the function’s fully qualified, printable name
- Parameters
handler_instance – HandlerBase instance
entry_point_group – name of the entry point group used to discover path functions
archive_mode – bool flag to modify which attributes are used
- Returns
result of
PathFunctionResolver.resolve()
method
aodncore.pipeline.exceptions module
This module provides custom exceptions used throughout the aodncore.pipeline
package.
- exception aodncore.pipeline.exceptions.AttributeNotSetError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.AttributeValidationError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.ComplianceCheckFailedError[source]
Bases:
PipelineProcessingError
- exception aodncore.pipeline.exceptions.DuplicatePipelineFileError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.GeonetworkConnectionError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.GeonetworkRequestError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.HandlerAlreadyRunError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidCheckSuiteError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidCheckTypeError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidConfigError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidFileContentError[source]
Bases:
PipelineProcessingError
- exception aodncore.pipeline.exceptions.InvalidFileFormatError[source]
Bases:
PipelineProcessingError
- exception aodncore.pipeline.exceptions.InvalidFileNameError[source]
Bases:
PipelineProcessingError
- exception aodncore.pipeline.exceptions.InvalidHandlerError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidHarvestMapError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidHarvesterError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidInputFileError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidPathFunctionError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidRecipientError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidSQLConnectionError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidSQLTransactionError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidSchemaError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.InvalidStoreUrlError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.MissingConfigFileError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.MissingConfigParameterError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.MissingFileError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.NotificationFailedError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.PipelineProcessingError[source]
Bases:
AodnBaseError
Base class for all exceptions which indicate that there was a problem processing the file, as opposed to an internal configuration or environmental error. Handler classes should typically raise exceptions derived from this exception to signal non-compliance of the file or some other user-correctable problem.
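For example, a handler subclass might signal a user-correctable problem by raising one of the PipelineProcessingError subclasses in this module (a sketch; the check itself is hypothetical):

    from aodncore.pipeline.exceptions import InvalidFileContentError

    def process(self):
        # hypothetical validation inside a handler subclass
        if not self.file_collection:
            raise InvalidFileContentError('input file contained no recognisable files')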
- exception aodncore.pipeline.exceptions.PipelineSystemError[source]
Bases:
AodnBaseError
Base class for all exceptions not related to file processing and which would typically not be suitable to return to an end user
- exception aodncore.pipeline.exceptions.StorageBrokerError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.UnexpectedCsvFilesError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.UnmappedFilesError[source]
Bases:
PipelineSystemError
- exception aodncore.pipeline.exceptions.UnmatchedFilesError[source]
Bases:
PipelineSystemError
aodncore.pipeline.fileclassifier module
FileClassifier - Generic class for working out the destination path of a file to be published. The idea is to define the common functionality here, then create subclasses to customise for each specific incoming handler.
Expected use:
    class MyFileClassifier(FileClassifier):
        def dest_path(self, input_file):
            path = <case-specific logic>
            ...
            return path

    try:
        dest_path = MyFileClassifier.dest_path(input_file)
    except FileClassifierException as e:
        print(e, file=sys.stderr)
        raise
    print(dest_path)
aodncore.pipeline.files module
- class aodncore.pipeline.files.PipelineFile(local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]
Bases:
PipelineFileBase
Represents a single file in order to store state information relating to the intended actions to be performed on the file, and the actions that were performed on the file
- Parameters
local_path (str) – absolute source path to the file being represented
name (str) – arbitrary name (defaults to the output of os.path.basename() on local_path)
archive_path (str) – relative path used when archiving the file
dest_path (str) – relative path used when publishing the file
is_deletion (bool) – flag designating whether this is a deletion
late_deletion (bool) – flag to indicate that this file should be deleted after additions are performed (note: ignored if is_deletion=False)
file_update_callback (callable) – optional callback to call when a file property is updated
check_type (PipelineFileCheckType) – check type assigned to the file
publish_type (PipelineFilePublishType) – publish type assigned to the file
- property archive_path
- property check_log
- property check_passed
- property check_result
- property check_type
- property dest_path
- property file_checksum
- property file_update_callback
- classmethod from_remotepipelinefile(remotepipelinefile, name=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]
Construct a PipelineFile instance from an existing RemotePipelineFile instance
- Parameters
remotepipelinefile – RemotePipelineFile instance used to instantiate a PipelineFile instance
name – name flag passed to __init__ (defaults to remotepipelinefile.name)
is_deletion – is_deletion flag passed to __init__
late_deletion – late_deletion flag passed to __init__
file_update_callback – file_update_callback flag passed to __init__
check_type – check_type flag passed to __init__
publish_type – publish_type flag passed to __init__
- Returns
PipelineFile instance
- property is_archived
- property is_checked
- property is_deleted
- property is_deletion
- property is_harvest_undone
- property is_harvested
- property is_overwrite
- property is_stored
- property is_upload_undone
- property is_uploaded
- property late_deletion
- property mime_type
- property pending_archive
- property pending_harvest
- property pending_harvest_addition
- property pending_harvest_deletion
- property pending_harvest_early_deletion
- property pending_harvest_late_deletion
- property pending_harvest_undo
- property pending_store
- property pending_store_addition
- property pending_store_deletion
- property pending_store_undo
- property pending_undo
- property publish_type
- property published
- property should_archive
- property should_harvest
- property should_store
- property should_undo
- property src_path
- class aodncore.pipeline.files.PipelineFileCollection(data=None, validate_unique=True)[source]
Bases:
PipelineFileCollectionBase
A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances
- add(pipeline_file, is_deletion=False, overwrite=False, validate_unique=True, **kwargs)[source]
Add a file to the collection
- Parameters
pipeline_file – PipelineFile or file path
kwargs – dict of additional keywords passed to the PipelineFileBase __init__ method
overwrite – bool which, if True, will overwrite an existing matching file in the collection
validate_unique – bool which, if True, will validate unique attributes when adding the file
- Returns
bool which indicates whether the file was successfully added
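For example (a sketch; the path is hypothetical):

    from aodncore.pipeline.files import PipelineFileCollection

    collection = PipelineFileCollection()
    added = collection.add('/tmp/example.nc')  # True if the file was added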
- classmethod from_remotepipelinefilecollection(remotepipelinefilecollection, are_deletions=False)[source]
- member_class[source]
alias of
PipelineFile
- property member_from_string_method
- member_validator()
- set_archive_paths(archive_path_function)[source]
Set archive_path attributes for each file in the collection
- Parameters
archive_path_function – function used to determine archive destination path
- Returns
None
- set_bool_attribute(attribute, value)[source]
Set a bool attribute for each file in the collection
- Parameters
attribute – attribute to set
value – value to set the attribute
- Returns
None
- set_check_types(check_type)[source]
Set check_type attributes for each file in the collection
- Parameters
check_type – PipelineFileCheckType enum member
- Returns
None
- set_default_check_types(check_params=None)[source]
Set check_type attribute for each file in the collection to the default value, based on the file type and presence of compliance checker checks in the check parameters
- Parameters
check_params – dict or None
- Returns
None
- set_dest_paths(dest_path_function)[source]
Set dest_path attributes for each file in the collection
- Parameters
dest_path_function – function used to determine publishing destination path
- Returns
None
- set_file_update_callback(file_update_callback)[source]
Set a callback function in each PipelineFile in this collection
- Parameters
file_update_callback – callback (function)
- Returns
None
- set_publish_types(publish_type)[source]
Set publish_type attributes for each file in the collection
- Parameters
publish_type – PipelineFilePublishType enum member
- Returns
None
- set_publish_types_from_regexes(include_regexes, exclude_regexes, addition_type, deletion_type)[source]
Set publish_type attribute for each file in the collection depending on whether it is considered “included” according to the regex parameters
- Parameters
include_regexes – regex(es) for which a file must match one or more to be included
exclude_regexes – regex(es) which will exclude an already included file
addition_type – PipelineFilePublishType enum member set for included addition files
deletion_type – PipelineFilePublishType enum member set for included deletion files
- Returns
None
- set_string_attribute(attribute, value)[source]
Set a string attribute for each file in the collection
- Parameters
attribute – attribute to set
value – value to set the attribute
- Returns
None
- unique_attributes = ('archive_path', 'dest_path')
- class aodncore.pipeline.files.RemotePipelineFile(dest_path, local_path=None, name=None, last_modified=None, size=None)[source]
Bases:
PipelineFileBase
Implementation of PipelineFileBase to represent a single remote file. This is used to provide a common interface for a remote file, to facilitate querying and downloading operations where the current state of the storage is relevant information.
- property file_checksum
- classmethod from_pipelinefile(pipeline_file)[source]
Construct a RemotePipelineFile instance from an existing PipelineFile instance
Note: the local_path is deliberately not set in the new instance, primarily to avoid accidental overwriting of local files in case of conversion to a remote file and having the remote file downloaded. The local_path attribute may still be set, but it is an explicit operation.
- Parameters
pipeline_file – PipelineFile instance used to instantiate a RemotePipelineFile instance
- Returns
RemotePipelineFile instance
- property last_modified
- property local_path
- property size
- class aodncore.pipeline.files.RemotePipelineFileCollection(data=None, validate_unique=True)[source]
Bases:
PipelineFileCollectionBase
A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances
- download(broker, local_path)[source]
Helper method to download the current collection from a given broker to a given local path
- Parameters
broker – BaseStorageBroker subclass to download from
local_path – local path into which files are downloaded
- Returns
None
- member_class[source]
alias of
RemotePipelineFile
- property member_from_string_method
- member_validator()
- unique_attributes = ('local_path', 'dest_path')
- aodncore.pipeline.files.ensure_pipelinefilecollection(o)[source]
Function to accept either a single PipelineFile OR a PipelineFileCollection and ensure that a PipelineFileCollection object is returned in either case
- Parameters
o – PipelineFile or PipelineFileCollection object
- Returns
PipelineFileCollection object
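For example (a sketch; the path is hypothetical):

    single = PipelineFile('/tmp/example.nc')
    collection = ensure_pipelinefilecollection(single)    # wrapped in a PipelineFileCollection
    collection = ensure_pipelinefilecollection(collection)  # already a collection, so a collection is returned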
- aodncore.pipeline.files.ensure_remotepipelinefilecollection(o)[source]
Function to accept either a single RemotePipelineFile OR a RemotePipelineFileCollection and ensure that a RemotePipelineFileCollection object is returned in either case
- Parameters
o – RemotePipelineFile or RemotePipelineFileCollection object
- Returns
RemotePipelineFileCollection object
- aodncore.pipeline.files.validate_pipelinefile_or_pipelinefilecollection(o)
- aodncore.pipeline.files.validate_pipelinefile_or_string(o)
- aodncore.pipeline.files.validate_pipelinefilecollection(o)
aodncore.pipeline.geonetwork module
Geonetwork Library
- class aodncore.pipeline.geonetwork.Geonetwork(base_url, username, password)[source]
Bases:
object
Geonetwork API session handler
- Parameters
base_url – Geonetwork instance base url
username – username for the Geonetwork API
password – password for the Geonetwork API
logger – an instance of the logger
- class aodncore.pipeline.geonetwork.GeonetworkMetadataHandler(conn, session, metadata, logger)[source]
Bases:
object
Handle changes to Geonetwork metadata from Harvester
Build the geonetwork payload and push changes
- Parameters
conn – the database connection class (DatabaseInteractions)
session – the geonetwork API session
metadata – dict containing extents for a single metadata record
logger – instance of the logger
- aodncore.pipeline.geonetwork.dict_to_xml(tag, value=None, attr=None, elems=None, display=True)[source]
Convert a dictionary of XML nodes into a nested XML string
- Parameters
tag – a string or list containing element tag(s). If tag is a list then recursively build the parent elements and apply the rest of the logic to the last element.
value – string representing text for an element
attr – dict containing key:value pairs representing attributes of the element
elems – list of dicts containing nested element definitions. If this is present then the value will be overridden with recursive child elements
display – boolean to identify whether the element (and child nodes) should be rendered to the XML string
- Returns
an XML string
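An illustrative call (the tag and attribute values are hypothetical, and the output shape is indicative only):

    xml_string = dict_to_xml(
        'gmd:title',
        value='Example Title',
        attr={'xmlns:gmd': 'http://www.isotc211.org/2005/gmd'},
    )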
aodncore.pipeline.handlerbase module
- class aodncore.pipeline.handlerbase.HandlerBase(input_file, allowed_archive_path_regexes=None, allowed_dest_path_regexes=None, allowed_extensions=None, allowed_regexes=None, archive_input_file=False, archive_path_function=None, celery_task=None, check_params=None, config=None, custom_params=None, dest_path_function=None, error_cleanup_regexes=None, exclude_regexes=None, harvest_params=None, harvest_type='talend', include_regexes=None, notify_params=None, upload_path=None, resolve_params=None)[source]
Bases:
object
Base class for pipeline handler sub-classes.
Implements common handler methods and defines state machine states and transitions.
- Parameters
input_file (str) – Path to the file being handled. A non-existent file will cause the handler to exit with an error during the initialise step.
Note: input_file is the only positional argument. Other arguments may be provided in any order.
allowed_archive_path_regexes (list) – List of allowed regular expressions of which PipelineFile.archive_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.
allowed_dest_path_regexes (list) – List of allowed regular expressions of which PipelineFile.dest_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.
allowed_extensions (list) – List of allowed extensions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.
allowed_regexes (list) – List of allowed regular expressions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.
Note: allowed_regexes are checked after allowed_extensions
archive_input_file (bool) – Flags whether the original input file should be uploaded to the archive, the location of which is configured by the environment configuration. The file will be archived at ARCHIVE_URI/PIPELINE_NAME/BASENAME.
archive_path_function (str, function) – See dest_path_function. This operates identically, except that it is used to calculate the PipelineFile.archive_path attribute and that the path is relative to the ARCHIVE_URI.
celery_task (celery.Task) – A Celery task object, in order for the handler instance to derive runtime information such as the current task name and UUID.
Note: If absent (e.g. when unit testing), the handler will revert to having no task information available, and will log output to standard output.
check_params (dict) – A dict containing parameters passed directly to the check step (e.g. compliance checker suites). The structure of the dict is defined by the CHECK_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
config (aodncore.pipeline.config.LazyConfigManager) – A configuration object which the handler uses to retrieve configuration from its environment. If absent, the handler will exit with an error during the __init__() method (i.e. will not instantiate).
Note: While this attribute is mandatory, it is not generally required to supply it directly in normal use cases, unless instantiating the handler class manually. When deployed, the parameter is automatically included by the worker service configuration. When testing, unit tests inheriting from HandlerTestCase contain a pre-prepared config object available as self.config. The HandlerTestCase.run_handler() and HandlerTestCase.run_handler_with_exception() helper methods automatically assign the test config to the handler being tested.
custom_params (dict) – A dict containing parameters which are ignored by the base class, but allow passing arbitrary custom values to subclasses. The structure of the dict is defined by the CUSTOM_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
dest_path_function (str, callable) – The function used to determine the PipelineFile.dest_path attribute, relative to the UPLOAD_URI configuration item. If absent, the handler will attempt to use the dest_path() method in the handler itself. If a function is not found by either mechanism, the handler will exit with an error during the initialise step.
Note: When the value is a string, it is assumed that it refers to the name of a function advertised in the pipeline.handlers entry point group. (A minimal subclass sketch providing a dest_path method is shown below, after this parameter list.)
error_cleanup_regexes (list) – A list of regular expressions which, when a cleanup policy of DELETE_CUSTOM_REGEXES_FROM_ERROR_STORE is set, controls which files are deleted from the error store upon successful execution of the handler instance
exclude_regexes (list) – See include_regexes.
harvest_params (dict) – A dict containing parameters passed directly to the harvest step (e.g. slice size, undo behaviour). The structure of the dict is defined by the HARVEST_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
harvest_type (str) – String to inform the aodncore.pipeline.steps.harvest step factory function which HarvesterRunner implementation to use during the publish step.
Note: Currently the only valid value is 'talend', which is the default.
include_regexes (list) – A list of regexes which, when combined with exclude_regexes, determines which files in the collection are assigned with the default_addition_publish_type or default_deletion_publish_type types (depending on whether the file is an addition or a deletion). If set, to be considered included, file paths must match one of the regexes in include_regexes but not any of the regexes in exclude_regexes. Files not matching the inclusion criteria will remain with a publish_type attribute of PipelineFilePublishType.NO_ACTION, meaning they will be ignored by the publish step.
Note: If omitted, the default is to select all files in file_collection for publication.
Note: This relates only to the files in file_collection, and has no relation to the input_file path, unless the input file is itself in the collection (e.g. when handling a single file). For example, a single '.nc' file could feasibly match the allowed_extensions for the handler, but still be excluded by this mechanism once it is added to file_collection during the aodncore.pipeline.steps.resolve step.
notify_params (dict) – A dict containing parameters passed directly to the aodncore.pipeline.steps.notify step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the NOTIFY_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
upload_path (str) – A string attribute to hold the original upload path of the input_file.
Note: This is intended for information purposes only (e.g. to appear in notification templates), since there is a distinction between the original path, and the input_file as provided to the handler, which represents where the file was moved to for processing.
resolve_params (dict) – A dict containing parameters passed directly to the resolve step (e.g. the root path prepended to relative paths in manifest files). The structure of the dict is defined by the RESOLVE_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
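As a minimal illustrative sketch (the pipeline layout, file name and config object are assumptions), a subclass providing a dest_path method could look like:

    import os
    from aodncore.pipeline import HandlerBase

    class MyHandler(HandlerBase):
        def dest_path(self, file_path):
            # hypothetical layout: publish everything under a fixed prefix
            return os.path.join('MY_FACILITY', os.path.basename(file_path))

    # when instantiating manually (e.g. outside the worker service), a config object must be supplied
    handler = MyHandler('/tmp/incoming/example.nc', allowed_extensions=['.nc'], config=config)
    handler.run()
    print(handler.result)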
- add_to_collection(pipeline_file, **kwargs)[source]
Helper method to add a PipelineFile object or path to the handler’s file_collection, with the handler instance’s file_update_callback method assigned
Note: as this is a wrapper to the PipelineFileCollection.add method, kwargs are only applied when pipeline_file is a path (string). When adding an existing PipelineFile object, the object is added “as-is”, and attributes must be set explicitly on the object.
- Parameters
pipeline_file – PipelineFile or file path
kwargs – keyword arguments passed through to the PipelineFileCollection.add method
- Returns
None
- all_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED', 'HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
- all_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}, {'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
- property celery_task_id
Read-only property to access Celery task ID
- Returns
Celery task ID (if applicable)
- Return type
str, None
- property celery_task_name
Read-only property to access Celery task name
- Returns
Celery task name (if applicable)
- Return type
str, None
- property collection_dir
Temporary subdirectory where the initial input file collection will be unpacked
Warning
Any new files created during the handler execution (i.e. which were not in the original input file) should be created in self.products_dir rather than here.
- Returns
collection subdirectory of instance working directory (as populated by the aodncore.pipeline.steps.resolve step)
- Return type
str, None
- property config
Property to access the
config
attribute- Returns
configuration object
- Return type
aodncore.pipeline.config.LazyConfigManager
- property default_addition_publish_type
Property to manage attribute which determines the default publish type assigned to 'addition' PipelineFile instances
- Returns
default addition publish type
- Return type
aodncore.pipeline.common.PipelineFilePublishType
- property default_deletion_publish_type
Property to manage attribute which determines the default publish type assigned to 'deletion' PipelineFile instances
- Returns
default deletion publish type
- Return type
aodncore.pipeline.common.PipelineFilePublishType
- property error
Read-only property to access the Exception object from the handler instance
- Returns
the exception object which caused the handler to fail (if applicable)
- Return type
Exception, None
- property error_details
Read-only property to retrieve string description of error from handler instance
- Returns
error description string (if applicable)
- Return type
str, None
- property exclude_regexes
Property to manage exclude_regexes attribute
- Returns
- Return type
list
- property file_basename
Read-only property to access the input_file basename
- Returns
input_file basename
- Return type
str
- property file_checksum
Read-only property to access the input_file checksum
- Returns
input_file checksum string
- Return type
str
- property file_collection
Read-only property to access the handler’s primary PipelineFileCollection instance
- Returns
handler file collection
- Return type
PipelineFileCollection
- property file_extension
Read-only property to access the input_file extension
- Returns
input_file extension string
- Return type
str
- property file_type
Read-only property to access the input_file type
- Returns
input_file type
- Return type
FileType
- property include_regexes
Property to manage include_regexes attribute
- Returns
- Return type
list
- property input_file_archive_path
Property used to determine the archive path for the original input file
- Returns
string containing the archive path
- Return type
str
- property input_file_object
Read-only property to access the original input file represented as a PipelineFile object
- Returns
input file object
- Return type
PipelineFile
- property instance_working_directory
Read-only property to retrieve the instance working directory
- Returns
string containing path to top level working directory for this instance
- Return type
str, None
- property logger
Read-only property to access the instance Logger
- Returns
Logger
- property notification_results
Read-only property to retrieve the notification results, including the sent status of each recipient
- Returns
list of aodncore.pipeline.steps.notify.NotifyRecipient instances
- Return type
- ordered_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED']
- ordered_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}]
- other_states = ['HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
- other_transitions = [{'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
- postprocess()[source]
Method designed to be overridden by child handlers in order to execute code between publish and notify steps
- Returns
None
- preprocess()[source]
Method designed to be overridden by child handlers in order to execute code between resolve and check steps
- Returns
None
- process()[source]
Method designed to be overridden by child handlers in order to execute code between check and publish steps
- Returns
None
- property products_dir
Temporary subdirectory in which products may be created
- Returns
products subdirectory of instance working directory
- Return type
str, None
- property result
Read-only property to retrieve the overall end result of the handler instance
- Returns
handler result
- Return type
- run()[source]
The entry point to the handler instance. Executes the automatic state machine transitions, and populates the
result
attribute to signal success or failure of the handler instance.
- property should_notify
Read-only property to retrieve the list of intended recipients after being assembled based on
notify_params
- Returns
list of intended recipients
- Return type
list
- property start_time
Read-only property containing the timestamp of when this instance was created
- Returns
timestamp of handler starting time
- Return type
datetime.datetime
- property state_query
Read-only property containing an initialised StateQuery instance, for querying existing pipeline state
- Returns
StateQuery instance
- Return type
StateQuery
- property temp_dir
Temporary subdirectory where any other arbitrary temporary files may be created by handler sub-classes
- Returns
temporary subdirectory of instance working directory
- Return type
str, None
- property versions
Read-only property to access module versions
- Returns
module version strings for aodncore, aodndata and compliance checker modules
- Return type
dict
aodncore.pipeline.log module
This module provides logging related objects for dynamically constructing logging configurations and retrieving configured LoggerAdapter/Logger instances
- class aodncore.pipeline.log.WorkerLoggingConfigBuilder(pipeline_config)[source]
Bases:
object
- add_watch_config(name, formatter='pipeline_formatter', level=None)[source]
Add logging configuration for a single pipeline watch
- Parameters
name – name of the pipeline for which the config is being generated
formatter – name of the formatter to use for handler
level – logging level
- Returns
dict containing handlers/loggers for a single watch
- aodncore.pipeline.log.get_pipeline_logger(name, extra=None, logger_function=<function getLogger>)[source]
Return a logger adapter prepared with given extra metadata and SYSINFO logging level
- Parameters
name – logger name
extra – extra dict to pass to
LoggerAdapter
logger_function – factory function which accepts a logger name and returns a
Logger
instance
- Returns
Logger
instance
- aodncore.pipeline.log.get_watchservice_logging_config(pipeline_config)[source]
Generate logging configuration for the ‘watchservice’ service, suitable for use by
logging.config.dictConfig()
- Parameters
pipeline_config – LazyConfigManager.pipeline_config instance
- Returns
rendered watchservice logging config
aodncore.pipeline.schema module
This module holds schema definitions for validating the various dicts which are used throughout the pipeline, and also the helper functions necessary to validate an object against their respective schema.
aodncore.pipeline.serviceconfig module
aodncore.pipeline.statequery module
- class aodncore.pipeline.statequery.StateQuery(storage_broker, wfs_broker)[source]
Bases:
object
Simple state query interface, to provide user friendly access for querying existing Pipeline state
- download(remotepipelinefilecollection, local_path)[source]
Helper method to download a RemotePipelineFileCollection or RemotePipelineFile
- Parameters
remotepipelinefilecollection – RemotePipelineFileCollection to download
local_path – local path where files will be downloaded.
- Returns
None
- query_storage(query)[source]
Query the storage backend and return existing files matching the given query
- Parameters
query – S3-style prefix for filtering query results
- Returns
RemotePipelineFileCollection of files matching the prefix
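For example, from within a handler instance (the prefix is hypothetical):

    existing_files = self.state_query.query_storage('MY_FACILITY/')
    for remote_file in existing_files:
        print(remote_file.dest_path)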
- query_wfs_file_exists(layer, name)[source]
Returns a bool representing whether a given ‘file_url’ is present in a layer
- Parameters
layer – layer name supplied to GetFeature typename parameter
name – ‘file_url’ inserted into OGC filter, and supplied to GetFeature filter parameter
- Returns
whether the given file is present in the layer
- query_wfs_files(layer, **kwargs)[source]
Return a RemotePipelineFileCollection containing all files for a given layer, or files matching the filter specified in the kwarg ogc_expression (of type OgcExpression)
- Parameters
layer – layer name supplied to GetFeature typename parameter
kwargs – keyword arguments passed to underlying broker method
- Returns
RemotePipelineFileCollection containing list of files for the layer
- query_wfs_getfeature_dict(layer=None, **kwargs)[source]
Make a GetFeature request, and return the response in a native dict
- Parameters
layer – layer name supplied to GetFeature typename parameter
kwargs – keyword arguments passed to the underlying WebFeatureService.getfeature method
- Returns
dict containing the parsed GetFeature response
- property wfs
Read-only property to access the instantiated WebFeatureService object
- Returns
WebFeatureService instance
aodncore.pipeline.storage module
- class aodncore.pipeline.storage.LocalFileStorageBroker(prefix)[source]
Bases:
BaseStorageBroker
StorageBroker to interact with a local directory
- class aodncore.pipeline.storage.S3StorageBroker(bucket, prefix)[source]
Bases:
BaseStorageBroker
StorageBroker to interact with an S3
Note: this does not and should not attempt to support any authentication code. The multiple mechanisms for loading credentials are far more appropriately handled directly by boto3, and it is expected that credentials are supplied using one of these mechanisms by the environment (e.g. deployed from configuration management, set as environment variables etc.)
Refer: http://boto3.readthedocs.io/en/latest/guide/configuration.html
- retry_kwargs = {'backoff': 2, 'delay': 5, 'exceptions': (<class 'botocore.exceptions.ClientError'>, <class 'botocore.exceptions.ConnectionError'>, <class 'http.client.IncompleteRead'>, <class 'ssl.SSLError'>), 'tries': 3}
- class aodncore.pipeline.storage.SftpStorageBroker(server, prefix)[source]
Bases:
BaseStorageBroker
StorageBroker to interact with a directory on a remote SFTP server
Note: similar to the S3 storage broker, this does not implement any authentication code, as this is better handled by the environment in the form of public key authentication
- aodncore.pipeline.storage.get_storage_broker(store_url)[source]
Factory function to return appropriate storage broker class based on URL scheme
- Parameters
store_url – URL base
- Returns
BaseStorageBroker sub-class
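For example (the URL schemes shown are assumptions based on the broker classes above):

    broker = get_storage_broker('file:///tmp/local_storage')   # LocalFileStorageBroker
    broker = get_storage_broker('s3://example-bucket/prefix')  # S3StorageBroker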
- aodncore.pipeline.storage.sftp_makedirs(sftpclient, name, mode=493)[source]
Recursively create a directory path on a remote SFTP server
Based on os.makedirs, with local calls replaced with their SFTPClient equivalents.
- Parameters
sftpclient – SFTPClient object
name – directory path to create
mode – permissions for the newly created directory
- Returns
None
- aodncore.pipeline.storage.sftp_mkdir_p(sftpclient, name, mode=493)[source]
Replicate 'mkdir -p' shell command behaviour by wrapping sftp_makedirs and suppressing exceptions where the directory already exists.
- Parameters
sftpclient – SFTPClient object
name – directory path to create
mode – permissions for the newly created directory
- Returns
None
- aodncore.pipeline.storage.sftp_path_exists(sftpclient, path)[source]
Test whether a path exists on a remote SFTP server
- Parameters
sftpclient – SFTPClient object
path – path to test for existence
- Returns
True if the path exists, False if not
- aodncore.pipeline.storage.validate_storage_broker(o)
aodncore.pipeline.watch module
This module provides the code to implement the “watchservice” component of the pipeline.
This includes setting up directory watches, handling incoming inotify events, defining the Celery tasks and routing/queuing events.
The watchservice itself is designed as an executable module, with the entry point being the aodncore.pipeline.watchservice module. This means that once aodncore is installed, running the watchservice is simply a matter of running the following:
python -m aodncore.pipeline.watchservice
This is typically run as an operating system service by something like supervisord, but can be run from the command-line for debugging.
- class aodncore.pipeline.watch.CeleryConfig(routes=None)[source]
Bases:
object
- accept_content = ['json']
- broker_url = 'amqp://'
- result_serializer = 'json'
- task_routes = {}
- task_serializer = 'json'
- worker_max_tasks_per_child = 1
- class aodncore.pipeline.watch.CeleryContext(application, config, celeryconfig)[source]
Bases:
object
- property application
Return the configured Celery application instance
- Returns
Celery application instance with config applied and tasks registered
- class aodncore.pipeline.watch.IncomingFileEventHandler(config)[source]
Bases:
ProcessEvent
- process_default(event)[source]
Default processing event method. By default does nothing. Subclass ProcessEvent and redefine this method in order to modify its behavior.
- Parameters
event – Event to be processed. Can be any type of event except IN_Q_OVERFLOW events (see method process_IN_Q_OVERFLOW).
- queue_task(directory, pathname, event_id=None)[source]
Add a task to the queue corresponding with the given directory, handling the given file
- Parameters
directory – the watched directory
pathname – the fully qualified path to the file which triggered the event
event_id – UUID to identify this event in log files (will be generated if not present)
- Returns
None
- class aodncore.pipeline.watch.IncomingFileStateManager(input_file, pipeline_name, config, logger, celery_request, error_exit_policies=None, success_exit_policies=None, error_broker=None)[source]
Bases:
object
- property basename
- property error_broker
- error_mode = 436
- property error_name
- property error_uri
- property incoming_dir
- property processing_dir
- processing_mode = 292
- property processing_path
- property relative_path
- states = ['FILE_IN_INCOMING', 'FILE_IN_PROCESSING', 'FILE_IN_ERROR', 'FILE_SUCCESS']
- transitions = [{'trigger': 'move_to_processing', 'source': 'FILE_IN_INCOMING', 'dest': 'FILE_IN_PROCESSING', 'before': ['_pre_processing_checks', '_move_to_processing']}, {'trigger': 'move_to_error', 'source': 'FILE_IN_PROCESSING', 'dest': 'FILE_IN_ERROR', 'before': ['_run_error_callbacks', '_move_to_error']}, {'trigger': 'move_to_success', 'source': 'FILE_IN_PROCESSING', 'dest': 'FILE_SUCCESS', 'before': '_run_success_callbacks', 'after': '_remove_processing_file'}]
- class aodncore.pipeline.watch.WatchServiceContext(config)[source]
Bases:
object
Class to create instances required for WatchServiceManager
aodncore.pipeline.watchservice module
Module contents
- class aodncore.pipeline.CheckResult(compliant, log, errors=False)[source]
Bases:
object
Simple container class to hold a check result
- property compliant
- property errors
- property log
- class aodncore.pipeline.FileClassifier[source]
Bases:
object
Base class for working out where a file should be published.
- class aodncore.pipeline.FileType(value)[source]
Bases:
Enum
Represents each known file type, including extensions, mime type and validator function
Each enum member may have its attributes accessed by name when required for comparisons and filtering, e.g.
lookup the extension and mime-type for PNG file types in general:
>>> FileType.PNG.extensions
('.png',)
>>> FileType.PNG.mime_type
'image/png'
assign a type attribute to an object, and query the type-specific values directly from the object:
>>> class Object(object):
...     pass
>>> o = Object()
>>> o.file_type = FileType.JPEG
>>> o.file_type.extensions
('.jpg', '.jpeg')
>>> o.file_type.mime_type
'image/jpeg'
The function referenced by validator must accept a single parameter, which is the path to the file being checked, and return True if the path represents a valid file of that type. For example, this would typically attempt to open/parse/read the file using a corresponding library (e.g. an NC file can be opened as a valid Dataset by the NetCDF library, a ZIP file can be read using the zipfile module etc.).
- CSV = (('.csv',), 'text/csv', <function is_nonempty_file>)
- DELETE_MANIFEST = (('.delete_manifest',), 'text/csv', <function is_nonempty_file>)
- DIR_MANIFEST = (('.dir_manifest',), 'text/plain', <function is_nonempty_file>)
- GZIP = (('.gz',), 'application/gzip', <function is_gzip_file>)
- JPEG = (('.jpg', '.jpeg'), 'image/jpeg', <function is_jpeg_file>)
- JSON_MANIFEST = (('.json_manifest',), 'application/json', <function is_json_file>)
- MAP_MANIFEST = (('.map_manifest',), 'text/csv', <function is_nonempty_file>)
- NETCDF = (('.nc',), 'application/octet-stream', <function is_netcdf_file>)
- PDF = (('.pdf',), 'application/pdf', <function is_pdf_file>)
- PNG = (('.png',), 'image/png', <function is_png_file>)
- RSYNC_MANIFEST = (('.rsync_manifest',), 'text/plain', <function is_nonempty_file>)
- SIMPLE_MANIFEST = (('.manifest',), 'text/plain', <function is_nonempty_file>)
- TIFF = (('.tif', '.tiff'), 'image/tiff', <function is_tiff_file>)
- UNKNOWN = ((), None, <function is_nonempty_file>)
- ZIP = (('.zip',), 'application/zip', <function is_zip_file>)
- extensions
- property is_image_type
Read-only boolean property indicating whether the file type instance is an image type.
- is_type(value)[source]
Check whether the type component of the MIME type (i.e. the part before the slash) equals the given value
- Parameters
value – type to compare against
- Returns
True if this type matches the given value, otherwise False
- mime_type
- validator
- class aodncore.pipeline.HandlerBase(input_file, allowed_archive_path_regexes=None, allowed_dest_path_regexes=None, allowed_extensions=None, allowed_regexes=None, archive_input_file=False, archive_path_function=None, celery_task=None, check_params=None, config=None, custom_params=None, dest_path_function=None, error_cleanup_regexes=None, exclude_regexes=None, harvest_params=None, harvest_type='talend', include_regexes=None, notify_params=None, upload_path=None, resolve_params=None)[source]
Bases:
object
Base class for pipeline handler sub-classes.
Implements common handler methods and defines state machine states and transitions.
- Parameters
input_file (str) – Path to the file being handled. A non-existent file will cause the handler to exit with an error during the initialise step. Note: input_file is the only positional argument; other arguments may be provided in any order.
allowed_archive_path_regexes (list) – List of allowed regular expressions of which PipelineFile.archive_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.
allowed_dest_path_regexes (list) – List of allowed regular expressions of which PipelineFile.dest_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.
allowed_extensions (list) – List of allowed extensions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.
allowed_regexes (list) – List of allowed regular expressions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step. Note: allowed_regexes are checked after allowed_extensions.
archive_input_file (bool) – Flags whether the original input file should be uploaded to the archive, the location of which is configured by the environment configuration. The file will be archived at ARCHIVE_URI/PIPELINE_NAME/BASENAME.
archive_path_function (str, function) – See dest_path_function. This operates identically, except that it is used to calculate the PipelineFile.archive_path attribute and that the path is relative to the ARCHIVE_URI.
celery_task (celery.Task) – A Celery task object, in order for the handler instance to derive runtime information such as the current task name and UUID. Note: if absent (e.g. when unit testing), the handler will revert to having no task information available, and will log output to standard output.
check_params (dict) – A dict containing parameters passed directly to the check step (e.g. compliance checker suites). The structure of the dict is defined by the CHECK_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
config (aodncore.pipeline.config.LazyConfigManager) – A configuration object which the handler uses to retrieve configuration from its environment. If absent, the handler will exit with an error during the __init__() method (i.e. will not instantiate). Note: while this attribute is mandatory, it is not generally required to supply it directly in normal use cases, unless instantiating the handler class manually. When deployed, the parameter is automatically included by the worker service configuration. When testing, unit tests inheriting from HandlerTestCase contain a pre-prepared config object available as self.config; the HandlerTestCase.run_handler() and HandlerTestCase.run_handler_with_exception() helper methods automatically assign the test config to the handler being tested.
custom_params (dict) – A dict containing parameters which are ignored by the base class, but allow passing arbitrary custom values to subclasses. The structure of the dict is defined by the CUSTOM_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
dest_path_function (str, callable) – The function used to determine the PipelineFile.dest_path attribute, relative to the UPLOAD_URI configuration item. If absent, the handler will attempt to use the dest_path() method in the handler itself. If a function is not found by either mechanism, the handler will exit with an error during the initialise step. Note: when the value is a string, it is assumed to refer to the name of a function advertised in the pipeline.handlers entry point group.
error_cleanup_regexes (list) – A list of regular expressions which, when a cleanup policy of DELETE_CUSTOM_REGEXES_FROM_ERROR_STORE is set, controls which files are deleted from the error store upon successful execution of the handler instance.
exclude_regexes (list) – See include_regexes.
harvest_params (dict) – A dict containing parameters passed directly to the harvest step (e.g. slice size, undo behaviour). The structure of the dict is defined by the HARVEST_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
harvest_type (str) – String to inform the aodncore.pipeline.steps.harvest step factory function which HarvesterRunner implementation to use during the publish step. Note: currently the only valid value is ‘talend’, which is the default.
include_regexes (list) – A list of regexes which, when combined with exclude_regexes, determines which files in the collection are assigned the default_addition_publish_type or default_deletion_publish_type types (depending on whether the file is an addition or a deletion). If set, to be considered included, file paths must match one of the regexes in include_regexes but not any of the regexes in exclude_regexes. Files not matching the inclusion criteria will remain with a publish_type attribute of PipelineFilePublishType.NO_ACTION, meaning they will be ignored by the publish step. Note: if omitted, the default is to select all files in file_collection for publication. Note: this relates only to the files in file_collection, and has no relation to the input_file path, unless the input file is itself in the collection (e.g. when handling a single file). For example, a single ‘.nc’ file could feasibly match the allowed_extensions for the handler, but still be excluded by this mechanism once it is added to file_collection during the aodncore.pipeline.steps.resolve step.
notify_params (dict) – A dict containing parameters passed directly to the aodncore.pipeline.steps.notify step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the NOTIFY_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
upload_path (str) – A string attribute to hold the original upload path of the input_file. Note: this is intended for information purposes only (e.g. to appear in notification templates), since there is a distinction between the original path and the input_file as provided to the handler, which represents where the file was moved to for processing.
resolve_params (dict) – A dict containing parameters passed directly to the resolve step (e.g. the root path prepended to relative paths in manifest files). The structure of the dict is defined by the RESOLVE_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
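A minimal sketch of a typical handler subclass follows. The class name, extension list and destination prefix are illustrative only; in normal operation handlers are instantiated by the worker service rather than manually.

import os

from aodncore.pipeline import HandlerBase


class MyFacilityHandler(HandlerBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # reject anything other than NetCDF during the initialise step
        self.allowed_extensions = ['.nc']

    def dest_path(self, file_path):
        # used because no dest_path_function argument is supplied; the
        # returned path is relative to the UPLOAD_URI configuration item
        return os.path.join('MY_FACILITY', os.path.basename(file_path))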
- add_to_collection(pipeline_file, **kwargs)[source]
Helper method to add a PipelineFile object or path to the handler’s file_collection, with the handler instance’s file_update_callback method assigned
Note: as this is a wrapper to the PipelineFileCollection.add method, kwargs are only applied when pipeline_file is a path (string). When adding an existing PipelineFile object, the object is added “as-is”, and attributes must be set explicitly on the object.
- Parameters
pipeline_file – PipelineFile or file path
kwargs – keyword arguments passed through to the PipelineFileCollection.add method
- Returns
None
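For instance, a product generated during the process step might be added as follows (a sketch; the product name and publish type are illustrative):

import os

from aodncore.pipeline import HandlerBase, PipelineFilePublishType


class ProductHandler(HandlerBase):
    def process(self):
        # write a derived product into the instance products directory
        product_path = os.path.join(self.products_dir, 'summary.csv')
        with open(product_path, 'w') as f:
            f.write('site,observation_count\n')
        # kwargs are passed through to PipelineFileCollection.add, so the
        # publish type can be assigned at the same time
        self.add_to_collection(product_path,
                               publish_type=PipelineFilePublishType.UPLOAD_ONLY)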
- all_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED', 'HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
- all_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}, {'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
- property celery_task_id
Read-only property to access Celery task ID
- Returns
Celery task ID (if applicable)
- Return type
str, None
- property celery_task_name
Read-only property to access Celery task name
- Returns
Celery task name (if applicable)
- Return type
str, None
- property collection_dir
Temporary subdirectory where the initial input file collection will be unpacked
Warning
Any new files created during the handler execution (i.e. files that were not in the original input) should be created in self.products_dir rather than here.
- Returns
collection subdirectory of instance working directory (as populated by the aodncore.pipeline.steps.resolve step)
- Return type
str, None
- property config
Property to access the config attribute
- Returns
configuration object
- Return type
aodncore.pipeline.config.LazyConfigManager
- property default_addition_publish_type
Property to manage attribute which determines the default publish type assigned to ‘addition’ PipelineFile instances
- Returns
default addition publish type
- Return type
aodncore.pipeline.common.PipelineFilePublishType
- property default_deletion_publish_type
Property to manage attribute which determines the default publish type assigned to ‘deletion’ PipelineFile instances
- Returns
default deletion publish type
- Return type
aodncore.pipeline.common.PipelineFilePublishType
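Subclasses may override these defaults before publish types are applied, e.g. (a sketch; the chosen types are illustrative):

from aodncore.pipeline import HandlerBase, PipelineFilePublishType


class UploadOnlyHandler(HandlerBase):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # additions are uploaded without harvesting; deletions are removed
        # from storage only
        self.default_addition_publish_type = PipelineFilePublishType.UPLOAD_ONLY
        self.default_deletion_publish_type = PipelineFilePublishType.DELETE_ONLY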
- property error
Read-only property to access Exception object from handler instance
- Returns
the exception object which caused the handler to fail (if applicable)
- Return type
Exception, None
- property error_details
Read-only property to retrieve string description of error from handler instance
- Returns
error description string (if applicable)
- Return type
str, None
- property exclude_regexes
Property to manage exclude_regexes attribute
- Returns
- Return type
list
- property file_basename
Read-only property to access the input_file basename
- Returns
input_file basename
- Return type
str
- property file_checksum
Read-only property to access the input_file checksum
- Returns
input_file checksum string
- Return type
str
- property file_collection
Read-only property to access the handler’s primary PipelineFileCollection instance
- Returns
handler file collection
- Return type
PipelineFileCollection
- property file_extension
Read-only property to access the input_file extension
- Returns
input_file extension string
- Return type
str
- property file_type
Read-only property to access the input_file type
- Returns
input_file type
- Return type
FileType
- property include_regexes
Property to manage include_regexes attribute
- Returns
- Return type
list
- property input_file_archive_path
Property used to determine the archive path for the original input file
- Returns
string containing the archive path
- Return type
str
- property input_file_object
Read-only property to access the original input file represented as a PipelineFile object
- Returns
input file object
- Return type
PipelineFile
- property instance_working_directory
Read-only property to retrieve the instance working directory
- Returns
string containing path to top level working directory for this instance
- Return type
str, None
- property logger
Read-only property to access the instance Logger
- Returns
Logger
- property notification_results
Read-only property to retrieve the notification results, including the sent status of each recipient
- Returns
list of aodncore.pipeline.steps.notify.NotifyRecipient instances
- Return type
NotifyList
- ordered_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED']
- ordered_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}]
- other_states = ['HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
- other_transitions = [{'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
- postprocess()[source]
Method designed to be overridden by child handlers in order to execute code between publish and notify steps
- Returns
None
- preprocess()[source]
Method designed to be overridden by child handlers in order to execute code between resolve and check steps
- Returns
None
- process()[source]
Method designed to be overridden by child handlers in order to execute code between check and publish steps
- Returns
None
- property products_dir
Temporary subdirectory in which products may be created
- Returns
products subdirectory of instance working directory
- Return type
str, None
- property result
Read-only property to retrieve the overall end result of the handler instance
- Returns
handler result
- Return type
HandlerResult
- run()[source]
The entry point to the handler instance. Executes the automatic state machine transitions, and populates the result attribute to signal success or failure of the handler instance.
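Outside a deployed pipeline, run() is normally exercised through the test helpers mentioned above rather than called directly. A sketch, assuming aodncore.testlib.HandlerTestCase and reusing the illustrative MyFacilityHandler from the earlier sketch:

from aodncore.testlib import HandlerTestCase


class TestMyFacilityHandler(HandlerTestCase):
    def setUp(self):
        self.handler_class = MyFacilityHandler  # hypothetical handler under test
        super().setUp()

    def test_good_netcdf(self):
        # run_handler() assigns self.config, calls run(), and fails the test
        # if the handler does not complete successfully
        handler = self.run_handler('tests/data/good_file.nc')
        self.assertEqual(len(handler.file_collection), 1)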
- property should_notify
Read-only property to retrieve the list of intended recipients after being assembled based on notify_params
- Returns
list of intended recipients
- Return type
list
- property start_time
Read-only property containing the timestamp of when this instance was created
- Returns
timestamp of handler starting time
- Return type
datetime.datetime
- property state_query
Read-only property containing an initialised StateQuery instance, for querying existing pipeline state
- Returns
StateQuery instance
- Return type
StateQuery
- property temp_dir
Temporary subdirectory where any other arbitrary temporary files may be created by handler sub-classes
- Returns
temporary subdirectory of instance working directory
- Return type
str, None
- property versions
Read-only property to access module versions
- Returns
module version strings for aodncore, aodndata and compliance checker modules
- Return type
dict
- class aodncore.pipeline.HandlerResult(value)[source]
Bases:
Enum
An enumeration.
- ERROR = 2
- SUCCESS = 1
- UNKNOWN = 0
- class aodncore.pipeline.NotificationRecipientType(value)[source]
Bases:
Enum
Notification recipient type
- EMAIL = ('email', <function is_valid_email_address>, 'invalid email address')
- INVALID = ('invalid', <function NotificationRecipientType.<lambda>>, 'invalid protocol')
- SNS = ('sns', <function NotificationRecipientType.<lambda>>, 'invalid SNS topic')
- property address_validation_function
- property error_string
- property protocol
- class aodncore.pipeline.PipelineFile(local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]
Bases:
PipelineFileBase
Represents a single file, storing state information about the actions intended to be performed on it and the actions that were actually performed
- Parameters
local_path (str) – absolute source path to the file being represented
name (str) – arbitrary name (defaults to the output of os.path.basename() on local_path)
archive_path (str) – relative path used when archiving the file
dest_path (str) – relative path used when publishing the file
is_deletion (bool) – flag designating whether this is a deletion
late_deletion (bool) – flag to indicate that this file should be deleted after additions are performed (note: ignored if is_deletion=False)
file_update_callback (callable) – optional callback to call when a file property is updated
check_type (PipelineFileCheckType) – check type assigned to the file
publish_type (PipelineFilePublishType) – publish type assigned to the file
- property archive_path
- property check_log
- property check_passed
- property check_result
- property check_type
- property dest_path
- property file_checksum
- property file_update_callback
- classmethod from_remotepipelinefile(remotepipelinefile, name=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]
Construct a PipelineFile instance from an existing RemotePipelineFile instance
- Parameters
remotepipelinefile – RemotePipelineFile instance used to instantiate a PipelineFile instance
name – name flag passed to __init__ (defaults to remotepipelinefile.name)
is_deletion – is_deletion flag passed to __init__
late_deletion – late_deletion flag passed to __init__
file_update_callback – file_update_callback flag passed to __init__
check_type – check_type flag passed to __init__
publish_type – publish_type flag passed to __init__
- Returns
PipelineFile instance
- property is_archived
- property is_checked
- property is_deleted
- property is_deletion
- property is_harvest_undone
- property is_harvested
- property is_overwrite
- property is_stored
- property is_upload_undone
- property is_uploaded
- property late_deletion
- property mime_type
- property pending_archive
- property pending_harvest
- property pending_harvest_addition
- property pending_harvest_deletion
- property pending_harvest_early_deletion
- property pending_harvest_late_deletion
- property pending_harvest_undo
- property pending_store
- property pending_store_addition
- property pending_store_deletion
- property pending_store_undo
- property pending_undo
- property publish_type
- property published
- property should_archive
- property should_harvest
- property should_store
- property should_undo
- property src_path
- class aodncore.pipeline.PipelineFileCheckType(value)[source]
Bases:
Enum
Each PipelineFile may individually specify which checks are performed against it
- FORMAT_CHECK = 3
- NC_COMPLIANCE_CHECK = 4
- NONEMPTY_CHECK = 2
- NO_ACTION = 1
- TABLE_SCHEMA_CHECK = 5
- UNSET = 0
- all_checkable_types = {<PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>}
- all_settable_types = {<PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.NO_ACTION: 1>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>, <PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>}
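For example, a file’s check type can be set via its check_type property (a sketch; the path is illustrative):

from aodncore.pipeline import PipelineFile, PipelineFileCheckType

pf = PipelineFile('/tmp/incoming/profile.nc')  # illustrative path
# request a compliance check instead of the default for the file type
pf.check_type = PipelineFileCheckType.NC_COMPLIANCE_CHECK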
- class aodncore.pipeline.PipelineFileCollection(data=None, validate_unique=True)[source]
Bases:
PipelineFileCollectionBase
A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances
- add(pipeline_file, is_deletion=False, overwrite=False, validate_unique=True, **kwargs)[source]
Add a file to the collection
- Parameters
pipeline_file – PipelineFile or file path
kwargs – dict of additional keywords passed to the PipelineFileBase __init__ method
overwrite – bool which, if True, will overwrite an existing matching file in the collection
validate_unique – bool which, if True, will validate unique attributes when adding the file
- Returns
bool which indicates whether the file was successfully added
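A usage sketch based on the documented set semantics (paths are illustrative):

from aodncore.pipeline import PipelineFileCollection

collection = PipelineFileCollection()
collection.add('/tmp/incoming/a.nc')  # expected to return True: newly added
collection.add('/tmp/incoming/a.nc')  # expected to return False: already present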
- classmethod from_remotepipelinefilecollection(remotepipelinefilecollection, are_deletions=False)[source]
- member_class[source]
alias of PipelineFile
- property member_from_string_method
- member_validator()
- set_archive_paths(archive_path_function)[source]
Set archive_path attributes for each file in the collection
- Parameters
archive_path_function – function used to determine archive destination path
- Returns
None
- set_bool_attribute(attribute, value)[source]
Set a bool attribute for each file in the collection
- Parameters
attribute – attribute to set
value – value to set the attribute
- Returns
None
- set_check_types(check_type)[source]
Set check_type attributes for each file in the collection
- Parameters
check_type – PipelineFileCheckType enum member
- Returns
None
- set_default_check_types(check_params=None)[source]
Set check_type attribute for each file in the collection to the default value, based on the file type and presence of compliance checker checks in the check parameters
- Parameters
check_params – dict or None
- Returns
None
- set_dest_paths(dest_path_function)[source]
Set dest_path attributes for each file in the collection
- Parameters
dest_path_function – function used to determine publishing destination path
- Returns
None
- set_file_update_callback(file_update_callback)[source]
Set a callback function in each PipelineFile in this collection
- Parameters
file_update_callback – callback (function)
- Returns
None
- set_publish_types(publish_type)[source]
Set publish_type attributes for each file in the collection
- Parameters
publish_type – PipelineFilePublishType enum member
- Returns
None
- set_publish_types_from_regexes(include_regexes, exclude_regexes, addition_type, deletion_type)[source]
Set publish_type attribute for each file in the collection depending on whether it is considered “included” according to the regex parameters
- Parameters
include_regexes – regex(es) for which a file must match one or more to be included
exclude_regexes – regex(es) which will exclude an already included file
addition_type – PipelineFilePublishType enum member set for included addition files
deletion_type – PipelineFilePublishType enum member set for included deletion files
- Returns
None
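A sketch of a typical call, reusing the collection from the previous sketch (regexes and publish types are illustrative):

from aodncore.pipeline import PipelineFilePublishType

collection.set_publish_types_from_regexes(
    include_regexes=[r'.*\.nc$'],
    exclude_regexes=[r'.*_test\.nc$'],
    addition_type=PipelineFilePublishType.HARVEST_UPLOAD,
    deletion_type=PipelineFilePublishType.DELETE_UNHARVEST,
)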
- set_string_attribute(attribute, value)[source]
Set a string attribute for each file in the collection
- Parameters
attribute – attribute to set
value – value to set the attribute
- Returns
None
- unique_attributes = ('archive_path', 'dest_path')
- class aodncore.pipeline.PipelineFilePublishType(value)[source]
Bases:
Enum
Each PipelineFile may individually specify which combination of archive/upload/harvest actions must occur before it is considered “published”.
Enum member values are a tuple containing boolean flags used for querying/validating types, which are provided to the __init__() for each member (since, confusingly, each Enum member is an instance of this class).
Each member’s value is therefore a tuple of bool values representing the following flags:
(is_addition_type, is_deletion_type, is_archive_type, is_store_type, is_harvest_type)
- ARCHIVE_ONLY = (True, False, True, False, False)
- DELETE_ONLY = (False, True, False, True, False)
- DELETE_UNHARVEST = (False, True, False, True, True)
- HARVEST_ARCHIVE = (True, False, True, False, True)
- HARVEST_ARCHIVE_UPLOAD = (True, False, True, True, True)
- HARVEST_ONLY = (True, False, False, False, True)
- HARVEST_UPLOAD = (True, False, False, True, True)
- NO_ACTION = (True, True, False, False, False)
- UNHARVEST_ONLY = (False, True, False, False, True)
- UNSET = (None, None, None, None, None)
- UPLOAD_ONLY = (True, False, False, True, False)
- all_addition_types = {<PipelineFilePublishType.ARCHIVE_ONLY: (True, False, True, False, False)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.HARVEST_UPLOAD: (True, False, False, True, True)>, <PipelineFilePublishType.HARVEST_ONLY: (True, False, False, False, True)>, <PipelineFilePublishType.HARVEST_ARCHIVE: (True, False, True, False, True)>, <PipelineFilePublishType.UPLOAD_ONLY: (True, False, False, True, False)>, <PipelineFilePublishType.HARVEST_ARCHIVE_UPLOAD: (True, False, True, True, True)>}
- all_deletion_types = {<PipelineFilePublishType.DELETE_UNHARVEST: (False, True, False, True, True)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.DELETE_ONLY: (False, True, False, True, False)>, <PipelineFilePublishType.UNHARVEST_ONLY: (False, True, False, False, True)>}
- property is_addition_type
- property is_archive_type
- property is_deletion_type
- property is_harvest_type
- property is_store_type
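For example, the flag properties simply expose the member tuples listed above:

>>> PipelineFilePublishType.HARVEST_UPLOAD.is_addition_type
True
>>> PipelineFilePublishType.DELETE_UNHARVEST.is_deletion_type
True
>>> PipelineFilePublishType.HARVEST_UPLOAD.is_archive_type
False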
- class aodncore.pipeline.RemotePipelineFile(dest_path, local_path=None, name=None, last_modified=None, size=None)[source]
Bases:
PipelineFileBase
Implementation of PipelineFileBase to represent a single remote file. This is used to provide a common interface for a remote file, to facilitate querying and downloading operations where the current state of the storage is relevant information.
- property file_checksum
- classmethod from_pipelinefile(pipeline_file)[source]
Construct a RemotePipelineFile instance from an existing PipelineFile instance
Note: the local_path is deliberately not set in the new instance, primarily to avoid accidentally overwriting local files when a PipelineFile is converted to a remote file and the remote file is then downloaded. The local_path attribute may still be set, but only as an explicit operation.
- Parameters
pipeline_file – PipelineFile instance used to instantiate a RemotePipelineFile instance
- Returns
RemotePipelineFile instance
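A sketch illustrating the note above (pf stands for an existing PipelineFile instance):

from aodncore.pipeline import RemotePipelineFile

remote = RemotePipelineFile.from_pipelinefile(pf)
assert remote.local_path is None  # deliberately unset; assign explicitly before downloading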
- property last_modified
- property local_path
- property size
- class aodncore.pipeline.RemotePipelineFileCollection(data=None, validate_unique=True)[source]
Bases:
PipelineFileCollectionBase
A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances
- download(broker, local_path)[source]
Helper method to download the current collection from a given broker to a given local path
- Parameters
broker – BaseStorageBroker subclass to download from
local_path – local path into which files are downloaded
- Returns
None
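A sketch of the call shape; broker stands for an already-configured BaseStorageBroker instance and remote_files for an existing RemotePipelineFileCollection (obtaining a broker is environment-specific):

# download every file in the collection into a local working directory
remote_files.download(broker, '/tmp/previous_versions')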
- member_class[source]
alias of RemotePipelineFile
- property member_from_string_method
- member_validator()
- unique_attributes = ('local_path', 'dest_path')
- aodncore.pipeline.validate_pipelinefile_or_string(o)
- aodncore.pipeline.validate_pipelinefilecollection(o)