aodncore.pipeline.steps package

Submodules

aodncore.pipeline.steps.basestep module

This module provides a common base class for all step runner classes. Each step defines its own step-specific base class, which in turn inherits from this one.

class aodncore.pipeline.steps.basestep.BaseStepRunner(config, logger)[source]

Bases: object

Common parent class of all “step runner” child classes

aodncore.pipeline.steps.check module

This module provides the step runner classes for the check step.

Checking is performed by a BaseCheckRunner class, which is used to determine whether a file conforms to a “check”, the definition of which is contained in the specific class. A check typically involves reading a file, and testing whether the file conforms to some arbitrary criteria.

The most common use of this step is to test for compliance using the IOOS Compliance Checker.
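Conceptually, a check is a predicate over a file. A minimal sketch of the idea behind the NonEmptyCheckRunner (illustration only; the real runner reports its result through the pipeline's check framework rather than returning a bool):

```python
import os

def non_empty_check(path):
    # A file "conforms" to this check if it exists and contains at
    # least one byte, mirroring the NonEmptyCheckRunner concept
    return os.path.isfile(path) and os.path.getsize(path) > 0
```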

class aodncore.pipeline.steps.check.CheckRunnerAdapter(config, logger, check_params=None)[source]

Bases: BaseCheckRunner

run(pipeline_files)[source]
class aodncore.pipeline.steps.check.ComplianceCheckerCheckRunner(config, logger, check_params=None)[source]

Bases: BaseCheckRunner

run(pipeline_files)[source]
class aodncore.pipeline.steps.check.FormatCheckRunner(config, logger)[source]

Bases: BaseCheckRunner

run(pipeline_files)[source]
class aodncore.pipeline.steps.check.NonEmptyCheckRunner(config, logger)[source]

Bases: BaseCheckRunner

run(pipeline_files)[source]
class aodncore.pipeline.steps.check.TableSchemaCheckRunner(config, logger, check_params=None)[source]

Bases: BaseCheckRunner

run(pipeline_files)[source]
validate(path)[source]
aodncore.pipeline.steps.check.get_check_runner(config, logger, check_params=None)[source]
aodncore.pipeline.steps.check.get_child_check_runner(check_type, config, logger, check_params=None)[source]

Factory function to return appropriate checker class based on check type value

Parameters
  • check_type – PipelineFileCheckType enum member

  • check_params – dict of parameters to pass to BaseCheckRunner class for runtime configuration

  • config – LazyConfigManager instance

  • logger – Logger instance

Returns

BaseCheckRunner sub-class

aodncore.pipeline.steps.harvest module

This module provides the step runner classes for the “harvest” step, which is a sub-step of the publish step.

Harvesting is performed by a BaseHarvesterRunner class.

This currently supports “talend” and “csv” as harvesting tools.

The “talend” harvester runner runs Talend instances as subprocesses, whereas the “csv” harvester runner calls the core “DatabaseInteractions” harvester class directly.

class aodncore.pipeline.steps.harvest.CsvHarvesterRunner(storage_broker, harvest_params, config, logger)[source]

Bases: BaseHarvesterRunner

BaseHarvesterRunner implementation to load csv pipeline files to the database.

build_dependency_tree(obj)[source]

Update one item from the db_objects list to include indirect dependencies (i.e. dependencies of dependencies, and so on).

Parameters

obj – dict describing a database object (from db_objects config)

Returns

copy of obj with updated dependencies

build_runsheet(pf)[source]

Function to generate a runsheet for the harvest process.

Returns

bool

get_config_file(filename)[source]

Function to return database connection object for schema

Returns

dict containing database connection parameters.

get_process_sequence(conn)[source]

Function to return the database transaction process sequence based on ingest_type.

Parameters

conn – instance of DatabaseInteractions().

Returns

dict containing process sequence.

run(pipeline_files)[source]

The entry point to the generic csv harvester

Returns

None
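The dependency expansion performed by build_dependency_tree can be illustrated as a simple transitive closure. This sketch operates on a plain dict rather than the db_objects config structure the method actually uses:

```python
def transitive_dependencies(name, direct_deps):
    """Collect all direct and indirect dependencies of `name`.

    `direct_deps` maps an object name to the list of objects it
    depends on directly (a stand-in for the db_objects config).
    """
    collected = []

    def walk(obj):
        for dep in direct_deps.get(obj, []):
            if dep not in collected:
                collected.append(dep)
                walk(dep)  # dependencies of dependencies, and so on

    walk(name)
    return collected
```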

class aodncore.pipeline.steps.harvest.HarvesterMap[source]

Bases: object

add_event(harvester, event)[source]

Add a TriggerEvent to this map, under the given harvester

Parameters
  • harvester – harvester name

  • event – TriggerEvent object

Returns

None

property all_events

Get a flattened list of all events from all harvesters.

Returns

list of all TriggerEvent instances from all harvesters

property all_pipeline_files

Get a flattened collection containing all PipelineFile instances from all harvester events

Returns

PipelineFileCollection containing all PipelineFile objects in map

is_collection_superset(pipeline_files)[source]

Determine whether all PipelineFile instances in the given PipelineFileCollection are present in this map

Parameters

pipeline_files – PipelineFileCollection for comparison

Returns

True if all files in the collection are in one or more events in this map

property map
merge(other)[source]

Merge another HarvesterMap instance into this one

Parameters

other – other HarvesterMap instance

Returns

None

set_pipelinefile_bool_attribute(attribute, value)[source]

Set a boolean attribute on all PipelineFile instances in all events

Parameters
  • attribute – attribute to set

  • value – value to set

Returns

None

class aodncore.pipeline.steps.harvest.TalendHarvesterRunner(storage_broker, harvest_params, tmp_base_dir, config, logger, deletion=False)[source]

Bases: BaseHarvesterRunner

BaseHarvesterRunner implementation to execute Talend harvesters

execute_talend(executor, pipeline_files, talend_base_dir, success_attribute='is_harvested')[source]
match_harvester_to_files(pipeline_files)[source]
run(pipeline_files)[source]

The entry point to the ported Talend trigger code, which executes the appropriate harvester(s) for each file

Returns

None

run_additions(harvester_map, tmp_base_dir)[source]

Function to harvest and upload files using the appropriate file upload runner.

Operates in a newly created temporary directory and creates a symlink between the source and destination file. Talend will then operate on the destination file (the symlink).

Parameters
  • harvester_map – HarvesterMap containing the events to be added

  • tmp_base_dir – temporary directory base for talend operation

run_deletions(harvester_map, tmp_base_dir)[source]

Function to un-harvest and delete files using the appropriate file upload runner.

Operates in a newly created temporary directory, as Talend requires a non-existent file to perform un-harvesting

Parameters
  • harvester_map – HarvesterMap containing the events to be deleted

  • tmp_base_dir – temporary directory base for talend operation

run_undo_deletions(harvester_map)[source]

Function to un-harvest and undo stored files as appropriate in the case of errors.

Operates in a newly created temporary directory, as Talend requires a non-existent file to perform “unharvesting”

Parameters

harvester_map – HarvesterMap containing the events to be undone

undo_processed_files(undo_map)[source]
class aodncore.pipeline.steps.harvest.TriggerEvent(matched_files, extra_params=None)[source]

Bases: object

property extra_params
property matched_files
aodncore.pipeline.steps.harvest.create_input_file_list(talend_base_dir, matched_file_list)[source]
aodncore.pipeline.steps.harvest.executor_conversion(executor)[source]
aodncore.pipeline.steps.harvest.get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_dir, config, logger)[source]

Factory function to return appropriate harvester class

Parameters
  • harvester_name – harvester name used to retrieve BaseHarvesterRunner class

  • store_runner – BaseStoreRunner instance to use for uploads

  • harvest_params – dict of parameters to pass to the BaseHarvesterRunner class for runtime configuration

  • tmp_base_dir – base temporary directory

  • config – LazyConfigManager instance

  • logger – Logger instance

Returns

BaseHarvesterRunner class

aodncore.pipeline.steps.harvest.validate_harvester_mapping(pipeline_files, harvester_map)[source]

Validate whether all files in the given PipelineFileCollection are present at least once in the given HarvesterMap

Parameters
  • pipeline_files – PipelineFileCollection instance

  • harvester_map – HarvesterMap instance

Returns

None

aodncore.pipeline.steps.harvest.validate_harvestermap(o)
aodncore.pipeline.steps.harvest.validate_triggerevent(o)

aodncore.pipeline.steps.notify module

This module provides the step runner classes for the notify step.

Notification is performed by a BaseNotifyRunner class, which interacts with an endpoint representing a notification protocol, in order to send a report detailing the status of the files processed by a handler class.

The most common use of this step is to send email notifications.

class aodncore.pipeline.steps.notify.EmailNotifyRunner(notification_data, config, logger)[source]

Bases: BaseNotifyRunner

run(notify_list)[source]

Attempt to send notification email to recipients in notify_list parameter.

The status of each individual attempt is stored in a dict instance, as described in the smtplib.SMTP.sendmail() method docs, which allows per-recipient status inspection/error logging.

Parameters

notify_list – NotifyList instance

Returns

None

class aodncore.pipeline.steps.notify.LogFailuresNotifyRunner(notification_data, config, logger)[source]

Bases: BaseNotifyRunner

run(notify_list)[source]
class aodncore.pipeline.steps.notify.NotificationRecipient(address, notify_type, raw_string='', error=None)[source]

Bases: object

property address
classmethod from_string(recipient_string)[source]

From a given ‘recipient string’, expected to be in the format of ‘protocol:address’, return a new NotificationRecipient object with attributes set according to the content/validity of the input string

Parameters

recipient_string – string in format of ‘protocol:address’

Returns

NotificationRecipient object

property notification_attempted
property notification_succeeded
property notify_type
property raw_string
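The ‘protocol:address’ convention used by from_string can be illustrated with a small parsing sketch (the real classmethod additionally validates the protocol and address, and records any error on the returned object rather than raising):

```python
def parse_recipient_string(recipient_string):
    # Split on the first colon only, so addresses that themselves
    # contain colons are preserved intact
    protocol, sep, address = recipient_string.partition(':')
    if not (sep and protocol and address):
        raise ValueError('expected format "protocol:address"')
    return protocol, address
```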
class aodncore.pipeline.steps.notify.NotifyList(data=None)[source]

Bases: object

add(recipient)[source]
append(recipient)
difference(sequence)[source]
discard(recipient)[source]
filter_by_failed()[source]
filter_by_notify_type(notify_type)[source]

Return a new NotifyList containing only recipients of the given notify_type

Parameters

notify_type – NotificationRecipientType enum member by which to filter recipients

Returns

NotifyList containing only NotifyRecipient instances of the given type

filter_by_succeeded()[source]
classmethod from_collection(recipient_collection)[source]
issubset(sequence)[source]
issuperset(sequence)[source]
set_error(error)[source]

Set the error attribute for all elements

Parameters

error – Exception instance

Returns

None

set_notification_attempted()[source]
union(sequence)[source]
update(sequence)[source]
update_from_error_dict(error_dict)[source]

Update recipient statuses according to the given error dictionary parameter. The absence of an address in the dict keys will be interpreted as “successfully sent”.

Parameters

error_dict – dict as returned by smtplib.SMTP.sendmail() method

Returns

None
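The semantics of update_from_error_dict can be sketched against the dict shape documented for smtplib.SMTP.sendmail(), which maps each refused recipient address to an (SMTP error code, message) tuple and omits successful recipients:

```python
def delivery_statuses(recipients, error_dict):
    # An address absent from error_dict's keys is interpreted as
    # "successfully sent", per update_from_error_dict
    return {address: address not in error_dict for address in recipients}
```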

class aodncore.pipeline.steps.notify.NotifyRunnerAdapter(notification_data, config, logger, notify_params)[source]

Bases: BaseNotifyRunner

run(notify_list)[source]
class aodncore.pipeline.steps.notify.SnsNotifyRunner(notification_data, config, logger)[source]

Bases: BaseNotifyRunner

run(notify_list)[source]
aodncore.pipeline.steps.notify.get_notify_runner(notification_data, config, logger, notify_params=None)[source]

Factory function to return notify runner class

Parameters
  • notification_data – dictionary containing notification data (i.e. template values)

  • config – LazyConfigManager instance

  • logger – Logger instance

  • notify_params – dict of parameters to pass to BaseNotifyRunner class for runtime configuration

Returns

BaseNotifyRunner class

aodncore.pipeline.steps.resolve module

This module provides the step runner classes for the resolve step.

Resolution is performed by a BaseResolveRunner class, which is responsible for resolving an input file into a PipelineFileCollection. The input file may resolve into more than one file, for example in the case of a ZIP file (which may contain multiple files to be processed) or a manifest file (which may refer to multiple file paths).

The __init__() method is supplied with the input file and the output_dir. Based on the file extension, the appropriate ResolveRunner class is first determined, and its run() method then resolves the input file into the actual files to be processed. If this step is successful, the following will have occurred:

  • in the case of a non-manifest input file, the directory defined by the output_dir parameter will contain all of the files being handled

  • in the case of a manifest file, the existence of the files defined in the manifest will be confirmed

  • the run method returns a PipelineFileCollection instance populated with all of these files

This means the rest of the handler code has no further need to be aware of the source of the files, and the file collection may then be processed in a generic way.

class aodncore.pipeline.steps.resolve.DeleteManifestResolveRunner(*args, **kwargs)[source]

Bases: BaseCsvManifestResolveRunner

Handles a delete manifest file, which contains a list of file paths and, optionally, a valid “deletion publish type” string as defined by the PipelineFilePublishType enum. If delete_publish_type is omitted, the value will remain UNSET, and the handler will assume responsibility for setting the appropriate type.

File format must be as follows:

destination/path/for/delete1,DELETE_PUBLISH_TYPE
destination/path/for/delete2,DELETE_PUBLISH_TYPE
property schema

Defines the tableschema schema describing the input file

Returns

tableschema.Schema instance

class aodncore.pipeline.steps.resolve.DirManifestResolveRunner(*args, **kwargs)[source]

Bases: BaseManifestResolveRunner

Handles a simple manifest file which only contains a list of source files or directories, which will have all files recursively added to the collection

File format must be as follows:

/path/to/source/file1
/path/to/source/dir1
run()[source]
class aodncore.pipeline.steps.resolve.GzipFileResolveRunner(input_file, output_dir, config, logger)[source]

Bases: BaseResolveRunner

run()[source]
class aodncore.pipeline.steps.resolve.JsonManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]

Bases: BaseManifestResolveRunner

Handles a JSON manifest file, optionally with a pre-determined destination path. Unlike other resolve runners, this creates PipelineFile objects to add to the collection rather than allowing the collection to generate the objects.

If a “files” attribute is present, the files will be added to the collection. The elements of the “files” attribute may be one of the following types:

1. an object, in which the 'local_path' attribute represents the source path of the file
    1. optionally, if the 'dest_path' attribute is provided, this will be used as a predetermined destination path (similar to the MapManifestResolveRunner)
e.g.

{
    "files": [
        {
            "local_path": "/path/to/source/file1"
        },
        {
            "local_path": "/path/to/source/file2",
            "dest_path": "destination/path/for/upload2"
        }
    ]
}
run()[source]
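Reading the “files” attribute of such a manifest can be sketched as follows (an illustration of the format only, not the runner's actual implementation, which builds PipelineFile objects):

```python
import json

def read_json_manifest(text):
    # Yields (local_path, dest_path) pairs; dest_path is None when
    # no predetermined destination is given for a file
    manifest = json.loads(text)
    return [(entry['local_path'], entry.get('dest_path'))
            for entry in manifest.get('files', [])]
```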
class aodncore.pipeline.steps.resolve.MapManifestResolveRunner(*args, **kwargs)[source]

Bases: BaseCsvManifestResolveRunner

Handles a manifest file with a pre-determined destination path. Unlike other resolve runners, this creates PipelineFile objects to add to the collection rather than allowing the collection to generate the objects.

File format must be as follows:

/path/to/source/file1,destination/path/for/upload1
/path/to/source/file2,destination/path/for/upload2
property schema

Defines the tableschema schema describing the input file

Returns

tableschema.Schema instance

class aodncore.pipeline.steps.resolve.RsyncManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]

Bases: BaseManifestResolveRunner

Handles a manifest file as output by an rsync process

The manifest is generated by capturing the output of an rsync process run with the “-i, --itemize-changes” argument. See the rsync man page (https://download.samba.org/pub/rsync/rsync.html) for a detailed description of this format.

The file format is expected to contain non-record lines (header, whitespace and summary lines), so valid lines are extracted using regular expressions to determine the intended action. A typical file looks as follows, with each line's classification shown in square brackets (not present in actual files):

receiving incremental file list                                     [HEADER LINE, IGNORED]
*deleting   aoml/1900709/profiles/                                  [DIRECTORY DELETION, IGNORED]
.d..t...... aoml/1900709/                                           [DIRECTORY ADDITION, IGNORED]
>f.st...... handlers/dummy/test_manifest.nc                         [FILE ADDITION]
*deleting   handlers/dummy/aoml/1900728/1900728_Rtraj.nc            [FILE DELETION]
                                                                    [NON-MATCHING LINE, IGNORED]
                                                                    [NON-MATCHING LINE, IGNORED]
sent 65477852 bytes  received 407818360 bytes  115508.53 bytes/sec  [NON-MATCHING LINE, IGNORED]
total size is 169778564604  speedup is 358.72                       [NON-MATCHING LINE, IGNORED]
DELETE_PATTERN = re.compile('^\\*deleting')
DIR_ADD_PATTERN = re.compile('^\\.d.{9}')
FILE_ADD_PATTERN = re.compile('^>f.{9}')
HEADER_LINE = 'receiving incremental file list'
RECORD_PATTERN = re.compile("""^
    (?P<operation>\*deleting|[>.][df].{9}) # file operation type
    \s{1,3} # space(s) separating operation from path
    …""", re.VERBOSE)
classmethod classify_line(line)[source]
run()[source]
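Using the patterns listed above, the classification described for classify_line can be sketched like this (a simplified stand-in: the real method uses RECORD_PATTERN and returns enum members rather than strings, and the trailing-slash test for directories is an assumption here):

```python
import re

# Patterns as documented for RsyncManifestResolveRunner
DELETE_PATTERN = re.compile(r'^\*deleting')
DIR_ADD_PATTERN = re.compile(r'^\.d.{9}')
FILE_ADD_PATTERN = re.compile(r'^>f.{9}')

def classify_rsync_line(line):
    # Return 'add' for file additions, 'delete' for file deletions,
    # and None for ignored lines (headers, directories, summaries)
    if FILE_ADD_PATTERN.match(line):
        return 'add'
    if DELETE_PATTERN.match(line):
        parts = line.split(None, 1)
        # a trailing slash is assumed to indicate a directory
        # deletion, which is ignored
        if len(parts) == 2 and not parts[1].endswith('/'):
            return 'delete'
    return None
```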
class aodncore.pipeline.steps.resolve.SimpleManifestResolveRunner(input_file, output_dir, config, logger, resolve_params=None)[source]

Bases: BaseManifestResolveRunner

Handles a simple manifest file which only contains a list of source files

File format must be as follows:

/path/to/source/file1
/path/to/source/file2
run()[source]
class aodncore.pipeline.steps.resolve.SingleFileResolveRunner(input_file, output_dir, config, logger)[source]

Bases: BaseResolveRunner

run()[source]
class aodncore.pipeline.steps.resolve.ZipFileResolveRunner(input_file, output_dir, config, logger)[source]

Bases: BaseResolveRunner

run()[source]
aodncore.pipeline.steps.resolve.get_resolve_runner(input_file, output_dir, config, logger, resolve_params=None)[source]

Factory function to return appropriate resolver class based on the file extension

Parameters
  • input_file – path to the input file

  • output_dir – directory where the resolved files will be extracted/copied

  • configLazyConfigManager instance

  • loggerLogger instance

  • resolve_params – dict of parameters to pass to BaseResolveRunner class for runtime configuration

Returns

BaseResolveRunner class
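The extension-based dispatch can be illustrated with a hypothetical mapping (the extensions and runner choices shown are assumptions for illustration; consult the function source for the authoritative mapping):

```python
import os

# Hypothetical extension-to-runner mapping, for illustration only
RESOLVE_RUNNERS = {
    '.zip': 'ZipFileResolveRunner',
    '.gz': 'GzipFileResolveRunner',
    '.manifest': 'SimpleManifestResolveRunner',
}

def resolve_runner_name(input_file):
    # Fall back to treating the input as a single ordinary file
    extension = os.path.splitext(input_file)[1].lower()
    return RESOLVE_RUNNERS.get(extension, 'SingleFileResolveRunner')
```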

aodncore.pipeline.steps.store module

This module provides the step runner class for the “store” step, which is a sub-step of the publish step.

The step runner delegates the low level storage operations to an internal BaseStorageBroker instance, so its primary purpose is to abstract the storage operations from the HandlerBase by providing an interface similar to the other handler steps.

class aodncore.pipeline.steps.store.StoreRunner(broker, config, logger, archive_mode=False)[source]

Bases: BaseStepRunner

property dest_path_attr

PipelineFile attribute containing the destination path

Returns

str

property is_stored_attr

PipelineFile attribute to flag completion of upload operation

Returns

str

property pending_addition_attr
run(pipeline_files)[source]

Execute the pending storage operation(s) for each file in the given collection

Parameters

pipeline_files – PipelineFileCollection instance

Returns

None

set_is_overwrite(pipeline_files)[source]

Set the “is_overwrite” attribute for each file in the given collection

Parameters

pipeline_files – collection for which to set the attribute

Returns

None

aodncore.pipeline.steps.store.get_store_runner(store_base_url, config, logger, archive_mode=False)[source]

Factory function to return the store runner class, with its storage broker selected based on the URL scheme

Parameters
  • store_base_url – URL base for storage location

  • config – LazyConfigManager instance

  • logger – Logger instance

  • archive_mode – flag to indicate archive

Returns

StoreRunner instance
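Scheme-based broker selection can be sketched with urllib.parse; the scheme-to-broker mapping shown here is an assumption for illustration only:

```python
from urllib.parse import urlparse

# Assumed scheme-to-broker mapping, for illustration only
STORAGE_BROKERS = {
    'file': 'LocalFileStorageBroker',
    's3': 'S3StorageBroker',
    'sftp': 'SftpStorageBroker',
}

def broker_name_for(store_base_url):
    # The URL scheme alone determines which broker handles storage
    scheme = urlparse(store_base_url).scheme
    if scheme not in STORAGE_BROKERS:
        raise ValueError('unsupported URL scheme: {!r}'.format(scheme))
    return STORAGE_BROKERS[scheme]
```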

Module contents

class aodncore.pipeline.steps.NotificationRecipient(address, notify_type, raw_string='', error=None)[source]

Bases: object

property address
classmethod from_string(recipient_string)[source]

From a given ‘recipient string’, expected to be in the format of ‘protocol:address’, return a new NotificationRecipient object with attributes set according to the content/validity of the input string

Parameters

recipient_string – string in format of ‘protocol:address’

Returns

NotificationRecipient object

property notification_attempted
property notification_succeeded
property notify_type
property raw_string
class aodncore.pipeline.steps.NotifyList(data=None)[source]

Bases: object

add(recipient)[source]
append(recipient)
difference(sequence)[source]
discard(recipient)[source]
filter_by_failed()[source]
filter_by_notify_type(notify_type)[source]

Return a new NotifyList containing only recipients of the given notify_type

Parameters

notify_type – NotificationRecipientType enum member by which to filter recipients

Returns

NotifyList containing only NotifyRecipient instances of the given type

filter_by_succeeded()[source]
classmethod from_collection(recipient_collection)[source]
issubset(sequence)[source]
issuperset(sequence)[source]
set_error(error)[source]

Set the error attribute for all elements

Parameters

error – Exception instance

Returns

None

set_notification_attempted()[source]
union(sequence)[source]
update(sequence)[source]
update_from_error_dict(error_dict)[source]

Update recipient statuses according to the given error dictionary parameter. The absence of an address in the dict keys will be interpreted as “successfully sent”.

Parameters

error_dict – dict as returned by smtplib.SMTP.sendmail() method

Returns

None

aodncore.pipeline.steps.get_check_runner(config, logger, check_params=None)[source]
aodncore.pipeline.steps.get_child_check_runner(check_type, config, logger, check_params=None)[source]

Factory function to return appropriate checker class based on check type value

Parameters
  • check_type – PipelineFileCheckType enum member

  • check_params – dict of parameters to pass to BaseCheckRunner class for runtime configuration

  • config – LazyConfigManager instance

  • logger – Logger instance

Returns

BaseCheckRunner sub-class

aodncore.pipeline.steps.get_harvester_runner(harvester_name, store_runner, harvest_params, tmp_base_dir, config, logger)[source]

Factory function to return appropriate harvester class

Parameters
  • harvester_name – harvester name used to retrieve BaseHarvesterRunner class

  • store_runner – BaseStoreRunner instance to use for uploads

  • harvest_params – dict of parameters to pass to the BaseHarvesterRunner class for runtime configuration

  • tmp_base_dir – base temporary directory

  • config – LazyConfigManager instance

  • logger – Logger instance

Returns

BaseHarvesterRunner class

aodncore.pipeline.steps.get_notify_runner(notification_data, config, logger, notify_params=None)[source]

Factory function to return notify runner class

Parameters
  • notification_data – dictionary containing notification data (i.e. template values)

  • config – LazyConfigManager instance

  • logger – Logger instance

  • notify_params – dict of parameters to pass to BaseNotifyRunner class for runtime configuration

Returns

BaseNotifyRunner class

aodncore.pipeline.steps.get_resolve_runner(input_file, output_dir, config, logger, resolve_params=None)[source]

Factory function to return appropriate resolver class based on the file extension

Parameters
  • input_file – path to the input file

  • output_dir – directory where the resolved files will be extracted/copied

  • config – LazyConfigManager instance

  • logger – Logger instance

  • resolve_params – dict of parameters to pass to BaseResolveRunner class for runtime configuration

Returns

BaseResolveRunner class

aodncore.pipeline.steps.get_store_runner(store_base_url, config, logger, archive_mode=False)[source]

Factory function to return the store runner class, with its storage broker selected based on the URL scheme

Parameters
  • store_base_url – URL base for storage location

  • config – LazyConfigManager instance

  • logger – Logger instance

  • archive_mode – flag to indicate archive

Returns

StoreRunner instance