aodncore.pipeline package

Subpackages

Submodules

aodncore.pipeline.common module

This module contains common “types” (typically enums or simple data classes) used by more than one module in the project.

class aodncore.pipeline.common.CheckResult(compliant, log, errors=False)[source]

Bases: object

Simple container class to hold a check result

property compliant
property errors
property log
class aodncore.pipeline.common.FileType(value)[source]

Bases: Enum

Represents each known file type, including extensions, mime type and validator function

Each enum member may have its attributes accessed by name when required for comparisons and filtering, e.g.

  • lookup the extension and mime-type for PNG file types in general:

    FileType.PNG.extensions
    ('.png',)
    
    FileType.PNG.mime_type
    'image/png'
    
  • assign a type attribute to an object, and query the type-specific values directly from the object:

    class Object(object):
        pass
    
    o = Object()
    o.file_type = FileType.JPEG
    o.file_type.extensions
    ('.jpg', '.jpeg')
    o.file_type.mime_type
    'image/jpeg'
    

The function referenced by validator must accept a single parameter, which is the path to the file being checked, and return True if the path represents a valid file of that type. Typically this attempts to open/parse/read the file using a corresponding library (e.g. an NC file can be opened as a valid Dataset by the NetCDF library, a ZIP file can be read using the zipfile module, etc.).
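
For illustration, a validator following this contract might look like the following sketch. Note that is_zip_file here is a hypothetical stand-in written against the standard library zipfile module, not necessarily the project's own implementation:

```python
import zipfile


def is_zip_file(filepath):
    """Return True if the path points to a readable ZIP archive.

    Sketch of the validator contract: accept a single path argument,
    return True only if the file is a valid instance of the type.
    """
    try:
        # testzip() returns None when all members pass their CRC checks
        with zipfile.ZipFile(filepath) as z:
            return z.testzip() is None
    except (OSError, zipfile.BadZipFile):
        return False
```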

CSV = (('.csv',), 'text/csv', <function is_nonempty_file>)
DELETE_MANIFEST = (('.delete_manifest',), 'text/csv', <function is_nonempty_file>)
DIR_MANIFEST = (('.dir_manifest',), 'text/plain', <function is_nonempty_file>)
GZIP = (('.gz',), 'application/gzip', <function is_gzip_file>)
JPEG = (('.jpg', '.jpeg'), 'image/jpeg', <function is_jpeg_file>)
JSON_MANIFEST = (('.json_manifest',), 'application/json', <function is_json_file>)
MAP_MANIFEST = (('.map_manifest',), 'text/csv', <function is_nonempty_file>)
NETCDF = (('.nc',), 'application/octet-stream', <function is_netcdf_file>)
PDF = (('.pdf',), 'application/pdf', <function is_pdf_file>)
PNG = (('.png',), 'image/png', <function is_png_file>)
RSYNC_MANIFEST = (('.rsync_manifest',), 'text/plain', <function is_nonempty_file>)
SIMPLE_MANIFEST = (('.manifest',), 'text/plain', <function is_nonempty_file>)
TIFF = (('.tif', '.tiff'), 'image/tiff', <function is_tiff_file>)
UNKNOWN = ((), None, <function is_nonempty_file>)
ZIP = (('.zip',), 'application/zip', <function is_zip_file>)
extensions
classmethod get_type_from_extension(extension)[source]
classmethod get_type_from_name(name)[source]
property is_image_type

Read-only boolean property indicating whether the file type instance is an image type.

is_type(value)[source]

Check whether the type component of the mime type (i.e. the part before the slash) equals the given value

Parameters

value – type to compare against

Returns

True if this type matches the given value, otherwise False

mime_type
validator
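
The member layout above can be reproduced with a small stand-alone Enum. This sketch (with invented members, not the project code) shows how the tuple values map onto named attributes via __init__, which is what makes lookups like FileType.PNG.mime_type work:

```python
import os
from enum import Enum


def is_nonempty_file(path):
    # placeholder validator for the sketch
    return os.path.getsize(path) > 0


class MiniFileType(Enum):
    # each value is (extensions, mime_type, validator), unpacked by __init__
    PNG = (('.png',), 'image/png', is_nonempty_file)
    JPEG = (('.jpg', '.jpeg'), 'image/jpeg', is_nonempty_file)

    def __init__(self, extensions, mime_type, validator):
        self.extensions = extensions
        self.mime_type = mime_type
        self.validator = validator

    @classmethod
    def get_type_from_extension(cls, extension):
        # case-insensitive lookup of the member owning a given extension
        for member in cls:
            if extension.lower() in member.extensions:
                return member
        raise ValueError(extension)
```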
class aodncore.pipeline.common.HandlerResult(value)[source]

Bases: Enum

Represents the overall result of a handler execution.

ERROR = 2
SUCCESS = 1
UNKNOWN = 0
class aodncore.pipeline.common.NotificationRecipientType(value)[source]

Bases: Enum

Notification recipient type

EMAIL = ('email', <function is_valid_email_address>, 'invalid email address')
INVALID = ('invalid', <function NotificationRecipientType.<lambda>>, 'invalid protocol')
SNS = ('sns', <function NotificationRecipientType.<lambda>>, 'invalid SNS topic')
property address_validation_function
property error_string
classmethod get_type_from_protocol(protocol)[source]
property protocol
class aodncore.pipeline.common.PipelineFileCheckType(value)[source]

Bases: Enum

Each PipelineFile may individually specify which checks are performed against it

FORMAT_CHECK = 3
NC_COMPLIANCE_CHECK = 4
NONEMPTY_CHECK = 2
NO_ACTION = 1
TABLE_SCHEMA_CHECK = 5
UNSET = 0
all_checkable_types = {<PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>}
all_settable_types = {<PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.NO_ACTION: 1>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>, <PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>}
class aodncore.pipeline.common.PipelineFilePublishType(value)[source]

Bases: Enum

Each PipelineFile may individually specify which combination of archive/upload/harvest actions must occur before it is considered “published”

Enum member values are a tuple containing boolean flags used for querying/validating types, which are provided to the __init__() for each member (since confusingly, each Enum member is an instance of this class).

Each member’s value is therefore a tuple of bool values representing the following flags:

(is_addition_type, is_deletion_type, is_archive_type, is_store_type, is_harvest_type)
ARCHIVE_ONLY = (True, False, True, False, False)
DELETE_ONLY = (False, True, False, True, False)
DELETE_UNHARVEST = (False, True, False, True, True)
HARVEST_ARCHIVE = (True, False, True, False, True)
HARVEST_ARCHIVE_UPLOAD = (True, False, True, True, True)
HARVEST_ONLY = (True, False, False, False, True)
HARVEST_UPLOAD = (True, False, False, True, True)
NO_ACTION = (True, True, False, False, False)
UNHARVEST_ONLY = (False, True, False, False, True)
UNSET = (None, None, None, None, None)
UPLOAD_ONLY = (True, False, False, True, False)
all_addition_types = {<PipelineFilePublishType.ARCHIVE_ONLY: (True, False, True, False, False)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.HARVEST_UPLOAD: (True, False, False, True, True)>, <PipelineFilePublishType.HARVEST_ONLY: (True, False, False, False, True)>, <PipelineFilePublishType.HARVEST_ARCHIVE: (True, False, True, False, True)>, <PipelineFilePublishType.UPLOAD_ONLY: (True, False, False, True, False)>, <PipelineFilePublishType.HARVEST_ARCHIVE_UPLOAD: (True, False, True, True, True)>}
all_deletion_types = {<PipelineFilePublishType.DELETE_UNHARVEST: (False, True, False, True, True)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.DELETE_ONLY: (False, True, False, True, False)>, <PipelineFilePublishType.UNHARVEST_ONLY: (False, True, False, False, True)>}
classmethod get_type_from_name(name)[source]
property is_addition_type
property is_archive_type
property is_deletion_type
property is_harvest_type
property is_store_type
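
The flag-tuple pattern described above can be sketched in isolation. Member names are borrowed from the table, but this is an illustrative reimplementation, not the project code (the real class exposes the flags as read-only properties rather than plain attributes):

```python
from enum import Enum


class MiniPublishType(Enum):
    # (is_addition_type, is_deletion_type, is_archive_type, is_store_type, is_harvest_type)
    HARVEST_UPLOAD = (True, False, False, True, True)
    DELETE_ONLY = (False, True, False, True, False)
    UNSET = (None, None, None, None, None)

    def __init__(self, addition, deletion, archive, store, harvest):
        self.is_addition_type = addition
        self.is_deletion_type = deletion
        self.is_archive_type = archive
        self.is_store_type = store
        self.is_harvest_type = harvest


# flag queries make set-style filtering simple, e.g. collecting all addition types:
all_addition_types = {t for t in MiniPublishType if t.is_addition_type}
```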
aodncore.pipeline.common.validate_addition_publishtype(o)
aodncore.pipeline.common.validate_checkresult(o)
aodncore.pipeline.common.validate_checktype(o)
aodncore.pipeline.common.validate_deletion_publishtype(o)
aodncore.pipeline.common.validate_publishtype(o)
aodncore.pipeline.common.validate_recipienttype(o)
aodncore.pipeline.common.validate_settable_checktype(o)

aodncore.pipeline.config module

This module is separate from the aodncore.pipeline.configlib definitions in order for it to act as the shared configuration “singleton” for the package.

aodncore.pipeline.configlib module

This module provides code to support loading and accessing a central configuration object. Other modules should typically access the configuration by importing CONFIG from aodncore.pipeline.config rather than manually creating a new LazyConfigManager instance.

class aodncore.pipeline.configlib.LazyConfigManager[source]

Bases: object

Configuration object to consolidate configuration values.

Different configuration sources are represented as lazy properties so that it is still efficient to pass an instance of this class to several modules, even if they only require one particular type of configuration to operate.

property celery_application
property celery_routes
property discovered_dest_path_functions
property discovered_handlers
property discovered_module_versions
get_worker_logging_config(task_name)[source]

Get the logging config for an individual task

Parameters

task_name – name of the task to retrieve the log config for

Returns

logging config for use by logging.config.dictConfig

property pipeline_config
property trigger_config
property watch_config
property watch_directory_map
property watchservice_logging_config
aodncore.pipeline.configlib.load_json_file(default_config_file, envvar='', object_pairs_hook=<class 'aodncore.util.misc.WriteOnceOrderedDict'>)[source]

Load a JSON file into a dict

Parameters
  • default_config_file – path to the default configuration file

  • envvar – environment variable containing path to load

  • object_pairs_hook – class used for json.load ‘object_pairs_hook’ parameter

Returns

object containing loaded JSON config
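
The envvar override behaviour described above could be approximated as follows. This is a stdlib-only sketch; the real function also applies WriteOnceOrderedDict as the default object_pairs_hook:

```python
import json
import os


def load_json_file(default_config_file, envvar='', object_pairs_hook=None):
    """Load a JSON config file into a dict (sketch).

    If the named environment variable is set, its value takes
    precedence over the default path.
    """
    path = os.environ.get(envvar) or default_config_file
    with open(path) as f:
        return json.load(f, object_pairs_hook=object_pairs_hook)
```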

aodncore.pipeline.configlib.load_pipeline_config(default_config_file='/mnt/ebs/pipeline/etc/watches.conf', envvar='PIPELINE_CONFIG_FILE')[source]
aodncore.pipeline.configlib.load_trigger_config(default_config_file='/usr/local/talend/etc/trigger.conf')[source]
Parameters

default_config_file – default path to use if not set in an environment variable

Returns

dict representation of trigger config parsed from json file

aodncore.pipeline.configlib.load_watch_config(default_config_file='/mnt/ebs/pipeline/etc/watches.conf')[source]
Parameters

default_config_file – default path to use if not set in an environment variable

Returns

dict representation of watch config parsed from json file

aodncore.pipeline.configlib.validate_lazyconfigmanager(o)

aodncore.pipeline.db module

class aodncore.pipeline.db.DatabaseInteractions(config, schema_base_path, logger)[source]

Bases: object

Database connection object.

This class should be instantiated via a ‘with DatabaseInteractions() as…’ statement, so that the __enter__ and __exit__ methods are correctly invoked.
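
The context-manager contract can be sketched without any database dependency. The class and attribute below are illustrative only, showing the protocol the real class is described as implementing:

```python
class DatabaseInteractionsSketch:
    """Minimal sketch of the context-manager contract described above."""

    def __enter__(self):
        # the real class would open a connection/transaction here
        self.connected = True
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # commit or roll back as appropriate, then release the connection
        self.connected = False
        return False  # do not suppress exceptions


# intended usage: the with-statement guarantees __exit__ runs on any outcome
with DatabaseInteractionsSketch() as db:
    assert db.connected
```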

compare_schemas()[source]

Placeholder for possible future implementation of schema version checking

Returns

boolean - True if schemas match, else False

create_table_from_yaml_file(step)[source]

Function to read a YAML file and use it to build a CREATE TABLE script for execution against the database.

Parameters

step – A dict containing ‘name’ and ‘type’ (at least) keys

  • step.name is the name used as part of the match regular expression

  • step.type is the type of database object. Type should always be table in this context

drop_object(step)[source]

Drop the specified database object.

The database transaction uses the IF EXISTS parameter, so it will not fail if the database object does not exist. It also uses the CASCADE parameter, meaning a previous call to this method may already have cascaded to the current database object.

Parameters

step – A dict containing ‘name’ and ‘type’ (at least) keys

  • step.name is the name of the database object

  • step.type is the type of database object

execute_sql_file(step)[source]

Function to read an SQL file prior to executing it against the database.

Parameters

step – A dict containing ‘name’ (at least) key

  • step.name is the name used as part of the match regular expression

get_spatial_extent(db_schema, table, column, resolution)[source]

Function to retrieve spatial data from the database.

Parameters
  • db_schema – string containing name of schema

  • table – string containing name of table

  • column – string containing name of column

  • resolution – int as resolution of polygons

get_temporal_extent(table, column)[source]

Function to retrieve temporal data from the database.

Parameters
  • table – string containing name of table

  • column – string containing name of column

get_vertical_extent(table, column)[source]

Function to retrieve vertical data from the database.

Parameters
  • table – string containing name of table

  • column – string containing name of column

load_data_from_csv(step)[source]

Function to read a csv file prior to loading into the specified table.

Currently uses the utf-8 encoding to read the csv file, and reads the headings into the COPY FROM statement - the latter may not be necessary as it is assumed that the file has been validated in a previous handler step.

Parameters

step – A dict containing ‘name’ and ‘local_path’ (at least) keys

  • step.name is the name of the target table

  • step.local_path is the full path to the source file (csv)

refresh_materialized_view(step)[source]

Refresh the specified materialized view.

Parameters

step – A dict containing ‘name’ and ‘type’ (at least) keys

  • step.name is the name of the database object

  • step.type is the type of database object - the database transaction will only be performed

    if type = ‘materialized view’

truncate_table(step)[source]

Truncate the specified table.

Parameters

step – A dict containing ‘name’ and ‘type’ (at least) keys

  • step.name is the name of the database object

  • step.type is the type of database object - the database transaction will only be performed

    if type = ‘table’

aodncore.pipeline.destpath module

This module provides code to determine which function/method will be used in order for the handler to generate the destination path (dest_path) and/or archive path (archive_path) for each file being processed. Since the path function may be in the handler class itself, or provided as a runtime parameter, this code is to ensure that the discovery mechanism is consistent.

aodncore.pipeline.destpath.get_path_function(handler_instance, entry_point_group, archive_mode=False)[source]

Return a tuple containing a reference to a path function, and the function’s fully qualified, printable name

Parameters
  • handler_instance – HandlerBase instance

  • entry_point_group – name of the entry point group used to discover path functions

  • archive_mode – bool flag to modify which attributes are used

Returns

result of PathFunctionResolver.resolve() method

aodncore.pipeline.exceptions module

This module provides custom exceptions used throughout the aodncore.pipeline package.

exception aodncore.pipeline.exceptions.AttributeNotSetError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.AttributeValidationError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.ComplianceCheckFailedError[source]

Bases: PipelineProcessingError

exception aodncore.pipeline.exceptions.DuplicatePipelineFileError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.GeonetworkConnectionError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.GeonetworkRequestError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.HandlerAlreadyRunError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidCheckSuiteError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidCheckTypeError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidConfigError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidFileContentError[source]

Bases: PipelineProcessingError

exception aodncore.pipeline.exceptions.InvalidFileFormatError[source]

Bases: PipelineProcessingError

exception aodncore.pipeline.exceptions.InvalidFileNameError[source]

Bases: PipelineProcessingError

exception aodncore.pipeline.exceptions.InvalidHandlerError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidHarvestMapError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidHarvesterError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidInputFileError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidPathFunctionError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidRecipientError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidSQLConnectionError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidSQLTransactionError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidSchemaError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.InvalidStoreUrlError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.MissingConfigFileError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.MissingConfigParameterError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.MissingFileError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.NotificationFailedError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.PipelineProcessingError[source]

Bases: AodnBaseError

Base class for all exceptions which indicate that there was a problem processing the file as opposed to an internal configuration or environmental error. Handler classes should typically raise exceptions based on this exception to signal non-compliance of the file or some other user correctable problem.

exception aodncore.pipeline.exceptions.PipelineSystemError[source]

Bases: AodnBaseError

Base class for all exceptions not related to file processing and which would typically not be suitable to return to an end user
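
The split between the two base classes suggests a simple pattern for handler code: raise a PipelineProcessingError subclass for problems the submitter can fix, and a PipelineSystemError subclass for everything else. The minimal class definitions and the handle function below are illustrative sketches, not the project's actual implementations:

```python
class AodnBaseError(Exception):
    pass


class PipelineProcessingError(AodnBaseError):
    """User-correctable problems with the submitted file."""


class PipelineSystemError(AodnBaseError):
    """Configuration/environment problems, not suitable for end users."""


def handle(check_passed, config_ok):
    # hypothetical handler step demonstrating which base class to raise
    if not config_ok:
        raise PipelineSystemError("missing harvester configuration")
    if not check_passed:
        raise PipelineProcessingError("file failed compliance checks")
    return "published"
```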

exception aodncore.pipeline.exceptions.StorageBrokerError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.UnexpectedCsvFilesError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.UnmappedFilesError[source]

Bases: PipelineSystemError

exception aodncore.pipeline.exceptions.UnmatchedFilesError[source]

Bases: PipelineSystemError

aodncore.pipeline.fileclassifier module

FileClassifier - Generic class for working out the destination path of a file to be published. The idea is to define the common functionality here, then create subclasses to customise for each specific incoming handler.

Expected use:

class MyFileClassifier(FileClassifier):
    def dest_path(self, input_file):
        path = <case-specific logic>
        ...
        return path

try:
    dest_path = MyFileClassifier.dest_path(input_file)
except FileClassifierException as e:
    print(e, file=sys.stderr)
    raise

print(dest_path)
class aodncore.pipeline.fileclassifier.FileClassifier[source]

Bases: object

Base class for working out where a file should be published.

aodncore.pipeline.files module

class aodncore.pipeline.files.PipelineFile(local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]

Bases: PipelineFileBase

Represents a single file in order to store state information relating to the intended actions to be performed on the file, and the actions that were performed on the file

Parameters
  • local_path (str) – absolute source path to the file being represented

  • name (str) – arbitrary name (defaults to the output of os.path.basename() on local_path)

  • archive_path (str) – relative path used when archiving the file

  • dest_path (str) – relative path used when publishing the file

  • is_deletion (bool) – flag designating whether this is a deletion

  • late_deletion (bool) – flag to indicate that this file should be deleted after additions are performed (note: ignored if is_deletion=False)

  • file_update_callback (callable) – optional callback to call when a file property is updated

  • check_type (PipelineFileCheckType) – check type assigned to the file

  • publish_type (PipelineFilePublishType) – publish type assigned to the file

property archive_path
property check_log
property check_passed
property check_result
property check_type
property dest_path
property file_checksum
property file_update_callback
classmethod from_remotepipelinefile(remotepipelinefile, name=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]

Construct a PipelineFile instance from an existing RemotePipelineFile instance

Parameters
  • remotepipelinefile – RemotePipelineFile instance used to instantiate a PipelineFile instance

  • name – name flag passed to __init__ (defaults to remotepipelinefile.name)

  • is_deletion – is_deletion flag passed to __init__

  • late_deletion – late_deletion flag passed to __init__

  • file_update_callback – file_update_callback flag passed to __init__

  • check_type – check_type flag passed to __init__

  • publish_type – publish_type flag passed to __init__

Returns

PipelineFile instance

property is_archived
property is_checked
property is_deleted
property is_deletion
property is_harvest_undone
property is_harvested
property is_overwrite
property is_stored
property is_upload_undone
property is_uploaded
property late_deletion
property mime_type
property pending_archive
property pending_harvest
property pending_harvest_addition
property pending_harvest_deletion
property pending_harvest_early_deletion
property pending_harvest_late_deletion
property pending_harvest_undo
property pending_store
property pending_store_addition
property pending_store_deletion
property pending_store_undo
property pending_undo
property publish_type
property published
property should_archive
property should_harvest
property should_store
property should_undo
property src_path
class aodncore.pipeline.files.PipelineFileCollection(data=None, validate_unique=True)[source]

Bases: PipelineFileCollectionBase

A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances

add(pipeline_file, is_deletion=False, overwrite=False, validate_unique=True, **kwargs)[source]

Add a file to the collection

Parameters
  • pipeline_file – PipelineFile or file path

  • kwargs – dict of additional keywords passed to the PipelineFileBase __init__ method

  • overwrite – bool which, if True, will overwrite an existing matching file in the collection

  • validate_unique – bool which, if True, will validate unique attributes when adding the file

Returns

bool which indicates whether the file was successfully added

classmethod from_remotepipelinefilecollection(remotepipelinefilecollection, are_deletions=False)[source]
member_class[source]

alias of PipelineFile

property member_from_string_method
member_validator()
set_archive_paths(archive_path_function)[source]

Set archive_path attributes for each file in the collection

Parameters

archive_path_function – function used to determine archive destination path

Returns

None

set_bool_attribute(attribute, value)[source]

Set a bool attribute for each file in the collection

Parameters
  • attribute – attribute to set

  • value – value to set the attribute

Returns

None

set_check_types(check_type)[source]

Set check_type attributes for each file in the collection

Parameters

check_type – PipelineFileCheckType enum member

Returns

None

set_default_check_types(check_params=None)[source]

Set check_type attribute for each file in the collection to the default value, based on the file type and presence of compliance checker checks in the check parameters

Parameters

check_params – dict or None

Returns

None

set_dest_paths(dest_path_function)[source]

Set dest_path attributes for each file in the collection

Parameters

dest_path_function – function used to determine publishing destination path

Returns

None

set_file_update_callback(file_update_callback)[source]

Set a callback function in each PipelineFile in this collection

Parameters

file_update_callback – callback (function)

Returns

None

set_publish_types(publish_type)[source]

Set publish_type attributes for each file in the collection

Parameters

publish_type – PipelineFilePublishType enum member

Returns

None

set_publish_types_from_regexes(include_regexes, exclude_regexes, addition_type, deletion_type)[source]

Set publish_type attribute for each file in the collection depending on whether it is considered “included” according to the regex parameters

Parameters
  • include_regexes – regex(es) for which a file must match one or more to be included

  • exclude_regexes – regex(es) which will exclude an already included file

  • addition_type – PipelineFilePublishType enum member to set for included addition files

  • deletion_type – PipelineFilePublishType enum member to set for included deletion files

Returns

None
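
The include/exclude semantics (a file must match at least one include regex, and is then dropped again if it matches any exclude regex) can be sketched as a stand-alone predicate. The function name and signature here are illustrative, not the project's API:

```python
import re


def matches_regexes(name, include_regexes, exclude_regexes=None):
    """Sketch of the include/exclude test applied to each file name."""
    included = any(re.match(r, name) for r in include_regexes)
    if included and exclude_regexes:
        # an exclude match overrides a prior include match
        included = not any(re.match(r, name) for r in exclude_regexes)
    return included
```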

set_string_attribute(attribute, value)[source]

Set a string attribute for each file in the collection

Parameters
  • attribute – attribute to set

  • value – value to set the attribute

Returns

None

unique_attributes = ('archive_path', 'dest_path')
class aodncore.pipeline.files.RemotePipelineFile(dest_path, local_path=None, name=None, last_modified=None, size=None)[source]

Bases: PipelineFileBase

Implementation of PipelineFileBase to represent a single remote file. This provides a common interface for remote files, to facilitate querying and downloading operations where the current state of the storage is relevant information.

property file_checksum
classmethod from_pipelinefile(pipeline_file)[source]

Construct a RemotePipelineFile instance from an existing PipelineFile instance

Note: the local_path is deliberately not set in the new instance, primarily to avoid accidental overwriting of local files in case of conversion to a remote file and having the remote file downloaded. The local_path attribute may still be set, but it is an explicit operation.

Parameters

pipeline_file – PipelineFile instance used to instantiate a RemotePipelineFile instance

Returns

RemotePipelineFile instance

property last_modified
property local_path
remove_local()[source]
property size
class aodncore.pipeline.files.RemotePipelineFileCollection(data=None, validate_unique=True)[source]

Bases: PipelineFileCollectionBase

A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances

download(broker, local_path)[source]

Helper method to download the current collection from a given broker to a given local path

Parameters
  • broker – BaseStorageBroker subclass to download from

  • local_path – local path into which files are downloaded

Returns

None

classmethod from_pipelinefilecollection(pipelinefilecollection)[source]
keys()[source]
member_class[source]

alias of RemotePipelineFile

property member_from_string_method
member_validator()
unique_attributes = ('local_path', 'dest_path')
aodncore.pipeline.files.ensure_pipelinefilecollection(o)[source]

Function to accept either a single PipelineFile OR a PipelineFileCollection and ensure that a PipelineFileCollection object is returned in either case

Parameters

o – PipelineFile or PipelineFileCollection object

Returns

PipelineFileCollection object
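
The ensure_* helpers follow a common normalisation idiom, which can be sketched generically (the function below is illustrative, not the project code, which is specialised to the pipeline file types):

```python
def ensure_collection(o, member_type, collection_type):
    """Return a collection_type instance whether a single member or an
    existing collection was supplied (illustrative sketch)."""
    if isinstance(o, collection_type):
        return o
    if isinstance(o, member_type):
        # wrap the lone member in a new collection
        return collection_type([o])
    raise TypeError("expected {} or {}, got {}".format(
        member_type.__name__, collection_type.__name__, type(o).__name__))
```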

aodncore.pipeline.files.ensure_remotepipelinefilecollection(o)[source]

Function to accept either a single RemotePipelineFile OR a RemotePipelineFileCollection and ensure that a RemotePipelineFileCollection object is returned in either case

Parameters

o – RemotePipelineFile or RemotePipelineFileCollection object

Returns

RemotePipelineFileCollection object

aodncore.pipeline.files.validate_pipelinefile_or_pipelinefilecollection(o)
aodncore.pipeline.files.validate_pipelinefile_or_string(o)
aodncore.pipeline.files.validate_pipelinefilecollection(o)

aodncore.pipeline.geonetwork module

Geonetwork Library

class aodncore.pipeline.geonetwork.Geonetwork(base_url, username, password)[source]

Bases: object

Geonetwork API session handler

Parameters
  • base_url – Geonetwork instance base url

  • username – username for the Geonetwork API

  • password – password for the Geonetwork API

  • logger – an instance of the logger

get_record(_uuid)[source]

Retrieve a metadata record

Parameters

_uuid – Geonetwork record ID

Returns

xml of specified metadata record

update_record(_uuid, changes)[source]

Update Geonetwork record

Parameters
  • _uuid – Geonetwork record ID

  • changes – list of change dicts where each change contains a value and an xpath

class aodncore.pipeline.geonetwork.GeonetworkMetadataHandler(conn, session, metadata, logger)[source]

Bases: object

Handle changes to Geonetwork metadata from Harvester

Build the geonetwork payload and push changes

Parameters
  • conn – the database connection class (DatabaseInteractions)

  • session – the geonetwork API session

  • metadata – dict containing extents for a single metadata record

  • logger – instance of the logger

build_api_payload()[source]

Build the batchedit API payload based on dict template

The payload is a complete replacement of all extents for the provided metadata record

get_namespace_dict()[source]

Scrape relevant namespaces from source metadata record

run()[source]
aodncore.pipeline.geonetwork.dict_to_xml(tag, value=None, attr=None, elems=None, display=True)[source]

Convert a dictionary of XML nodes into a nested XML string

Parameters
  • tag – a string or list containing element tag(s). If tag is a list then recursively build the parent elements and apply the rest of the logic to the last element.

  • value – string representing text for an element

  • attr – dict containing key:value pairs representing attributes of the element

  • elems – list of dicts containing nested element definitions. If this is present then the value will be overridden with recursive child elements

  • display – boolean to identify whether the element (and child nodes) should be rendered to the XML string

Returns

an XML string
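
A stripped-down converter of this kind can be written against xml.etree alone. This sketch supports only the tag/value/attr/elems parameters (no list-of-tags recursion or display flag) and is not the project's implementation:

```python
import xml.etree.ElementTree as ET


def dict_to_xml(tag, value=None, attr=None, elems=None):
    """Build an Element from a nested dict definition (sketch)."""
    element = ET.Element(tag, attrib=attr or {})
    if elems:
        # child element definitions override any plain text value
        for child in elems:
            element.append(dict_to_xml(**child))
    elif value is not None:
        element.text = value
    return element
```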

aodncore.pipeline.geonetwork.geonetwork_exception_handler()[source]

aodncore.pipeline.handlerbase module

class aodncore.pipeline.handlerbase.HandlerBase(input_file, allowed_archive_path_regexes=None, allowed_dest_path_regexes=None, allowed_extensions=None, allowed_regexes=None, archive_input_file=False, archive_path_function=None, celery_task=None, check_params=None, config=None, custom_params=None, dest_path_function=None, error_cleanup_regexes=None, exclude_regexes=None, harvest_params=None, harvest_type='talend', include_regexes=None, notify_params=None, upload_path=None, resolve_params=None)[source]

Bases: object

Base class for pipeline handler sub-classes.

Implements common handler methods and defines state machine states and transitions.

Parameters
  • input_file (str) –

    Path to the file being handled. A non-existent file will cause the handler to exit with an error during the initialise step.

    Note

    input_file is the only positional argument. Other arguments may be provided in any order.

  • allowed_archive_path_regexes (list) – List of allowed regular expressions of which PipelineFile.archive_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.

  • allowed_dest_path_regexes (list) – List of allowed regular expressions of which PipelineFile.dest_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.

  • allowed_extensions (list) – List of allowed extensions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.

  • allowed_regexes (list) –

    List of allowed regular expressions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.

    Note

    allowed_regexes are checked after allowed_extensions

  • archive_input_file (bool) – Flags whether the original input file should be uploaded to the archive, the location of which is configured by the environment configuration. The file will be archived at ARCHIVE_URI/PIPELINE_NAME/BASENAME.

  • archive_path_function (str, function) – See dest_path_function. This operates identically, except that it is used to calculate the PipelineFile.archive_path attribute and that the path is relative to the ARCHIVE_URI.

  • celery_task (celery.Task) –

    A Celery task object, in order for the handler instance to derive runtime information such as the current task name and UUID.

    Note

    If absent (e.g. when unit testing), the handler will revert to having no task information available, and will log output to standard output.

  • check_params (dict) – A dict containing parameters passed directly to the check step (e.g. compliance checker suites). The structure of the dict is defined by the CHECK_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • config (aodncore.pipeline.config.LazyConfigManager) –

    A configuration object which the handler uses to retrieve configuration from its environment. If absent, the handler will exit with an error during the __init__() method (i.e. will not instantiate).

    Note

    While this attribute is mandatory, it is not generally required to supply it directly in normal use cases, unless instantiating the handler class manually.

    When deployed, the parameter is automatically included by the worker service configuration.

    When testing, unit tests inheriting from HandlerTestCase contain a pre-prepared config object available as self.config. The HandlerTestCase.run_handler() and HandlerTestCase.run_handler_with_exception() helper methods automatically assign the test config to the handler being tested.

  • custom_params (dict) – A dict containing parameters which are ignored by the base class, but allow passing arbitrary custom values to subclasses. The structure of the dict is defined by the CUSTOM_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • dest_path_function (str, callable) –

    The function used to determine the PipelineFile.dest_path attribute, relative to the UPLOAD_URI configuration item. If absent, the handler will attempt to use the dest_path() method in the handler itself. If a function is not found by either mechanism, the handler will exit with an error during the initialise step.

    Note

    When the value is a string, it is assumed that it refers to the name of a function advertised in the pipeline.handlers entry point group.

  • error_cleanup_regexes (list) – A list of regular expressions which, when a cleanup policy of DELETE_CUSTOM_REGEXES_FROM_ERROR_STORE is set, controls which files are deleted from the error store upon successful execution of the handler instance

  • exclude_regexes (list) – See include_regexes.

  • harvest_params (dict) – A dict containing parameters passed directly to the harvest step (e.g. slice size, undo behaviour). The structure of the dict is defined by the HARVEST_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • harvest_type (str) –

    String to inform the aodncore.pipeline.steps.harvest step factory function which HarvesterRunner implementation to use during the publish step.

    Note

    Currently the only valid value is ‘talend’, which is the default.

  • include_regexes (list) –

    A list of regexes which, when combined with exclude_regexes, determines which files in the collection are assigned with the default_addition_publish_type or default_deletion_publish_type types (depending on whether the file is an addition or a deletion). If set, to be considered included, file paths must match one of the regexes in include_regexes but not any of the regexes in exclude_regexes.

    Files not matching the inclusion criteria will remain with a publish_type attribute of PipelineFilePublishType.NO_ACTION, meaning they will be ignored by the publish step.

    Note

    If omitted, the default is to select all files in file_collection for publication.

    Note

    This relates only to the files in file_collection, and has no relation to the input_file path, unless the input file is itself in the collection (e.g. when handling a single file).

    For example, a single ‘.nc’ file could feasibly match the allowed_extensions for the handler, but still be excluded by this mechanism once it is added to file_collection during the aodncore.pipeline.steps.resolve step.

  • notify_params (dict) – A dict containing parameters passed directly to the aodncore.pipeline.steps.notify step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the NOTIFY_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • upload_path (str) –

    A string attribute to hold the original upload path of the input_file.

    Note

    This is intended for information purposes only (e.g. to appear in notification templates), since there is a distinction between the original path and the input_file as provided to the handler, which represents where the file was moved to for processing.

  • resolve_params (dict) – A dict containing parameters passed directly to the resolve step (e.g. the root path prepended to relative paths in manifest files). The structure of the dict is defined by the RESOLVE_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
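For illustration, the extension and regex gating described above, together with a dest_path-style function, can be sketched in plain Python. The names, patterns and destination layout here are hypothetical; the actual checks are performed inside the handler during the initialise step:

```python
import os
import re

# Hypothetical pipeline configuration for this sketch
ALLOWED_EXTENSIONS = ['.nc']
ALLOWED_REGEXES = [r'^IMOS_.*\.nc$']

def input_file_is_allowed(input_file):
    """Mimic the initialise-step gating: allowed_extensions are checked
    first, then allowed_regexes."""
    basename = os.path.basename(input_file)
    _, extension = os.path.splitext(basename)
    if extension not in ALLOWED_EXTENSIONS:
        return False
    return any(re.match(pattern, basename) for pattern in ALLOWED_REGEXES)

def dest_path(filepath):
    """A dest_path-style function: return a path relative to UPLOAD_URI."""
    return os.path.join('IMOS', 'MY_FACILITY', os.path.basename(filepath))

print(input_file_is_allowed('/incoming/IMOS_example.nc'))  # True
print(dest_path('/incoming/IMOS_example.nc'))
```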

add_to_collection(pipeline_file, **kwargs)[source]

Helper method to add a PipelineFile object or path to the handler’s file_collection, with the handler instance’s file_update_callback method assigned

Note: as this is a wrapper around the PipelineFileCollection.add method, kwargs are only applied when pipeline_file is a path (string). When adding an existing PipelineFile object, the object is added “as-is”, and attributes must be set explicitly on the object.

Parameters
  • pipeline_file – PipelineFile object or file path

  • kwargs – keyword arguments passed through to the PipelineFileCollection.add method

Returns

None
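The kwargs rule above can be illustrated with a toy collection; both classes here are simplified stand-ins, not the real PipelineFile/PipelineFileCollection:

```python
# Simplified stand-ins for PipelineFile / PipelineFileCollection
class ToyFile:
    def __init__(self, path, publish_type='NO_ACTION'):
        self.path = path
        self.publish_type = publish_type

class ToyCollection(list):
    def add(self, pipeline_file, **kwargs):
        if isinstance(pipeline_file, str):
            # kwargs only apply when adding a path: they are used to
            # construct the new file object
            pipeline_file = ToyFile(pipeline_file, **kwargs)
        # an existing object is added "as-is"; kwargs are not applied
        self.append(pipeline_file)

collection = ToyCollection()
collection.add('product.nc', publish_type='HARVEST_UPLOAD')
collection.add(ToyFile('other.nc'))
print([f.publish_type for f in collection])  # ['HARVEST_UPLOAD', 'NO_ACTION']
```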

all_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED', 'HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
all_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}, {'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
property celery_task_id

Read-only property to access Celery task ID

Returns

Celery task ID (if applicable)

Return type

str, None

property celery_task_name

Read-only property to access Celery task name

Returns

Celery task name (if applicable)

Return type

str, None

property collection_dir

Temporary subdirectory where the initial input file collection will be unpacked

Warning

Any new files created during the handler execution (i.e. files that were not in the original input) should be created in self.products_dir rather than here.

Returns

collection subdirectory of instance working directory (as populated by aodncore.pipeline.steps.resolve step)

Return type

str, None

property config

Property to access the config attribute

Returns

configuration object

Return type

aodncore.pipeline.config.LazyConfigManager

property default_addition_publish_type

Property to manage attribute which determines the default publish type assigned to ‘addition’ PipelineFile instances

Returns

default addition publish type

Return type

aodncore.pipeline.common.PipelinePublishType

property default_deletion_publish_type

Property to manage attribute which determines the default publish type assigned to ‘deletion’ PipelineFile instances

Returns

default deletion publish type

Return type

aodncore.pipeline.common.PipelinePublishType

property error

Read-only property to access Exception object from handler instance

Returns

the exception object which caused the handler to fail (if applicable)

Return type

Exception, None

property error_details

Read-only property to retrieve string description of error from handler instance

Returns

error description string (if applicable)

Return type

str, None

property exclude_regexes

Property to manage exclude_regexes attribute

Returns

Return type

list

property file_basename

Read-only property to access the input_file basename

Returns

input_file basename

Return type

str

property file_checksum

Read-only property to access the input_file checksum

Returns

input_file checksum string

Return type

str

property file_collection

Read-only property to access the handler’s primary PipelineFileCollection instance

Returns

handler file collection

Return type

PipelineFileCollection

property file_extension

Read-only property to access the input_file extension

Returns

input_file extension string

Return type

str

property file_type

Read-only property to access the input_file type

Returns

input_file type

Return type

FileType

property include_regexes

Property to manage include_regexes attribute

Returns

Return type

list

property input_file_archive_path

Property used to determine the archive path for the original input file

Returns

string containing the archive path

Return type

str

property input_file_object

Read-only property to access the original input file represented as a PipelineFile object

Returns

input file object

Return type

PipelineFile

property instance_working_directory

Read-only property to retrieve the instance working directory

Returns

string containing path to top level working directory for this instance

Return type

str, None

property logger

Read-only property to access the instance Logger

Returns

Logger

property notification_results

Read-only property to retrieve the notification results, including the sent status of each recipient

Returns

list of aodncore.pipeline.steps.notify.NotifyRecipient instances

Return type

aodncore.pipeline.steps.notify.NotifyList

ordered_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED']
ordered_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}]
other_states = ['HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
other_transitions = [{'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
postprocess()[source]

Method designed to be overridden by child handlers in order to execute code between publish and notify steps

Returns

None

preprocess()[source]

Method designed to be overridden by child handlers in order to execute code between resolve and check steps

Returns

None

process()[source]

Method designed to be overridden by child handlers in order to execute code between check and publish steps

Returns

None
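These three hooks run at fixed points in the handler's step sequence. A toy sketch of the ordering (this mimics only the hook dispatch, not the real transitions-based state machine):

```python
# A toy mimic of the hook ordering: the base class drives the fixed step
# sequence, and subclasses override the hooks.
class MiniHandler:
    def __init__(self):
        self.trace = []

    # overridable hooks, analogous to preprocess/process/postprocess
    def preprocess(self):
        pass

    def process(self):
        pass

    def postprocess(self):
        pass

    def run(self):
        for step, hook in (('resolve', None), ('preprocess', self.preprocess),
                           ('check', None), ('process', self.process),
                           ('publish', None), ('postprocess', self.postprocess)):
            self.trace.append(step)
            if hook is not None:
                hook()

class MyHandler(MiniHandler):
    def process(self):
        # custom code executed between the check and publish steps
        self.trace.append('custom-processing')

h = MyHandler()
h.run()
print(h.trace)
```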

property products_dir

Temporary subdirectory in which products may be created

Returns

products subdirectory of instance working directory

Return type

str, None

property result

Read-only property to retrieve the overall end result of the handler instance

Returns

handler result

Return type

aodncore.pipeline.common.HandlerResult

run()[source]

The entry point to the handler instance. Executes the automatic state machine transitions, and populates the result attribute to signal success or failure of the handler instance.

property should_notify

Read-only property to retrieve the list of intended recipients after being assembled based on notify_params

Returns

list of intended recipients

Return type

list

property start_time

Read-only property containing the timestamp of when this instance was created

Returns

timestamp of handler starting time

Return type

datetime.datetime

property state_query

Read-only property containing an initialised StateQuery instance, for querying existing pipeline state

Returns

StateQuery instance

Return type

StateQuery

property temp_dir

Temporary subdirectory where any other arbitrary temporary files may be created by handler sub-classes

Returns

temporary subdirectory of instance working directory

Return type

str, None

property versions

Read-only property to access module versions

Returns

module version strings for aodncore, aodndata and compliance checker modules

Return type

dict

aodncore.pipeline.log module

This module provides logging related objects for dynamically constructing logging configurations and retrieving configured LoggerAdapter/Logger instances

class aodncore.pipeline.log.WorkerLoggingConfigBuilder(pipeline_config)[source]

Bases: object

add_watch_config(name, formatter='pipeline_formatter', level=None)[source]

Add logging configuration for a single pipeline watch

Parameters
  • name – name of the pipeline for which the config is being generated

  • formatter – name of the formatter to use for handler

  • level – logging level

Returns

dict containing handlers/loggers for a single watch

add_watches(watches)[source]
build()[source]
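A rough sketch of what a per-watch config contributes to a dictConfig-style structure; the handler class and formatter details here are illustrative, not the actual generated configuration:

```python
import logging.config

def add_watch_config(config, name, formatter='pipeline_formatter', level='INFO'):
    """Add illustrative handler/logger entries for a single pipeline watch."""
    config['handlers'][name] = {
        'class': 'logging.StreamHandler',
        'formatter': formatter,
        'level': level,
    }
    config['loggers'][name] = {'handlers': [name], 'level': level}
    return config

config = {
    'version': 1,
    'formatters': {'pipeline_formatter': {'format': '%(name)s %(levelname)s %(message)s'}},
    'handlers': {},
    'loggers': {},
}
add_watch_config(config, 'my_pipeline')
logging.config.dictConfig(config)  # the assembled dict is valid dictConfig input
```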
aodncore.pipeline.log.get_pipeline_logger(name, extra=None, logger_function=<function getLogger>)[source]

Return a logger adapter prepared with given extra metadata and SYSINFO logging level

Parameters
  • name – logger name

  • extra – extra dict to pass to LoggerAdapter

  • logger_function – factory function which accepts a logger name and returns a Logger instance

Returns

Logger instance
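In stdlib terms, the returned object behaves like a LoggerAdapter wrapping the named logger; this sketch omits the custom SYSINFO level:

```python
import logging

def make_pipeline_logger(name, extra=None, logger_function=logging.getLogger):
    """Sketch: wrap the logger obtained from logger_function in a
    LoggerAdapter so the 'extra' metadata accompanies every record."""
    logger = logger_function(name)
    return logging.LoggerAdapter(logger, extra or {})

adapter = make_pipeline_logger('my_pipeline', extra={'celery_task_id': 'abc-123'})
adapter.info('handler started')  # records carry the extra dict
```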

aodncore.pipeline.log.get_watchservice_logging_config(pipeline_config)[source]

Generate logging configuration for the ‘watchservice’ service, suitable for use by logging.config.dictConfig()

Parameters

pipeline_config – LazyConfigManager.pipeline_config instance

Returns

rendered watchservice logging config

aodncore.pipeline.schema module

This module holds schema definitions for validating the various dicts which are used through the pipeline, and also the helper functions necessary to validate an object against their respective schema.

aodncore.pipeline.schema.validate_check_params(check_params)[source]
aodncore.pipeline.schema.validate_custom_params(check_params)[source]
aodncore.pipeline.schema.validate_harvest_params(harvest_params)[source]
aodncore.pipeline.schema.validate_json_manifest(json_manifest)[source]
aodncore.pipeline.schema.validate_logging_config(logging_config)[source]
aodncore.pipeline.schema.validate_notify_params(notify_params)[source]
aodncore.pipeline.schema.validate_pipeline_config(pipeline_config)[source]
aodncore.pipeline.schema.validate_resolve_params(resolve_params)[source]
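Each validator raises if the supplied dict does not conform to its schema. A simplified illustration of the call pattern (the key names here are hypothetical; the real schemas are the *_SCHEMA objects in this module):

```python
# Hypothetical allowed keys, standing in for a real schema definition
ALLOWED_NOTIFY_KEYS = {'owner_notify_list', 'success_notify_list', 'error_notify_list'}

def validate_notify_params(notify_params):
    """Raise on malformed input, mirroring the validate_* call pattern."""
    if not isinstance(notify_params, dict):
        raise TypeError('notify_params must be a dict')
    unexpected = set(notify_params) - ALLOWED_NOTIFY_KEYS
    if unexpected:
        raise ValueError('unexpected keys: {}'.format(sorted(unexpected)))

validate_notify_params({'owner_notify_list': ['email:owner@example.com']})  # passes
try:
    validate_notify_params({'bogus': []})
except ValueError as e:
    print(e)
```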

aodncore.pipeline.serviceconfig module

aodncore.pipeline.statequery module

class aodncore.pipeline.statequery.StateQuery(storage_broker, wfs_broker)[source]

Bases: object

Simple state query interface, to provide user friendly access for querying existing Pipeline state

download(remotepipelinefilecollection, local_path)[source]

Helper method to download a RemotePipelineFileCollection or RemotePipelineFile

Parameters
  • remotepipelinefilecollection – RemotePipelineFileCollection to download

  • local_path – local path where files will be downloaded.

Returns

None

query_storage(query)[source]

Query the storage backend and return existing files matching the given query

Parameters

query – S3-style prefix for filtering query results

Returns

RemotePipelineFileCollection of files matching the prefix
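The query is an S3-style prefix match against existing object keys. A toy stand-in with hypothetical keys, returning a plain list rather than a RemotePipelineFileCollection:

```python
# Hypothetical existing object keys in the storage backend
EXISTING_KEYS = [
    'IMOS/ANMN/NSW/PH100/file_1.nc',
    'IMOS/ANMN/NSW/PH100/file_2.nc',
    'IMOS/SRS/OC/file_3.nc',
]

def query_storage(prefix):
    """Return the keys matching the given S3-style prefix."""
    return [key for key in EXISTING_KEYS if key.startswith(prefix)]

print(query_storage('IMOS/ANMN/'))
```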

query_wfs_file_exists(layer, name)[source]

Returns a bool representing whether a given ‘file_url’ is present in a layer

Parameters
  • layer – layer name supplied to GetFeature typename parameter

  • name – ‘file_url’ inserted into OGC filter, and supplied to GetFeature filter parameter

Returns

whether the given file is present in the layer

query_wfs_files(layer, **kwargs)[source]

Return a RemotePipelineFileCollection containing all files for a given layer, or files matching the filter specified in the kwarg ogc_expression (of type OgcExpression)

Parameters
  • layer – layer name supplied to GetFeature typename parameter

  • kwargs – keyword arguments passed to underlying broker method

Returns

RemotePipelineFileCollection containing list of files for the layer

query_wfs_getfeature_dict(layer=None, **kwargs)[source]

Make a GetFeature request, and return the response in a native dict

Parameters
  • layer – layer name supplied to GetFeature typename parameter

  • kwargs – keyword arguments passed to the underlying WebFeatureService.getfeature method

Returns

dict containing the parsed GetFeature response

query_wfs_urls_for_layer(layer, **kwargs)[source]
property wfs

Read-only property to access the instantiated WebFeatureService object

Returns

WebFeatureService instance

aodncore.pipeline.storage module

class aodncore.pipeline.storage.LocalFileStorageBroker(prefix)[source]

Bases: BaseStorageBroker

StorageBroker to interact with a local directory

class aodncore.pipeline.storage.S3StorageBroker(bucket, prefix)[source]

Bases: BaseStorageBroker

StorageBroker to interact with an S3 bucket

Note: this does not and should not attempt to support any authentication code. The various mechanisms for loading credentials are far more appropriately handled directly by boto3, and it is expected that credentials are supplied by the environment using one of those mechanisms (e.g. deployed from configuration management, set as environment variables etc.)

Refer: http://boto3.readthedocs.io/en/latest/guide/configuration.html

static result_to_remote_pipelinefile_collection(result)[source]
retry_kwargs = {'backoff': 2, 'delay': 5, 'exceptions': (<class 'botocore.exceptions.ClientError'>, <class 'botocore.exceptions.ConnectionError'>, <class 'http.client.IncompleteRead'>, <class 'ssl.SSLError'>), 'tries': 3}
class aodncore.pipeline.storage.SftpStorageBroker(server, prefix)[source]

Bases: BaseStorageBroker

StorageBroker to interact with a directory on a remote SFTP server

Note: similar to the S3 storage broker, this does not implement any authentication code, as authentication is better handled by the environment in the form of public key authentication

aodncore.pipeline.storage.get_storage_broker(store_url)[source]

Factory function to return appropriate storage broker class based on URL scheme

Parameters

store_url – URL base

Returns

BaseStorageBroker sub-class
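A sketch of the scheme-based dispatch; the scheme-to-class mapping shown here is an assumption for illustration, with class names returned as strings rather than instantiated brokers:

```python
from urllib.parse import urlparse

# Hypothetical mimic of the factory: dispatch on the URL scheme. The real
# function returns an initialised BaseStorageBroker sub-class instance.
def broker_class_for(store_url):
    scheme = urlparse(store_url).scheme
    return {
        'file': 'LocalFileStorageBroker',
        's3': 'S3StorageBroker',
        'sftp': 'SftpStorageBroker',
    }[scheme]

print(broker_class_for('s3://my-bucket/prefix'))
print(broker_class_for('file:///var/lib/storage'))
```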

aodncore.pipeline.storage.sftp_makedirs(sftpclient, name, mode=493)[source]
Recursively create a directory path on a remote SFTP server

Based on os.makedirs, with local filesystem calls replaced with their SFTPClient equivalents.

Parameters
  • sftpclient – SFTPClient object

  • name – directory path to create

  • mode – permissions for the newly created directory

Returns

None

aodncore.pipeline.storage.sftp_mkdir_p(sftpclient, name, mode=493)[source]
Replicate ‘mkdir -p’ shell command behaviour by wrapping sftp_makedirs and suppressing exceptions where the directory already exists.

Parameters
  • sftpclient – SFTPClient object

  • name – directory path to create

  • mode – permissions for the newly created directory

Returns

None

aodncore.pipeline.storage.sftp_path_exists(sftpclient, path)[source]

Test whether a path exists on a remote SFTP server

Parameters
  • sftpclient – SFTPClient object

  • path – path to test for existence

Returns

True if the path exists, False if not
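The recursive logic of sftp_makedirs can be sketched with an injected client object; FakeClient here is a stand-in for paramiko's SFTPClient, and note that the default mode of 493 in the signatures above is simply decimal for octal 0o755:

```python
import posixpath

def makedirs(client, name, mode=0o755):
    """Sketch of the os.makedirs-style recursion: create parents first,
    then the leaf directory, via the injected client."""
    head, tail = posixpath.split(name)
    if not tail:
        head, tail = posixpath.split(head)
    if head and tail and not client.exists(head):
        makedirs(client, head, mode)
    client.mkdir(name, mode)

class FakeClient:
    """Minimal stand-in with the subset of methods the sketch needs."""
    def __init__(self):
        self.dirs = set()

    def exists(self, path):
        return path in self.dirs

    def mkdir(self, path, mode):
        self.dirs.add(path)

client = FakeClient()
makedirs(client, '/a/b/c')
print(sorted(client.dirs))  # ['/', '/a', '/a/b', '/a/b/c']
```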

aodncore.pipeline.storage.validate_storage_broker(o)

aodncore.pipeline.watch module

This module provides the code to implement the “watchservice” component of the pipeline.

This includes setting up directory watches, handling incoming inotify events, defining the Celery tasks and routing/queuing events.

The watchservice itself is designed as an executable module, with the entry point being the aodncore.pipeline.watchservice module.

This means that once aodncore is installed, running the watchservice is simply a matter of running the following:

python -m aodncore.pipeline.watchservice

This is typically run as an operating system service by something like supervisord, but can be run from the command-line for debugging.

class aodncore.pipeline.watch.CeleryConfig(routes=None)[source]

Bases: object

accept_content = ['json']
broker_url = 'amqp://'
result_serializer = 'json'
task_routes = {}
task_serializer = 'json'
worker_max_tasks_per_child = 1
class aodncore.pipeline.watch.CeleryContext(application, config, celeryconfig)[source]

Bases: object

property application

Return the configured Celery application instance

Returns

Celery application instance with config applied and tasks registered

class aodncore.pipeline.watch.IncomingFileEventHandler(config)[source]

Bases: ProcessEvent

process_default(event)[source]

Default processing event method. By default does nothing. Subclass ProcessEvent and redefine this method in order to modify its behavior.

Parameters

event – Event instance to be processed. Can be any type of event except IN_Q_OVERFLOW events (see method process_IN_Q_OVERFLOW).

queue_task(directory, pathname, event_id=None)[source]

Add a task to the queue corresponding with the given directory, handling the given file

Parameters
  • directory – the watched directory

  • pathname – the fully qualified path to the file which triggered the event

  • event_id – UUID to identify this event in log files (will be generated if not present)

Returns

None

class aodncore.pipeline.watch.IncomingFileStateManager(input_file, pipeline_name, config, logger, celery_request, error_exit_policies=None, success_exit_policies=None, error_broker=None)[source]

Bases: object

property basename
property error_broker
error_mode = 436
property error_name
property error_uri
property incoming_dir
property processing_dir
processing_mode = 292
property processing_path
property relative_path
states = ['FILE_IN_INCOMING', 'FILE_IN_PROCESSING', 'FILE_IN_ERROR', 'FILE_SUCCESS']
transitions = [{'trigger': 'move_to_processing', 'source': 'FILE_IN_INCOMING', 'dest': 'FILE_IN_PROCESSING', 'before': ['_pre_processing_checks', '_move_to_processing']}, {'trigger': 'move_to_error', 'source': 'FILE_IN_PROCESSING', 'dest': 'FILE_IN_ERROR', 'before': ['_run_error_callbacks', '_move_to_error']}, {'trigger': 'move_to_success', 'source': 'FILE_IN_PROCESSING', 'dest': 'FILE_SUCCESS', 'before': '_run_success_callbacks', 'after': '_remove_processing_file'}]
class aodncore.pipeline.watch.WatchServiceContext(config)[source]

Bases: object

Class to create instances required for WatchServiceManager

class aodncore.pipeline.watch.WatchServiceManager(config, event_handler, watch_manager, notifier)[source]

Bases: object

EVENT_MASK = 136
handle_signal(signo=None, stackframe=None)[source]
stop(reason='unknown')[source]
property watches
aodncore.pipeline.watch.get_task_name(namespace, function_name)[source]

Convenience function for CeleryManager.get_task_name()

Parameters
  • namespace – task namespace

  • function_name – name of function

Returns

string containing qualified task name

aodncore.pipeline.watchservice module

Module contents

class aodncore.pipeline.CheckResult(compliant, log, errors=False)[source]

Bases: object

Simple container class to hold a check result

property compliant
property errors
property log
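A stand-in with the same constructor shape, showing how a check result would typically be inspected (this is a sketch, not the real class):

```python
class CheckResultSketch:
    """Same constructor shape as CheckResult: a compliance flag, the check
    log output, and an errors flag for when the check itself failed to run."""
    def __init__(self, compliant, log, errors=False):
        self._compliant = compliant
        self._log = log
        self._errors = errors

    @property
    def compliant(self):
        return self._compliant

    @property
    def errors(self):
        return self._errors

    @property
    def log(self):
        return self._log

result = CheckResultSketch(True, ['all checks passed'])
print(result.compliant, result.errors)  # True False
```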
class aodncore.pipeline.FileClassifier[source]

Bases: object

Base class for working out where a file should be published.

class aodncore.pipeline.FileType(value)[source]

Bases: Enum

Represents each known file type, including extensions, mime type and validator function

Each enum member may have its attributes accessed by name when required for comparisons and filtering, e.g.

  • lookup the extension and mime-type for PNG file types in general:

    FileType.PNG.extensions
    ('.png',)
    
    FileType.PNG.mime_type
    'image/png'
    
  • assign a type attribute to an object, and query the type-specific values directly from the object:

    class Object(object):
        pass
    
    o = Object()
    o.file_type = FileType.JPEG
    o.file_type.extensions
    ('.jpg', '.jpeg')
    o.file_type.mime_type
    'image/jpeg'
    

The function referenced by validator must accept a single parameter, which is the path to the file being checked, and return True if the path represents a valid file of that type. For example, this would typically attempt to open/parse/read the file using a corresponding library (e.g. an NC file can be opened as a valid Dataset by the NetCDF library, a ZIP file can be read using the zipfile module etc.).

CSV = (('.csv',), 'text/csv', <function is_nonempty_file>)
DELETE_MANIFEST = (('.delete_manifest',), 'text/csv', <function is_nonempty_file>)
DIR_MANIFEST = (('.dir_manifest',), 'text/plain', <function is_nonempty_file>)
GZIP = (('.gz',), 'application/gzip', <function is_gzip_file>)
JPEG = (('.jpg', '.jpeg'), 'image/jpeg', <function is_jpeg_file>)
JSON_MANIFEST = (('.json_manifest',), 'application/json', <function is_json_file>)
MAP_MANIFEST = (('.map_manifest',), 'text/csv', <function is_nonempty_file>)
NETCDF = (('.nc',), 'application/octet-stream', <function is_netcdf_file>)
PDF = (('.pdf',), 'application/pdf', <function is_pdf_file>)
PNG = (('.png',), 'image/png', <function is_png_file>)
RSYNC_MANIFEST = (('.rsync_manifest',), 'text/plain', <function is_nonempty_file>)
SIMPLE_MANIFEST = (('.manifest',), 'text/plain', <function is_nonempty_file>)
TIFF = (('.tif', '.tiff'), 'image/tiff', <function is_tiff_file>)
UNKNOWN = ((), None, <function is_nonempty_file>)
ZIP = (('.zip',), 'application/zip', <function is_zip_file>)
extensions
classmethod get_type_from_extension(extension)[source]
classmethod get_type_from_name(name)[source]
property is_image_type

Read-only boolean property indicating whether the file type instance is an image type.

is_type(value)[source]

Check whether the type (i.e. the part of the mime type before the slash) equals the given value

Parameters

value – type to compare against

Returns

True if this type matches the given value, otherwise False

mime_type
validator
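The per-member attributes can be mimicked with a small Enum; the real FileType also carries the validator function as a third element of each member's value:

```python
from enum import Enum

# A minimal mimic of how FileType attaches named attributes to members
class MiniFileType(Enum):
    PNG = (('.png',), 'image/png')
    JPEG = (('.jpg', '.jpeg'), 'image/jpeg')

    def __init__(self, extensions, mime_type):
        # Enum unpacks the tuple value into __init__ arguments
        self.extensions = extensions
        self.mime_type = mime_type

    @classmethod
    def get_type_from_extension(cls, extension):
        for member in cls:
            if extension in member.extensions:
                return member
        return None

print(MiniFileType.JPEG.mime_type)                   # image/jpeg
print(MiniFileType.get_type_from_extension('.png'))  # MiniFileType.PNG
```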
class aodncore.pipeline.HandlerBase(input_file, allowed_archive_path_regexes=None, allowed_dest_path_regexes=None, allowed_extensions=None, allowed_regexes=None, archive_input_file=False, archive_path_function=None, celery_task=None, check_params=None, config=None, custom_params=None, dest_path_function=None, error_cleanup_regexes=None, exclude_regexes=None, harvest_params=None, harvest_type='talend', include_regexes=None, notify_params=None, upload_path=None, resolve_params=None)[source]

Bases: object

Base class for pipeline handler sub-classes.

Implements common handler methods and defines state machine states and transitions.

Parameters
  • input_file (str) –

    Path to the file being handled. A non-existent file will cause the handler to exit with an error during the initialise step.

    Note

    input_file is the only positional argument. Other arguments may be provided in any order.

  • allowed_archive_path_regexes (list) – List of allowed regular expressions of which PipelineFile.archive_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.

  • allowed_dest_path_regexes (list) – List of allowed regular expressions of which PipelineFile.dest_path must match at least one. If any non-matching values are found, the handler will exit with an error during the publish step before publishing anything.

  • allowed_extensions (list) – List of allowed extensions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.

  • allowed_regexes (list) –

    List of allowed regular expressions for input_file. Non-matching input files will cause the handler to exit with an error during the initialise step.

    Note

    allowed_regexes are checked after allowed_extensions

  • archive_input_file (bool) – Flags whether the original input file should be uploaded to the archive, the location of which is configured by the environment configuration. The file will be archived at ARCHIVE_URI/PIPELINE_NAME/BASENAME.

  • archive_path_function (str, function) – See dest_path_function. This operates identically, except that it is used to calculate the PipelineFile.archive_path attribute and that the path is relative to the ARCHIVE_URI.

  • celery_task (celery.Task) –

    A Celery task object, in order for the handler instance to derive runtime information such as the current task name and UUID.

    Note

    If absent (e.g. when unit testing), the handler will revert to having no task information available, and will log output to standard output.

  • check_params (dict) – A dict containing parameters passed directly to the check step (e.g. compliance checker suites). The structure of the dict is defined by the CHECK_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • config (aodncore.pipeline.config.LazyConfigManager) –

    A configuration object which the handler uses to retrieve configuration from its environment. If absent, the handler will exit with an error during the __init__() method (i.e. will not instantiate).

    Note

    While this attribute is mandatory, it is not generally required to supply it directly in normal use cases, unless instantiating the handler class manually.

    When deployed, the parameter is automatically included by the worker service configuration.

    When testing, unit tests inheriting from HandlerTestCase contain a pre-prepared config object available as self.config. The HandlerTestCase.run_handler() and HandlerTestCase.run_handler_with_exception() helper methods automatically assign the test config to the handler being tested.

  • custom_params (dict) – A dict containing parameters which are ignored by the base class, but allow passing arbitrary custom values to subclasses. The structure of the dict is defined by the CUSTOM_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • dest_path_function (str, callable) –

    The function used to determine the PipelineFile.dest_path attribute, relative to the UPLOAD_URI configuration item. If absent, the handler will attempt to use the dest_path() method in the handler itself. If a function is not found by either mechanism, the handler will exit with an error during the initialise step.

    Note

    When the value is a string, it is assumed that it refers to the name of a function advertised in the pipeline.handlers entry point group.

  • error_cleanup_regexes (list) – A list of regular expressions which, when a cleanup policy of DELETE_CUSTOM_REGEXES_FROM_ERROR_STORE is set, control which files are deleted from the error store upon successful execution of the handler instance.

  • exclude_regexes (list) – See include_regexes.

  • harvest_params (dict) – A dict containing parameters passed directly to the harvest step (e.g. slice size, undo behaviour). The structure of the dict is defined by the HARVEST_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • harvest_type (str) –

    String to inform the aodncore.pipeline.steps.harvest step factory function which HarvesterRunner implementation to use during the publish step.

    Note

    Currently the only valid value is ‘talend’, which is the default.

  • include_regexes (list) –

    A list of regexes which, when combined with exclude_regexes, determines which files in the collection are assigned the default_addition_publish_type or default_deletion_publish_type types (depending on whether the file is an addition or a deletion). If set, to be considered included, file paths must match one of the regexes in include_regexes but not any of the regexes in exclude_regexes.

    Files not matching the inclusion criteria will remain with a publish_type attribute of PipelineFilePublishType.NO_ACTION, meaning they will be ignored by the publish step.

    Note

    If omitted, the default is to select all files in file_collection for publication.

    Note

    This relates only to the files in file_collection, and has no relation to the input_file path, unless the input file is itself in the collection (e.g. when handling a single file).

    For example, a single ‘.nc’ file could feasibly match the allowed_extensions for the handler, but still be excluded by this mechanism once it is added to file_collection during the aodncore.pipeline.steps.resolve step.

  • notify_params (dict) – A dict containing parameters passed directly to the aodncore.pipeline.steps.notify step (e.g. owner/success/failure notify lists). The structure of the dict is defined by the NOTIFY_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.

  • upload_path (str) –

    A string attribute to hold the original upload path of the input_file.

    Note

    This is intended for informational purposes only (e.g. to appear in notification templates), since there is a distinction between the original path and the input_file as provided to the handler, which represents where the file was moved for processing.

  • resolve_params (dict) – A dict containing parameters passed directly to the resolve step (e.g. the root path prepended to relative paths in manifest files). The structure of the dict is defined by the RESOLVE_PARAMS_SCHEMA object in the aodncore.pipeline.schema module.
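
The include/exclude selection rule described for include_regexes and exclude_regexes can be sketched in plain Python (a simplified illustration of the documented behaviour, not the actual aodncore implementation):

```python
import re

def is_included(path, include_regexes, exclude_regexes=None):
    """Return True if path matches at least one include regex and
    none of the exclude regexes (mirrors the documented rule)."""
    if not any(re.match(r, path) for r in include_regexes):
        return False
    return not any(re.match(r, path) for r in (exclude_regexes or []))

# A '.nc' file is selected by the include pattern...
print(is_included('surveys/IMOS_ANMN.nc', [r'.*\.nc$']))
# ...but can still be rejected by an exclude pattern.
print(is_included('surveys/IMOS_ANMN.nc', [r'.*\.nc$'], [r'.*ANMN.*']))
```

Files failing this test keep a publish_type of PipelineFilePublishType.NO_ACTION and are ignored by the publish step.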

add_to_collection(pipeline_file, **kwargs)[source]

Helper method to add a PipelineFile object or path to the handler’s file_collection, with the handler instance’s file_update_callback method assigned

Note: as this is a wrapper around the PipelineFileCollection.add method, kwargs are applied only when pipeline_file is a path (string). When adding an existing PipelineFile object, the object is added “as-is”, and attributes must be set explicitly on the object.

Parameters
  • pipeline_file – PipelineFile or file path

  • kwargs – keyword arguments passed through to the PipelineFileCollection.add method

Returns

None

all_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED', 'HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
all_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}, {'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
property celery_task_id

Read-only property to access Celery task ID

Returns

Celery task ID (if applicable)

Return type

str, None

property celery_task_name

Read-only property to access Celery task name

Returns

Celery task name (if applicable)

Return type

str, None

property collection_dir

Temporary subdirectory where the initial input file collection will be unpacked

Warning

Any new files created during the handler execution (i.e. files that were not part of the original input) should be created in self.products_dir rather than here.

Returns

collection subdirectory of instance working directory (as populated by aodncore.pipeline.steps.resolve step)

Return type

str, None

property config

Property to access the config attribute

Returns

configuration object

Return type

aodncore.pipeline.config.LazyConfigManager

property default_addition_publish_type

Property to manage attribute which determines the default publish type assigned to ‘addition’ PipelineFile instances

Returns

default addition publish type

Return type

aodncore.pipeline.common.PipelinePublishType

property default_deletion_publish_type

Property to manage attribute which determines the default publish type assigned to ‘deletion’ PipelineFile instances

Returns

default deletion publish type

Return type

aodncore.pipeline.common.PipelinePublishType

property error

Read-only property to access Exception object from handler instance

Returns

the exception object which caused the handler to fail (if applicable)

Return type

Exception, None

property error_details

Read-only property to retrieve string description of error from handler instance

Returns

error description string (if applicable)

Return type

str, None

property exclude_regexes

Property to manage exclude_regexes attribute

Returns

Return type

list

property file_basename

Read-only property to access the input_file basename

Returns

input_file basename

Return type

str

property file_checksum

Read-only property to access the input_file checksum

Returns

input_file checksum string

Return type

str

property file_collection

Read-only property to access the handler’s primary PipelineFileCollection instance

Returns

handler file collection

Return type

PipelineFileCollection

property file_extension

Read-only property to access the input_file extension

Returns

input_file extension string

Return type

str

property file_type

Read-only property to access the input_file type

Returns

input_file type

Return type

FileType

property include_regexes

Property to manage include_regexes attribute

Returns

Return type

list

property input_file_archive_path

Property used to determine the archive path for the original input file

Returns

string containing the archive path

Return type

str

property input_file_object

Read-only property to access the original input file represented as a PipelineFile object

Returns

input file object

Return type

PipelineFile

property instance_working_directory

Read-only property to retrieve the instance working directory

Returns

string containing path to top level working directory for this instance

Return type

str, None

property logger

Read-only property to access the instance Logger

Returns

Logger

property notification_results

Read-only property to retrieve the notification results, including the sent status of each recipient

Returns

list of aodncore.pipeline.steps.notify.NotifyRecipient instances

Return type

aodncore.pipeline.steps.notify.NotifyList

ordered_states = ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED']
ordered_transitions = [{'trigger': '_trigger_initialise', 'source': 'HANDLER_INITIAL', 'dest': 'HANDLER_INITIALISED', 'before': '_initialise'}, {'trigger': '_trigger_resolve', 'source': 'HANDLER_INITIALISED', 'dest': 'HANDLER_RESOLVED', 'before': '_resolve'}, {'trigger': '_trigger_preprocess', 'source': 'HANDLER_RESOLVED', 'dest': 'HANDLER_PREPROCESSED', 'before': 'preprocess'}, {'trigger': '_trigger_check', 'source': 'HANDLER_PREPROCESSED', 'dest': 'HANDLER_CHECKED', 'before': '_check'}, {'trigger': '_trigger_process', 'source': 'HANDLER_CHECKED', 'dest': 'HANDLER_PROCESSED', 'before': 'process'}, {'trigger': '_trigger_publish', 'source': 'HANDLER_PROCESSED', 'dest': 'HANDLER_PUBLISHED', 'before': '_publish'}, {'trigger': '_trigger_postprocess', 'source': 'HANDLER_PUBLISHED', 'dest': 'HANDLER_POSTPROCESSED', 'before': 'postprocess'}]
other_states = ['HANDLER_NOTIFIED_SUCCESS', 'HANDLER_NOTIFIED_ERROR', 'HANDLER_COMPLETED_SUCCESS', 'HANDLER_COMPLETED_ERROR']
other_transitions = [{'trigger': '_trigger_notify_success', 'source': 'HANDLER_POSTPROCESSED', 'dest': 'HANDLER_NOTIFIED_SUCCESS', 'before': '_notify_success'}, {'trigger': '_trigger_notify_error', 'source': ['HANDLER_INITIAL', 'HANDLER_INITIALISED', 'HANDLER_RESOLVED', 'HANDLER_PREPROCESSED', 'HANDLER_CHECKED', 'HANDLER_PROCESSED', 'HANDLER_PUBLISHED', 'HANDLER_POSTPROCESSED'], 'dest': 'HANDLER_NOTIFIED_ERROR', 'before': '_notify_error'}, {'trigger': '_trigger_complete_success', 'source': 'HANDLER_NOTIFIED_SUCCESS', 'dest': 'HANDLER_COMPLETED_SUCCESS', 'before': '_complete_success'}, {'trigger': '_trigger_complete_with_errors', 'source': 'HANDLER_NOTIFIED_ERROR', 'dest': 'HANDLER_COMPLETED_ERROR', 'before': '_complete_with_errors'}]
postprocess()[source]

Method designed to be overridden by child handlers in order to execute code between publish and notify steps

Returns

None

preprocess()[source]

Method designed to be overridden by child handlers in order to execute code between resolve and check steps

Returns

None

process()[source]

Method designed to be overridden by child handlers in order to execute code between check and publish steps

Returns

None
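
The three overridable hooks slot in between the internal steps in a fixed order. A minimal stand-in (MiniHandler is purely illustrative; the real handler drives these transitions via the state machine shown in ordered_transitions) demonstrates the interleaving:

```python
class MiniHandler:
    """Toy illustration of where preprocess/process/postprocess run
    relative to the internal resolve/check/publish/notify steps.
    (Illustrative only; not the aodncore state machine.)"""

    def __init__(self):
        self.trace = []

    # The three hooks a subclass would override:
    def preprocess(self):
        self.trace.append('preprocess')   # after resolve, before check

    def process(self):
        self.trace.append('process')      # after check, before publish

    def postprocess(self):
        self.trace.append('postprocess')  # after publish, before notify

    def run(self):
        self.trace.append('resolve')
        self.preprocess()
        self.trace.append('check')
        self.process()
        self.trace.append('publish')
        self.postprocess()
        self.trace.append('notify')

h = MiniHandler()
h.run()
print(h.trace)
```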

property products_dir

Temporary subdirectory in which products may be created

Returns

products subdirectory of instance working directory

Return type

str, None

property result

Read-only property to retrieve the overall end result of the handler instance

Returns

handler result

Return type

aodncore.pipeline.common.HandlerResult

run()[source]

The entry point to the handler instance. Executes the automatic state machine transitions, and populates the result attribute to signal success or failure of the handler instance.

property should_notify

Read-only property to retrieve the list of intended recipients after being assembled based on notify_params

Returns

list of intended recipients

Return type

list

property start_time

Read-only property containing the timestamp of when this instance was created

Returns

timestamp of handler starting time

Return type

datetime.datetime

property state_query

Read-only property containing an initialised StateQuery instance, for querying existing pipeline state

Returns

StateQuery instance

Return type

StateQuery

property temp_dir

Temporary subdirectory where handler subclasses may create arbitrary temporary files

Returns

temporary subdirectory of instance working directory

Return type

str, None

property versions

Read-only property to access module versions

Returns

module version strings for aodncore, aodndata and compliance checker modules

Return type

dict

class aodncore.pipeline.HandlerResult(value)[source]

Bases: Enum

An enumeration.

ERROR = 2
SUCCESS = 1
UNKNOWN = 0
class aodncore.pipeline.NotificationRecipientType(value)[source]

Bases: Enum

Notification recipient type

EMAIL = ('email', <function is_valid_email_address>, 'invalid email address')
INVALID = ('invalid', <function NotificationRecipientType.<lambda>>, 'invalid protocol')
SNS = ('sns', <function NotificationRecipientType.<lambda>>, 'invalid SNS topic')
property address_validation_function
property error_string
classmethod get_type_from_protocol(protocol)[source]
property protocol
class aodncore.pipeline.PipelineFile(local_path, name=None, archive_path=None, dest_path=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]

Bases: PipelineFileBase

Represents a single file in order to store state information relating to the intended actions to be performed on the file, and the actions that were performed on the file

Parameters
  • local_path (str) – absolute source path to the file being represented

  • name (str) – arbitrary name (defaults to the output of os.path.basename() on local_path)

  • archive_path (str) – relative path used when archiving the file

  • dest_path (str) – relative path used when publishing the file

  • is_deletion (bool) – flag designating whether this is a deletion

  • late_deletion (bool) – flag to indicate that this file should be deleted after additions are performed (note: ignored if is_deletion=False)

  • file_update_callback (callable) – optional callback to call when a file property is updated

  • check_type (PipelineFileCheckType) – check type assigned to the file

  • publish_type (PipelineFilePublishType) – publish type assigned to the file

property archive_path
property check_log
property check_passed
property check_result
property check_type
property dest_path
property file_checksum
property file_update_callback
classmethod from_remotepipelinefile(remotepipelinefile, name=None, is_deletion=False, late_deletion=False, file_update_callback=None, check_type=None, publish_type=None)[source]

Construct a PipelineFile instance from an existing RemotePipelineFile instance

Parameters
  • remotepipelinefile – RemotePipelineFile instance used to instantiate a PipelineFile instance

  • name – name flag passed to __init__ (defaults to remotepipelinefile.name)

  • is_deletion – is_deletion flag passed to __init__

  • late_deletion – late_deletion flag passed to __init__

  • file_update_callback – file_update_callback flag passed to __init__

  • check_type – check_type flag passed to __init__

  • publish_type – publish_type flag passed to __init__

Returns

PipelineFile instance

property is_archived
property is_checked
property is_deleted
property is_deletion
property is_harvest_undone
property is_harvested
property is_overwrite
property is_stored
property is_upload_undone
property is_uploaded
property late_deletion
property mime_type
property pending_archive
property pending_harvest
property pending_harvest_addition
property pending_harvest_deletion
property pending_harvest_early_deletion
property pending_harvest_late_deletion
property pending_harvest_undo
property pending_store
property pending_store_addition
property pending_store_deletion
property pending_store_undo
property pending_undo
property publish_type
property published
property should_archive
property should_harvest
property should_store
property should_undo
property src_path
class aodncore.pipeline.PipelineFileCheckType(value)[source]

Bases: Enum

Each PipelineFile may individually specify which checks are performed against it

FORMAT_CHECK = 3
NC_COMPLIANCE_CHECK = 4
NONEMPTY_CHECK = 2
NO_ACTION = 1
TABLE_SCHEMA_CHECK = 5
UNSET = 0
all_checkable_types = {<PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>}
all_settable_types = {<PipelineFileCheckType.NONEMPTY_CHECK: 2>, <PipelineFileCheckType.NO_ACTION: 1>, <PipelineFileCheckType.NC_COMPLIANCE_CHECK: 4>, <PipelineFileCheckType.FORMAT_CHECK: 3>, <PipelineFileCheckType.TABLE_SCHEMA_CHECK: 5>}
class aodncore.pipeline.PipelineFileCollection(data=None, validate_unique=True)[source]

Bases: PipelineFileCollectionBase

A PipelineFileCollectionBase subclass to hold a set of PipelineFile instances

add(pipeline_file, is_deletion=False, overwrite=False, validate_unique=True, **kwargs)[source]

Add a file to the collection

Parameters
  • pipeline_file – PipelineFile or file path

  • kwargs (dict) – additional keyword arguments passed to the PipelineFileBase __init__ method

  • overwrite (bool) – if True, overwrite an existing matching file in the collection

  • validate_unique (bool) – if True, validate unique attributes when adding the file

Returns

bool which indicates whether the file was successfully added

classmethod from_remotepipelinefilecollection(remotepipelinefilecollection, are_deletions=False)[source]
member_class[source]

alias of PipelineFile

property member_from_string_method
member_validator()
set_archive_paths(archive_path_function)[source]

Set archive_path attributes for each file in the collection

Parameters

archive_path_function – function used to determine archive destination path

Returns

None

set_bool_attribute(attribute, value)[source]

Set a bool attribute for each file in the collection

Parameters
  • attribute – attribute to set

  • value – value to set the attribute

Returns

None

set_check_types(check_type)[source]

Set check_type attributes for each file in the collection

Parameters

check_type – PipelineFileCheckType enum member

Returns

None

set_default_check_types(check_params=None)[source]

Set check_type attribute for each file in the collection to the default value, based on the file type and presence of compliance checker checks in the check parameters

Parameters

check_params – dict or None

Returns

None

set_dest_paths(dest_path_function)[source]

Set dest_path attributes for each file in the collection

Parameters

dest_path_function – function used to determine publishing destination path

Returns

None

set_file_update_callback(file_update_callback)[source]

Set a callback function in each PipelineFile in this collection

Parameters

file_update_callback – callback (function)

Returns

None

set_publish_types(publish_type)[source]

Set publish_type attributes for each file in the collection

Parameters

publish_type – PipelineFilePublishType enum member

Returns

None

set_publish_types_from_regexes(include_regexes, exclude_regexes, addition_type, deletion_type)[source]

Set publish_type attribute for each file in the collection depending on whether it is considered “included” according to the regex parameters

Parameters
  • include_regexes – regex(es), at least one of which a file must match to be included

  • exclude_regexes – regex(es) which will exclude an already included file

  • addition_type – PipelineFilePublishType enum member set for included addition files

  • deletion_type – PipelineFilePublishType enum member set for included deletion files

Returns

None

set_string_attribute(attribute, value)[source]

Set a string attribute for each file in the collection

Parameters
  • attribute – attribute to set

  • value – value to set the attribute

Returns

None

unique_attributes = ('archive_path', 'dest_path')
class aodncore.pipeline.PipelineFilePublishType(value)[source]

Bases: Enum

Each PipelineFile may individually specify which combination of archive/upload/harvest actions must occur before it is considered “published”

Enum member values are a tuple containing boolean flags used for querying/validating types, which are provided to the __init__() for each member (since confusingly, each Enum member is an instance of this class).

Each member’s value is therefore a tuple of bool values representing the following flags:

(is_addition_type, is_deletion_type, is_archive_type, is_store_type, is_harvest_type)
ARCHIVE_ONLY = (True, False, True, False, False)
DELETE_ONLY = (False, True, False, True, False)
DELETE_UNHARVEST = (False, True, False, True, True)
HARVEST_ARCHIVE = (True, False, True, False, True)
HARVEST_ARCHIVE_UPLOAD = (True, False, True, True, True)
HARVEST_ONLY = (True, False, False, False, True)
HARVEST_UPLOAD = (True, False, False, True, True)
NO_ACTION = (True, True, False, False, False)
UNHARVEST_ONLY = (False, True, False, False, True)
UNSET = (None, None, None, None, None)
UPLOAD_ONLY = (True, False, False, True, False)
all_addition_types = {<PipelineFilePublishType.ARCHIVE_ONLY: (True, False, True, False, False)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.HARVEST_UPLOAD: (True, False, False, True, True)>, <PipelineFilePublishType.HARVEST_ONLY: (True, False, False, False, True)>, <PipelineFilePublishType.HARVEST_ARCHIVE: (True, False, True, False, True)>, <PipelineFilePublishType.UPLOAD_ONLY: (True, False, False, True, False)>, <PipelineFilePublishType.HARVEST_ARCHIVE_UPLOAD: (True, False, True, True, True)>}
all_deletion_types = {<PipelineFilePublishType.DELETE_UNHARVEST: (False, True, False, True, True)>, <PipelineFilePublishType.NO_ACTION: (True, True, False, False, False)>, <PipelineFilePublishType.DELETE_ONLY: (False, True, False, True, False)>, <PipelineFilePublishType.UNHARVEST_ONLY: (False, True, False, False, True)>}
classmethod get_type_from_name(name)[source]
property is_addition_type
property is_archive_type
property is_deletion_type
property is_harvest_type
property is_store_type
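
The tuple-valued enum pattern described above can be reproduced in a few lines (a reduced sketch with only two members and simplified attribute handling; the real enum is aodncore.pipeline.PipelineFilePublishType):

```python
from enum import Enum

class PublishTypeSketch(Enum):
    """Reduced sketch of the documented pattern: each member's value is the
    flag tuple (is_addition, is_deletion, is_archive, is_store, is_harvest),
    which the Enum machinery passes to __init__, where it is unpacked into
    per-member attributes."""
    HARVEST_UPLOAD = (True, False, False, True, True)
    DELETE_UNHARVEST = (False, True, False, True, True)

    def __init__(self, is_addition, is_deletion, is_archive, is_store, is_harvest):
        self.is_addition_type = is_addition
        self.is_deletion_type = is_deletion
        self.is_archive_type = is_archive
        self.is_store_type = is_store
        self.is_harvest_type = is_harvest

# Each member is an instance of the class, so flags are queried directly:
print(PublishTypeSketch.HARVEST_UPLOAD.is_harvest_type)     # True
print(PublishTypeSketch.DELETE_UNHARVEST.is_addition_type)  # False
```
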
class aodncore.pipeline.RemotePipelineFile(dest_path, local_path=None, name=None, last_modified=None, size=None)[source]

Bases: PipelineFileBase

Implementation of PipelineFileBase to represent a single remote file. This provides a common interface for remote files, to facilitate querying and downloading operations where the current state of the storage is relevant.

property file_checksum
classmethod from_pipelinefile(pipeline_file)[source]

Construct a RemotePipelineFile instance from an existing PipelineFile instance

Note: the local_path is deliberately not set in the new instance, primarily to avoid accidental overwriting of local files in case of conversion to a remote file and having the remote file downloaded. The local_path attribute may still be set, but it is an explicit operation.

Parameters

pipeline_file – PipelineFile instance used to instantiate a RemotePipelineFile instance

Returns

RemotePipelineFile instance

property last_modified
property local_path
remove_local()[source]
property size
class aodncore.pipeline.RemotePipelineFileCollection(data=None, validate_unique=True)[source]

Bases: PipelineFileCollectionBase

A PipelineFileCollectionBase subclass to hold a set of RemotePipelineFile instances

download(broker, local_path)[source]

Helper method to download the current collection from a given broker to a given local path

Parameters
  • broker – BaseStorageBroker subclass to download from

  • local_path – local path into which files are downloaded

Returns

None

classmethod from_pipelinefilecollection(pipelinefilecollection)[source]
keys()[source]
member_class[source]

alias of RemotePipelineFile

property member_from_string_method
member_validator()
unique_attributes = ('local_path', 'dest_path')
aodncore.pipeline.validate_pipelinefile_or_string(o)
aodncore.pipeline.validate_pipelinefilecollection(o)