Source code for aodncore.pipeline.geonetwork

"""
Geonetwork Library
"""
import contextlib
import json
import os
import posixpath
from io import StringIO
from xml.etree import ElementTree

# 'requests>=2.5' is a dependency of tableschema (and possibly other aodncore requirements), however should tableschema
# no longer be required, it may be necessary to explicitly install 'requests'
import requests
from requests.exceptions import ConnectionError, RequestException

from ..util import list_not_empty, generate_id
from .exceptions import GeonetworkRequestError, GeonetworkConnectionError


# Path of the Geonetwork REST API root; it is joined onto the instance base URL
BASE_API = "srv/api/0.1"
# Endpoint (under BASE_API) used by Geonetwork.get_record; the record UUID is appended to it
ENDPOINT_RECORD_GET = 'records'
# Endpoint (under BASE_API) that Geonetwork.update_record sends its PUT request to
ENDPOINT_BATCH_UPDATE = 'records/batchediting'


def dict_to_xml(tag, value=None, attr=None, elems=None, display=True):
    """Convert a dictionary of XML nodes into a nested XML string

    :param tag: a string or list containing element tag(s). If tag is a list then the leading tags are rendered as
        nested parent elements and the remaining arguments are applied to the last (innermost) element.
    :param value: text for the element. It is inserted verbatim (no XML escaping is applied), so it may itself be an
        XML fragment. ``None`` renders as an empty element body.
    :param attr: dict containing key:value pairs representing attributes of the element
    :param elems: list of dicts containing nested element definitions (each dict holds keyword arguments for this
        function). If this is non-empty then the value is replaced by the rendered child elements.
    :param display: boolean to identify whether the element (and child nodes) should be rendered to the XML string
    :return: an XML string, or an empty string when display is false
    """
    # Checked before the list expansion: the recursive call below does not carry 'display', so testing it later
    # would render hidden elements (and their generated parents) whenever 'tag' is a list.
    if not display:
        return ''

    if isinstance(tag, list):
        # the innermost tag receives the content; every preceding tag becomes a bare wrapper around it
        nodes = {'tag': tag[-1], 'elems': elems, 'attr': attr, 'value': value}
        for parent in reversed(tag[:-1]):
            nodes = {'tag': parent, 'elems': [nodes]}
        return dict_to_xml(**nodes)

    # each attribute carries its own leading space, so they are concatenated without an extra separator
    attributes = '' if attr is None else ''.join(' {}="{}"'.format(k, v) for k, v in attr.items())

    if elems:
        value = ''.join(dict_to_xml(**elem) for elem in elems)
    elif value is None:
        # avoid emitting the literal text "None" for an element without content
        value = ''

    return '<{tag}{attributes}>{value}</{tag}>'.format(tag=tag, attributes=attributes, value=value)
@contextlib.contextmanager
def geonetwork_exception_handler():
    """Context manager translating ``requests`` failures raised in its body into Geonetwork exceptions

    :raises GeonetworkConnectionError: when the body raises a requests ``ConnectionError``
    :raises GeonetworkRequestError: when the body raises any other ``RequestException``
    """
    try:
        yield
    except RequestException as err:
        # requests' ConnectionError derives from RequestException, so one handler sees both kinds of failure
        if isinstance(err, ConnectionError):
            raise GeonetworkConnectionError(err)
        raise GeonetworkRequestError(err)
class Geonetwork(object):
    """Geonetwork API session handler

    Creates a ``requests`` session using basic authentication, issues an initial POST to the API root and, if the
    server returned an ``XSRF-TOKEN`` cookie, copies it into the ``X-XSRF-TOKEN`` header of the session.

    :param base_url: Geonetwork instance base url
    :param username: username for the Geonetwork API
    :param password: password for the Geonetwork API
    """

    def __init__(self, base_url, username, password):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.verify = True
        self.session.auth = (username, password)

        # init cookies
        # NOTE(review): the status of this response is not checked; presumably the request is only made so that
        # the server hands out the XSRF cookie - confirm before adding raise_for_status() here
        url = self._url(BASE_API)
        with geonetwork_exception_handler():
            self.session.post(url)
        for cookie in self.session.cookies:
            if cookie.name == "XSRF-TOKEN":
                self.session.headers.update({'X-XSRF-TOKEN': cookie.value})
        self.session.headers.update({'Accept': 'application/xml'})

    def _url(self, path):
        """Join a path onto the base url

        ``posixpath`` is used instead of ``os.path`` so the separator is always '/', whatever the host platform.

        :param path: path relative to the base url
        :return: full url
        """
        return posixpath.join(self.base_url, path)

    def _get(self, path):
        """Send a GET request

        :param path: path relative to the base url
        :return: the response object
        :raises GeonetworkConnectionError: on connection failure
        :raises GeonetworkRequestError: on any other request failure, including an HTTP error status
        """
        url = self._url(path)
        with geonetwork_exception_handler():
            response = self.session.get(url)
            response.raise_for_status()
        return response

    def _post(self, path, data=None, params=None):
        """Send a POST request with a JSON encoded body

        :param path: path relative to the base url
        :param data: object to be JSON encoded as the request body
        :param params: query string parameters
        :return: the response object
        :raises GeonetworkConnectionError: on connection failure
        :raises GeonetworkRequestError: on any other request failure, including an HTTP error status
        """
        url = self._url(path)
        with geonetwork_exception_handler():
            response = self.session.post(url, data=json.dumps(data), params=params)
            response.raise_for_status()
        return response

    def _put(self, path, data=None, params=None, headers=None):
        """Send a PUT request with a JSON encoded body

        :param path: path relative to the base url
        :param data: object to be JSON encoded as the request body
        :param params: query string parameters
        :param headers: additional headers for this request
        :return: the response object
        :raises GeonetworkConnectionError: on connection failure
        :raises GeonetworkRequestError: on any other request failure, including an HTTP error status
        """
        url = self._url(path)
        with geonetwork_exception_handler():
            response = self.session.put(url, data=json.dumps(data), params=params, headers=headers)
            response.raise_for_status()
        return response

    def get_record(self, _uuid):
        """Retrieve a metadata record

        :param _uuid: Geonetwork record ID
        :return: xml of specified metadata record
        """
        return self._get(posixpath.join(BASE_API, ENDPOINT_RECORD_GET, _uuid)).text

    def update_record(self, _uuid, changes):
        """Update Geonetwork record

        :param _uuid: Geonetwork record ID
        :param changes: list of change dicts where each change contains a value and an xpath
        """
        params = {"uuids": _uuid}
        # these headers are passed for this request only; the session itself defaults to 'Accept: application/xml'
        headers = {"accept": "application/json", "content-type": "application/json"}
        self._put(posixpath.join(BASE_API, ENDPOINT_BATCH_UPDATE), data=changes, params=params, headers=headers)
