Source code for aodncore.pipeline.fileclassifier

"""FileClassifier - Generic class for working out the destination path of
a file to be published. The idea is to define the common functionality
here, then create subclasses to customise for each specific incoming
handler. 

Expected use::

    class MyFileClassifier(FileClassifier):
        def dest_path(self, input_file):
            path = <case-specific logic> 
            ...
            return path

    try:
        dest_path = MyFileClassifier.dest_path(input_file)
    except FileClassifierException, e:
        print >>sys.stderr, e
        raise

    print dest_path

"""

import os
import re

from datetime import datetime

from netCDF4 import Dataset

from .exceptions import InvalidFileFormatError, InvalidFileNameError, InvalidFileContentError


[docs]class FileClassifier(object): """Base class for working out where a file should be published.""" @classmethod def _get_file_name_fields(cls, input_file, min_fields=6): """Return the '_'-separated fields in the file name as a list. Raise an exception if the number of fields is less than min_fields. """ # trim off dirs & extention basename = os.path.basename(input_file) just_the_name = re.sub(r'.\w*$', '', basename) fields = just_the_name.split('_') if len(fields) < min_fields: raise InvalidFileNameError( "'{name}' has less than {nfld:d} fields in file name.".format(name=input_file, nfld=min_fields) ) return fields @classmethod def _get_facility(cls, input_file, check_sub=True): """Get the facility/sub-facility from the file name and return as a tuple ('facility', 'sub-facility'). Raise exception if no sub-facility is present, unless check_sub is False. """ name_field = cls._get_file_name_fields(input_file, min_fields=2) fac_subfac = name_field[1].split('-') if check_sub and len(fac_subfac) < 2: raise InvalidFileNameError("Missing sub-facility in file name '{name}'".format(name=input_file)) return tuple(fac_subfac) @classmethod def _open_nc_file(cls, file_path): """Open a NetCDF file for reading""" try: return Dataset(file_path, mode='r') except Exception: raise InvalidFileFormatError("Could not open NetCDF file '{path}'.".format(path=file_path)) @classmethod def _get_nc_att(cls, file_path, att_name, default=None, time_format=None): """Return the value of a global attribute from a NetCDF file. If a list of attribute names is given, a list of values is returned. Unless a default value other than None is given, a missing attribute raises an exception. If time_format is not None, the value of the attribute is converted into a datetime object using the given format. If this fails an error is raised. If time_format=True, use the format required by the IMOS conventions. """ dataset = cls._open_nc_file(file_path) if isinstance(att_name, list): att_list = att_name else: att_list = [att_name] values = [] if time_format is True: time_format = '%Y-%m-%dT%H:%M:%SZ' for att in att_list: if not hasattr(dataset, att): if default is None: raise InvalidFileContentError( "File '{path}' has no attribute '{att}'".format(path=file_path, att=att) ) else: values.append(default) continue val = getattr(dataset, att) if time_format: try: val = datetime.strptime(val, time_format) except ValueError: raise InvalidFileContentError( "Could not parse attribute {att}='{val}' as a datetime" " (file '{file_path}')".format(att=att, val=val, file_path=file_path) ) values.append(val) dataset.close() if isinstance(att_name, list): return values return values[0] @classmethod def _get_site_code(cls, input_file): """Return the site_code attribute of the input_file""" return cls._get_nc_att(input_file, 'site_code') @classmethod def _get_variable_names(cls, input_file): """Return a list of the variable names in the file.""" dataset = cls._open_nc_file(input_file) names = list(dataset.variables.keys()) dataset.close() return names @classmethod def _make_path(cls, dir_list): """Create a path from a list of directory names, making sure the result is a plain ascii string, not unicode (which could happen if some of the components of dir_list come from NetCDF file attributes). """ for i in range(len(dir_list)): dir_list[i] = str(dir_list[i]) return os.path.join(*dir_list)