aodncore.pipeline.steps package
Submodules
aodncore.pipeline.steps.basestep module
This module provides a common base class for step runner classes across all steps (each step in turn has its own common base class, which inherits from this one).
aodncore.pipeline.steps.check module
This module provides the step runner classes for the check step.
Checking is performed by a BaseCheckRunner class, which is used to determine whether a file conforms to a “check”, the definition of which is contained in the specific class. A check typically involves reading a file and testing whether the file conforms to some arbitrary criteria.
The most common use of this step is to test for compliance using the IOOS Compliance Checker.
- class aodncore.pipeline.steps.check.CheckRunnerAdapter(config, logger, check_params=None)[source]
Bases:
BaseCheckRunner
- class aodncore.pipeline.steps.check.ComplianceCheckerCheckRunner(config, logger, check_params=None)[source]
Bases:
BaseCheckRunner
- class aodncore.pipeline.steps.check.FormatCheckRunner(config, logger)[source]
Bases:
BaseCheckRunner
- class aodncore.pipeline.steps.check.NonEmptyCheckRunner(config, logger)[source]
Bases:
BaseCheckRunner
- class aodncore.pipeline.steps.check.TableSchemaCheckRunner(config, logger, check_params=None)[source]
Bases:
BaseCheckRunner
- aodncore.pipeline.steps.check.get_child_check_runner(check_type, config, logger, check_params=None)[source]
Factory function to return appropriate checker class based on check type value
- Parameters
check_type – PipelineFileCheckType enum member
check_params – dict of parameters to pass to the BaseCheckRunner class for runtime configuration
config – LazyConfigManager instance
logger – Logger instance
- Returns
BaseCheckRunner sub-class
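A minimal usage sketch of this factory, assuming config (LazyConfigManager), logger and a PipelineFileCollection named pipeline_files already exist; the enum member name, the check_params keys and the runner’s run() method are assumptions based on the other step runners rather than details stated above:

    from aodncore.pipeline import PipelineFileCheckType           # import location assumed
    from aodncore.pipeline.steps.check import get_child_check_runner

    # NC_COMPLIANCE_CHECK and the {'checks': [...]} parameter shape are assumed names
    check_runner = get_child_check_runner(
        PipelineFileCheckType.NC_COMPLIANCE_CHECK,
        config,
        logger,
        check_params={'checks': ['cf']}
    )
    check_runner.run(pipeline_files)   # assumed to record check results on each PipelineFile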
aodncore.pipeline.steps.harvest module
This module provides the step runner classes for the “harvest” step, which is a sub-step of the publish step.
Harvesting is performed by a BaseHarvesterRunner class.
This currently supports “talend” and “csv” as harvesting tools.
The “talend” harvester runner runs Talend instances as subprocesses, whereas the “csv” harvester runner calls the core “DatabaseInteractions” harvester class directly.
- class aodncore.pipeline.steps.harvest.CsvHarvesterRunner(storage_broker, harvest_params, config, logger)[source]
Bases:
BaseHarvesterRunner
BaseHarvesterRunner implementation to load CSV pipeline files into the database.
- build_dependency_tree(obj)[source]
Update one item from the db_objects list to include indirect dependencies. (i.e. dependencies of dependencies, etc…).
- Parameters
obj – dict describing a database object (from db_objects config)
- Returns
copy of obj with updated dependencies
- get_config_file(filename)[source]
Function to return database connection object for schema
- Returns
dict containing database connection parameters.
- class aodncore.pipeline.steps.harvest.HarvesterMap[source]
Bases:
object
- add_event(harvester, event)[source]
Add a TriggerEvent to this map, under the given harvester
- Parameters
harvester – harvester name
event – TriggerEvent object
- Returns
None
- property all_events
Get a flattened list of all events from all harvesters.
- Returns
list of all TriggerEvent instances from all harvesters
- property all_pipeline_files
Get a flattened collection containing all PipelineFile instances from all harvester events
- Returns
PipelineFileCollection containing all PipelineFile objects in the map
- is_collection_superset(pipeline_files)[source]
Determine whether all PipelineFile instances in the given PipelineFileCollection are present in this map
- Parameters
pipeline_files – PipelineFileCollection for comparison
- Returns
True if all files in the collection are in one or more events in this map
- property map
- merge(other)[source]
Merge another HarvesterMap instance into this one
- Parameters
other – other HarvesterMap instance
- Returns
None
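The following sketch shows how a map can be assembled and queried. The harvester name and the pre-existing PipelineFileCollection (matched_files) are placeholders, not values prescribed by this module:

    from aodncore.pipeline.steps.harvest import HarvesterMap, TriggerEvent

    event = TriggerEvent(matched_files)                    # matched_files: a PipelineFileCollection
    harvester_map = HarvesterMap()
    harvester_map.add_event('example_harvester', event)    # 'example_harvester' is a placeholder name

    harvester_map.all_events            # flattened list of TriggerEvent instances
    harvester_map.all_pipeline_files    # flattened PipelineFileCollection of all files in the map

    other_map = HarvesterMap()
    harvester_map.merge(other_map)      # merge the events of another map into this one

    # True if every file in the collection appears in at least one event in this map
    harvester_map.is_collection_superset(matched_files)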
- class aodncore.pipeline.steps.harvest.TalendHarvesterRunner(storage_broker, harvest_params, tmp_base_dir, config, logger, deletion=False)[source]
Bases:
BaseHarvesterRunner
BaseHarvesterRunner implementation to execute Talend harvesters
- execute_talend(executor, pipeline_files, talend_base_dir, success_attribute='is_harvested')[source]
- run(pipeline_files)[source]
The entry point to the ported Talend trigger code, which executes the harvester(s) for each file.
- Returns
None
- run_additions(harvester_map, tmp_base_dir)[source]
Function to harvest and upload files using the appropriate file upload runner.
Operates in a newly created temporary directory and creates a symlink between the source and destination file. Talend then operates on the destination file (the symlink).
- Parameters
harvester_map – HarvesterMap containing the events to be added
tmp_base_dir – temporary directory base for the Talend operation
- run_deletions(harvester_map, tmp_base_dir)[source]
Function to un-harvest and delete files using the appropriate file upload runner.
Operates in a newly created temporary directory, as Talend requires a non-existent file in order to perform un-harvesting.
- Parameters
harvester_map – HarvesterMap containing the events to be deleted
tmp_base_dir – temporary directory base for the Talend operation
- run_undo_deletions(harvester_map)[source]
Function to un-harvest and undo stored files as appropriate in the case of errors.
Operates in a newly created temporary directory, as Talend requires a non-existent file in order to perform “unharvesting”.
- Parameters
harvester_map – HarvesterMap containing the events to be undone
- class aodncore.pipeline.steps.harvest.TriggerEvent(matched_files, extra_params=None)[source]
Bases:
object
- property extra_params
- property matched_files
- aodncore.pipeline.steps.harvest.get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_dir, config, logger)[source]
Factory function to return appropriate harvester class
- Parameters
harvester_name – harvester name used to retrieve the BaseHarvesterRunner class
store_runner – BaseStoreRunner instance to use for uploads
harvest_params – dict of parameters to pass to the BaseHarvesterRunner class for runtime configuration
tmp_base_dir – base temporary directory
config – LazyConfigManager instance
logger – Logger instance
- Returns
BaseHarvesterRunner class
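A hedged usage sketch of this factory. The 'talend' name comes from the module description above, while the store_runner, harvest_params, tmp_base_dir, config and logger objects are assumed to exist already:

    from aodncore.pipeline.steps.harvest import get_harvester_runner

    harvester_runner = get_harvester_runner(
        'talend',          # or 'csv', per the module description
        store_runner,      # BaseStoreRunner instance used for uploads
        harvest_params,    # dict passed through for runtime configuration
        tmp_base_dir,      # base temporary directory for harvester operation
        config,
        logger
    )
    harvester_runner.run(pipeline_files)   # executes the harvester(s) for each file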
- aodncore.pipeline.steps.harvest.validate_harvester_mapping(pipeline_files, harvester_map)[source]
Validate whether all files in the given PipelineFileCollection are present at least once in the given HarvesterMap
- Parameters
pipeline_files – PipelineFileCollection instance
harvester_map – HarvesterMap instance
- Returns
None
- aodncore.pipeline.steps.harvest.validate_harvestermap(o)
- aodncore.pipeline.steps.harvest.validate_triggerevent(o)
aodncore.pipeline.steps.notify module
This module provides the step runner classes for the notify step.
Notification is performed by a BaseNotifyRunner class, which interacts with an endpoint representing a notification protocol in order to send a report detailing the status of the files processed by a handler class.
The most common use of this step is to send email notifications.
- class aodncore.pipeline.steps.notify.EmailNotifyRunner(notification_data, config, logger)[source]
Bases:
BaseNotifyRunner
- run(notify_list)[source]
Attempt to send notification email to recipients in notify_list parameter.
The status of each individual attempt is stored in a dict instance, as described in the smtplib.SMTP.sendmail() method docs, which allows per-recipient status inspection/error logging.
- Parameters
notify_list – NotifyList instance
- Returns
None
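A sketch of inspecting the per-recipient outcome after a run; it assumes a NotifyList can be iterated over its NotificationRecipient members, which is not stated explicitly above:

    from aodncore.pipeline.steps.notify import EmailNotifyRunner

    email_runner = EmailNotifyRunner(notification_data, config, logger)
    email_runner.run(notify_list)

    for recipient in notify_list:    # iteration behaviour assumed
        if recipient.notification_attempted and not recipient.notification_succeeded:
            logger.warning('email notification failed for %s', recipient.address)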
- class aodncore.pipeline.steps.notify.LogFailuresNotifyRunner(notification_data, config, logger)[source]
Bases:
BaseNotifyRunner
- class aodncore.pipeline.steps.notify.NotificationRecipient(address, notify_type, raw_string='', error=None)[source]
Bases:
object
- property address
- classmethod from_string(recipient_string)[source]
From a given ‘recipient string’, expected to be in the format ‘protocol:address’, return a new NotificationRecipient object with attributes set according to the content/validity of the input string
- Parameters
recipient_string – string in the format ‘protocol:address’
- Returns
NotificationRecipient object
- property notification_attempted
- property notification_succeeded
- property notify_type
- property raw_string
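For example, assuming ‘email’ is a valid protocol prefix (the accepted protocol names are not listed in this documentation):

    from aodncore.pipeline.steps.notify import NotificationRecipient

    recipient = NotificationRecipient.from_string('email:someone@example.com')
    recipient.address        # parsed address component
    recipient.notify_type    # NotificationRecipientType enum member
    recipient.raw_string     # the original 'protocol:address' string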
- class aodncore.pipeline.steps.notify.NotifyList(data=None)[source]
Bases:
object
- append(recipient)
- filter_by_notify_type(notify_type)[source]
Return a new NotifyList containing only recipients of the given notify_type
- Parameters
notify_type – NotificationRecipientType enum member by which to filter the recipients
- Returns
NotifyList containing only NotificationRecipient instances of the given type
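A sketch of filtering a list down to a single recipient type; the EMAIL member name and the import location of NotificationRecipientType are assumptions:

    from aodncore.pipeline import NotificationRecipientType    # import location assumed
    from aodncore.pipeline.steps.notify import NotificationRecipient, NotifyList

    notify_list = NotifyList()
    notify_list.append(NotificationRecipient.from_string('email:someone@example.com'))

    email_recipients = notify_list.filter_by_notify_type(NotificationRecipientType.EMAIL)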
- class aodncore.pipeline.steps.notify.NotifyRunnerAdapter(notification_data, config, logger, notify_params)[source]
Bases:
BaseNotifyRunner
- class aodncore.pipeline.steps.notify.SnsNotifyRunner(notification_data, config, logger)[source]
Bases:
BaseNotifyRunner
- aodncore.pipeline.steps.notify.get_notify_runner(notification_data, config, logger, notify_params=None)[source]
Factory function to return notify runner class
- Parameters
notification_data – dictionary containing notification data (i.e. template values)
config – LazyConfigManager instance
logger – Logger instance
notify_params – dict of parameters to pass to the BaseNotifyRunner class for runtime configuration
- Returns
BaseNotifyRunner class
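A minimal sketch of obtaining and running a notify runner; notification_data, config, logger and notify_list are assumed to exist, and the runner’s run(notify_list) method is taken from EmailNotifyRunner above. The notify_params keys are illustrative only:

    from aodncore.pipeline.steps.notify import get_notify_runner

    notify_runner = get_notify_runner(
        notification_data,
        config,
        logger,
        notify_params={'template': 'default'}    # parameter keys assumed, not documented here
    )
    notify_runner.run(notify_list)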
aodncore.pipeline.steps.resolve module
This module provides the step runner classes for the resolve step.
Resolution is performed by a BaseResolveRunner class, which is responsible for resolving an input file into a PipelineFileCollection. The input file may resolve into more than one file, for example in the case of a ZIP file (which may contain multiple files to be processed) or a manifest file (which may refer to multiple file paths).
The __init__() method is supplied with the input file and the output_dir. Based on the file extension, the appropriate ResolveRunner class is first determined, before the abstract run() method causes the file to be extracted into the actual files to be processed. If this step is successful, the following will have occurred:
- in the case of a non-manifest input file, the directory defined by the output_dir parameter will contain all of the files being handled
- in the case of a manifest file, the existence of the files defined in the manifest will be confirmed
- the run method returns a PipelineFileCollection instance populated with all of these files
This means the rest of the handler code has no further need to be aware of the source of the files, and the file collection may then be processed in a generic way.
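A sketch of the flow described above, assuming config and logger are available; the input and output paths are illustrative, and the exact run() call signature and PipelineFile attribute name are assumptions based on the description of its return value:

    from aodncore.pipeline.steps.resolve import get_resolve_runner

    resolve_runner = get_resolve_runner(
        '/tmp/incoming/upload.zip',   # illustrative input file
        '/tmp/resolved',              # illustrative output_dir
        config,
        logger
    )
    collection = resolve_runner.run()    # PipelineFileCollection of all resolved files
    for pipeline_file in collection:     # assumes the collection is iterable
        print(pipeline_file.src_path)    # attribute name is an assumption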
- class aodncore.pipeline.steps.resolve.DeleteManifestResolveRunner(*args, **kwargs)[source]
Bases:
BaseCsvManifestResolveRunner
Handles a delete manifest file which only contains a list of source files, and optionally a valid “deletion publish type” string, as defined by the PipelineFilePublishType enum. If delete_publish_type is omitted, the value will remain UNSET, and the handler will assume responsibility for setting the appropriate type.
File format must be as follows:
destination/path/for/delete1,DELETE_PUBLISH_TYPE
destination/path/for/delete2,DELETE_PUBLISH_TYPE
- property schema
Defines the tableschema schema describing the input file
- Returns
tableschema.Schema instance
- class aodncore.pipeline.steps.resolve.DirManifestResolveRunner(*args, **kwargs)[source]
Bases:
BaseManifestResolveRunner
Handles a simple manifest file which only contains a list of source files or directories, which will have all files recursively added to the collection
File format must be as follows:
/path/to/source/file1
/path/to/source/dir1
- class aodncore.pipeline.steps.resolve.GzipFileResolveRunner(input_file, output_dir, config, logger)[source]
Bases:
BaseResolveRunner
- class aodncore.pipeline.steps.resolve.JsonManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]
Bases:
BaseManifestResolveRunner
Handles a JSON manifest file, optionally with a pre-determined destination path. Unlike other resolve runners, this creates PipelineFile objects to add to the collection rather than allowing the collection to generate the objects.
If a “files” attribute is present, the files will be added to the collection. The elements of the “files” attribute may be one of the following types:
1. an object, in which the 'local_path' attribute represents the source path of the file
2. optionally, if the 'dest_path' attribute is provided, this will be used as a predetermined destination path (similar to the MapManifestResolveRunner)
e.g.
{
    "files": [
        {
            "local_path": "/path/to/source/file1"
        },
        {
            "local_path": "/path/to/source/file2",
            "dest_path": "destination/path/for/upload2"
        }
    ]
}
- class aodncore.pipeline.steps.resolve.MapManifestResolveRunner(*args, **kwargs)[source]
Bases:
BaseCsvManifestResolveRunner
Handles a manifest file with a pre-determined destination path. Unlike other resolve runners, this creates PipelineFile objects to add to the collection rather than allowing the collection to generate the objects.
File format must be as follows:
/path/to/source/file1,destination/path/for/upload1
/path/to/source/file2,destination/path/for/upload2
- property schema
Defines the tableschema schema describing the input file
- Returns
tableschema.Schema instance
- class aodncore.pipeline.steps.resolve.RsyncManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]
Bases:
BaseManifestResolveRunner
Handles a manifest file as output by an rsync process
The manifest is generated by capturing the output of an rsync process run with the “-i, --itemize-changes” argument. See the RSYNC man page, https://download.samba.org/pub/rsync/rsync.html, for a detailed description of this format.
The file format is expected to contain invalid lines (header, whitespace, summary lines), so valid lines are extracted using regular expressions to determine the intended action. A file will typically look as follows, with each line classified as indicated (text in square brackets is not in the actual files):
receiving incremental file list                                     [HEADER LINE, IGNORED]
*deleting aoml/1900709/profiles/                                    [DIRECTORY DELETION, IGNORED]
.d..t...... aoml/1900709/                                           [DIRECTORY ADDITION, IGNORED]
>f.st...... handlers/dummy/test_manifest.nc                         [FILE ADDITION]
*deleting handlers/dummy/aoml/1900728/1900728_Rtraj.nc              [FILE DELETION]
                                                                    [NON-MATCHING LINE, IGNORED]
                                                                    [NON-MATCHING LINE, IGNORED]
sent 65477852 bytes received 407818360 bytes 115508.53 bytes/sec    [NON-MATCHING LINE, IGNORED]
total size is 169778564604 speedup is 358.72                        [NON-MATCHING LINE, IGNORED]
- DELETE_PATTERN = re.compile('^\\*deleting')
- DIR_ADD_PATTERN = re.compile('^\\.d.{9}')
- FILE_ADD_PATTERN = re.compile('^>f.{9}')
- HEADER_LINE = 'receiving incremental file list'
- RECORD_PATTERN = re.compile('^\n (?P<operation>\\*deleting|[>.][df].{9}) # file operation type\n \\s{1,3} # space(s) separating operation from path\n , re.VERBOSE)
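As a standalone illustration of how the documented patterns classify lines of the example manifest above (this is not the runner’s internal code):

    import re

    DELETE_PATTERN = re.compile(r'^\*deleting')
    FILE_ADD_PATTERN = re.compile(r'^>f.{9}')
    DIR_ADD_PATTERN = re.compile(r'^\.d.{9}')

    line = '>f.st...... handlers/dummy/test_manifest.nc'
    if FILE_ADD_PATTERN.match(line):
        print('file addition:', line.split(None, 1)[1])
    elif DELETE_PATTERN.match(line):
        print('file or directory deletion:', line.split(None, 1)[1])
    elif DIR_ADD_PATTERN.match(line):
        print('directory addition (ignored)')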
- class aodncore.pipeline.steps.resolve.SimpleManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]
Bases:
BaseManifestResolveRunner
Handles a simple manifest file which only contains a list of source files
File format must be as follows:
/path/to/source/file1 /path/to/source/file2
- class aodncore.pipeline.steps.resolve.SingleFileResolveRunner(input_file, output_dir, config, logger)[source]
Bases:
BaseResolveRunner
- class aodncore.pipeline.steps.resolve.ZipFileResolveRunner(input_file, output_dir, config, logger)[source]
Bases:
BaseResolveRunner
- aodncore.pipeline.steps.resolve.get_resolve_runner(input_file, output_dir, config, logger, resolve_params=None)[source]
Factory function to return appropriate resolver class based on the file extension
- Parameters
input_file – path to the input file
output_dir – directory where the resolved files will be extracted/copied
config – LazyConfigManager instance
logger – Logger instance
resolve_params – dict of parameters to pass to the BaseResolveRunner class for runtime configuration
- Returns
BaseResolveRunner class
aodncore.pipeline.steps.store module
This module provides the step runner class for the “store” step, which is a sub-step of the publish step.
The step runner delegates the low-level storage operations to an internal BaseStorageBroker instance, and so its primary purpose is to abstract the storage operations from the HandlerBase by providing an interface similar to the other handler steps.
- class aodncore.pipeline.steps.store.StoreRunner(broker, config, logger, archive_mode=False)[source]
Bases:
BaseStepRunner
- property dest_path_attr
PipelineFile attribute containing the destination path
- Returns
bool
- property is_stored_attr
PipelineFile attribute to flag completion of upload operation
- Returns
bool
- property pending_addition_attr
- aodncore.pipeline.steps.store.get_store_runner(store_base_url, config, logger, archive_mode=False)[source]
Factory function to return a store runner class, with its storage broker selected based on the URL scheme
- Parameters
store_base_url – URL base for storage location
config – LazyConfigManager instance
logger – Logger instance
archive_mode – flag to indicate archive
- Returns
StoreRunner instance
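A minimal sketch; the URLs are purely illustrative, and the comments restate the property descriptions above rather than documented return types:

    from aodncore.pipeline.steps.store import get_store_runner

    store_runner = get_store_runner('s3://example-bucket/prefix', config, logger)
    store_runner.dest_path_attr    # PipelineFile attribute containing the destination path
    store_runner.is_stored_attr    # PipelineFile attribute used to flag upload completion

    # with archive_mode=True, the runner targets the archive location instead
    archive_runner = get_store_runner('s3://example-archive-bucket', config, logger, archive_mode=True)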
Module contents
- class aodncore.pipeline.steps.NotificationRecipient(address, notify_type, raw_string='', error=None)[source]
Bases:
object
- property address
- classmethod from_string(recipient_string)[source]
From a given ‘recipient string’, expected to be in the format ‘protocol:address’, return a new NotificationRecipient object with attributes set according to the content/validity of the input string
- Parameters
recipient_string – string in the format ‘protocol:address’
- Returns
NotificationRecipient object
- property notification_attempted
- property notification_succeeded
- property notify_type
- property raw_string
- class aodncore.pipeline.steps.NotifyList(data=None)[source]
Bases:
object
- append(recipient)
- filter_by_notify_type(notify_type)[source]
Return a new NotifyList containing only recipients of the given notify_type
- Parameters
notify_type – NotificationRecipientType enum member by which to filter the recipients
- Returns
NotifyList containing only NotificationRecipient instances of the given type
- aodncore.pipeline.steps.get_child_check_runner(check_type, config, logger, check_params=None)[source]
Factory function to return appropriate checker class based on check type value
- Parameters
check_type – PipelineFileCheckType enum member
check_params – dict of parameters to pass to the BaseCheckRunner class for runtime configuration
config – LazyConfigManager instance
logger – Logger instance
- Returns
BaseCheckRunner sub-class
- aodncore.pipeline.steps.get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_dir, config, logger)[source]
Factory function to return appropriate harvester class
- Parameters
harvester_name – harvester name used to retrieve the BaseHarvesterRunner class
store_runner – BaseStoreRunner instance to use for uploads
harvest_params – dict of parameters to pass to the BaseHarvesterRunner class for runtime configuration
tmp_base_dir – base temporary directory
config – LazyConfigManager instance
logger – Logger instance
- Returns
BaseHarvesterRunner class
- aodncore.pipeline.steps.get_notify_runner(notification_data, config, logger, notify_params=None)[source]
Factory function to return notify runner class
- Parameters
notification_data – dictionary containing notification data (i.e. template values)
config – LazyConfigManager instance
logger – Logger instance
notify_params – dict of parameters to pass to the BaseNotifyRunner class for runtime configuration
- Returns
BaseNotifyRunner class
- aodncore.pipeline.steps.get_resolve_runner(input_file, output_dir, config, logger, resolve_params=None)[source]
Factory function to return appropriate resolver class based on the file extension
- Parameters
input_file – path to the input file
output_dir – directory where the resolved files will be extracted/copied
config – LazyConfigManager instance
logger – Logger instance
resolve_params – dict of parameters to pass to the BaseResolveRunner class for runtime configuration
- Returns
BaseResolveRunner class
- aodncore.pipeline.steps.get_store_runner(store_base_url, config, logger, archive_mode=False)[source]
Factory function to return a store runner class, with its storage broker selected based on the URL scheme
- Parameters
store_base_url – URL base for storage location
config – LazyConfigManager instance
logger – Logger instance
archive_mode – flag to indicate archive
- Returns
StoreRunner instance