class GeonetworkMetadataHandler(object):
    """Handle changes to Geonetwork metadata from Harvester

    Collects the extents of a single metadata record, builds the Geonetwork batch edit payload and pushes it.

    :param conn: the database connection class (DatabaseInteractions)
    :param session: the geonetwork API session
    :param metadata: dict containing extents for a single metadata record
    :param logger: instance of the logger
    """

    def __init__(self, conn, session, metadata, logger):
        self._conn = conn
        self._session = session
        self._logger = logger

        # extent definitions taken from the metadata dict (None when a key is absent)
        self.uuid = metadata.get('uuid')
        self.spatial = metadata.get('spatial')
        self.temporal = metadata.get('temporal')
        self.vertical = metadata.get('vertical')

        # populated by run()
        self.spatial_data = {}
        self.temporal_data = {}
        self.vertical_data = {}
        self.xml_text = None

    def get_namespace_dict(self):
        """Scrape relevant namespaces from source metadata record

        :return: dict mapping 'xmlns:<prefix>' to the namespace URI for each wanted prefix declared in xml_text
        """
        wanted = ('mri', 'gex', 'gml', 'gco')
        declarations = ElementTree.iterparse(StringIO(self.xml_text), events=['start-ns'])
        return {
            'xmlns:{}'.format(prefix): uri
            for _, (prefix, uri) in declarations
            if prefix in wanted
        }

    def build_api_payload(self):
        """Build the batchedit API payload based on dict template

        The payload is a complete replacement of all extents for the provided metadata record

        :return: list holding a single change dict with the rendered XML ('value') and its target 'xpath'
        """
        namespaces = self.get_namespace_dict()

        geographic_extent = {
            'display': bool(self.spatial_data),
            'tag': ['gex:geographicElement', 'gex:EX_BoundingPolygon', 'gex:polygon'],
            'value': self.spatial_data.get('boundingpolygonasgml3'),
        }

        vertical_extent = {
            'display': bool(self.vertical_data),
            'tag': ['gex:verticalElement', 'gex:EX_VerticalExtent'],
            'elems': [
                {'tag': ['gex:minimumValue', 'gco:Real'], 'value': self.vertical_data.get('min_value')},
                {'tag': ['gex:maximumValue', 'gco:Real'], 'value': self.vertical_data.get('max_value')},
            ],
        }

        temporal_extent = {
            'display': bool(self.temporal_data),
            'tag': ['gex:temporalElement', 'gex:EX_TemporalExtent', 'gex:extent', 'gml:TimePeriod'],
            'attr': {'gml:id': generate_id()},
            'elems': [
                {'tag': 'gml:beginPosition', 'value': self.temporal_data.get('min_value')},
                {'tag': 'gml:endPosition', 'value': self.temporal_data.get('max_value')},
            ],
        }

        extent = {
            'tag': ['gn_replace', 'gex:EX_Extent'],
            'attr': namespaces,
            'elems': [geographic_extent, vertical_extent, temporal_extent],
        }

        change = {
            'value': dict_to_xml(**extent),
            'xpath': './/mri:MD_DataIdentification/mri:extent',
        }
        return [change]

    def run(self):
        """Collect the configured extents and push them to the Geonetwork record

        Does nothing unless list_not_empty() accepts the list of extent definitions.
        """
        if not list_not_empty([self.spatial, self.temporal, self.vertical]):
            return

        self._logger.info('Collecting extent data for {}'.format(self.uuid))
        if self.spatial:
            self.spatial_data = self._conn.get_spatial_extent(**self.spatial)
        if self.temporal:
            self.temporal_data = self._conn.get_temporal_extent(**self.temporal)
        if self.vertical:
            self.vertical_data = self._conn.get_vertical_extent(**self.vertical)

        # the current record is needed so the payload can reuse its namespace declarations
        self.xml_text = self._session.get_record(self.uuid)
        changes = self.build_api_payload()

        self._logger.info('Updating extent data for {}'.format(self.uuid))
        self._session.update_record(_uuid=self.uuid, changes=changes)