Source code for aodncore.pipeline.statequery

import warnings

from .files import RemotePipelineFileCollection

__all__ = [
    'StateQuery'
]


[docs]class StateQuery(object): """Simple state query interface, to provide user friendly access for querying existing Pipeline state """ def __init__(self, storage_broker, wfs_broker): self._storage_broker = storage_broker self._wfs_broker = wfs_broker # WFS methods @property def wfs(self): """Read-only property to access the instantiated WebFeatureService object :return: WebFeatureService instance """ return self._wfs_broker.wfs
[docs] def query_wfs_getfeature_dict(self, layer=None, **kwargs): """Make a GetFeature request, and return the response in a native dict :param layer: layer name supplied to GetFeature typename parameter :param kwargs: keyword arguments passed to the underlying WebFeatureService.getfeature method :return: dict containing the parsed GetFeature response """ # TODO: once aodndata code uses the layer parameter, this deprecation can be removed, and the parameter # converted into a mandatory positional argument if not layer: warnings.warn("This method signature will be updated to require a layer positional argument in a future" "version. Please update code to use `query_wfs_getfeature_dict(layer, **kwargs)` signature" "instead.", DeprecationWarning) layer = kwargs.pop('typename') return self._wfs_broker.getfeature_dict(layer, **kwargs)
[docs] def query_wfs_files(self, layer, **kwargs): # pragma: no cover """Return a RemotePipelineFileCollection containing all files for a given layer, or files matching the filter specified in the kwarg `ogc_expression` (of type OgcExpression) :param layer: layer name supplied to GetFeature typename parameter :param kwargs: keyword arguments passed to underlying broker method :return: RemotePipelineFileCollection containing list of files for the layer """ return RemotePipelineFileCollection(self._wfs_broker.query_files(layer, **kwargs))
[docs] def query_wfs_urls_for_layer(self, layer, **kwargs): # pragma: no cover warnings.warn("This method will be removed in a future version. Please update code to use " "`query_wfs_files` instead.", DeprecationWarning) return self._wfs_broker.query_files(layer, **kwargs)
[docs] def query_wfs_file_exists(self, layer, name): # pragma: no cover """Returns a bool representing whether a given 'file_url' is present in a layer :param layer: layer name supplied to GetFeature typename parameter :param name: 'file_url' inserted into OGC filter, and supplied to GetFeature filter parameter :return: whether the given file is present in the layer """ return self._wfs_broker.query_file_exists(layer, name)
# Storage methods
[docs] def download(self, remotepipelinefilecollection, local_path): """Helper method to download a RemotePipelineFileCollection or RemotePipelineFile :param remotepipelinefilecollection: RemotePipelineFileCollection to download :param local_path: local path where files will be downloaded. :return: None """ self._storage_broker.download(remotepipelinefilecollection, local_path)
[docs] def query_storage(self, query): # pragma: no cover """Query the storage backend and return existing files matching the given query :param query: S3-style prefix for filtering query results :return: RemotePipelineFileCollection of files matching the prefix """ return self._storage_broker.query(query)