Source code for ckan_api_client.low_level

import json
import urlparse

import requests

from .exceptions import HTTPError, BadApiError
from .utils import SuppressExceptionIf


[docs]class CkanLowlevelClient(object): """ Ckan low-level client. - Handles authentication and response validation - Handles request body serialization and response body deserialization - Raises HTTPError exceptions on failed HTTP requests - Performs some checks on return values from the API """ def __init__(self, base_url, api_key=None): """ :param basestring base_url: Base url for the Ckan installation :param basestring api_key: API key to be used for authentication. If omitted, no authentication information will be sent. """ self.base_url = base_url self.api_key = api_key @property def anonymous(self): """ Property, returning a copy of this client, without an api_key set """ return CkanLowlevelClient(self.base_url)
[docs] def request(self, method, path, **kwargs): """ Wrapper around :py:func:`requests.request`. Extra functionality provided: - Add ``Authorization`` header to requests - If data is an object, serialize it with json and add the ``Content-type: application/json`` header. - If the response didn't contain an "ok" code, raises a :py:exc:`HTTPError` exception. :param method: HTTP method to be used :param path: Path, relative to the Ckan root. For example: ``/api/3/action/package_list`` :param headers: HTTP headers to be added to the request :param data: Data to be sent in the request body :param kwargs: Extra keyword arguments will be passed directly to the ``requests.request()`` call. :raises ckan_api_client.exceptions.HTTPError: in case the HTTP request returned a non-ok status code :return: a requests response object """ headers = kwargs.get('headers') or {} kwargs['headers'] = headers # Update headers for authorization if self.api_key is not None: headers['Authorization'] = self.api_key # Serialize data to json, if not already if 'data' in kwargs: if not isinstance(kwargs['data'], basestring): kwargs['data'] = json.dumps(kwargs['data']) headers['content-type'] = 'application/json' if isinstance(path, (list, tuple)): path = '/'.join(path) url = urlparse.urljoin(self.base_url, path) response = requests.request(method, url, **kwargs) if not response.ok: # ------------------------------------------------------------ # todo: attach message, if any available.. # ------------------------------------------------------------ # todo: we should find a way to figure out how to attach # original text message to the exception # as it might be: json string, part of json object, # part of html document # ------------------------------------------------------------ raise HTTPError( status_code=response.status_code, message="Error while performing request", original=self._figure_out_error_message(response)) return response
def _figure_out_error_message(self, response): """ We have a response, which probably contains an error message, but we need to figure that out.. Usual places for errors are: - a json message, with {'error': {'message': ..., '__type': ...}} - a html page, usually when things went seriously bad """ with SuppressExceptionIf(True): return self._figure_out_error_from_json(response.json()) def _figure_out_error_from_json(self, data): if isinstance(data, dict) and 'error' in data: with SuppressExceptionIf(True): return '{0}: {1}'.format(data['error']['__type'], data['error']['message']) with SuppressExceptionIf(True): return '{0}'.format(data['error']['message']) raise ValueError("Unable to find message in JSON data") # ============================================================ # Validation helpers # ============================================================ def _validate_response_idlist(self, response, name='object'): if not isinstance(response, list): raise BadApiError( "Bad {0} id list returned from the api (not a list)" .format(name)) if not all(isinstance(x, basestring) for x in response): raise BadApiError( "Bad {0} id list returned from the api (an element " "is not a string)".format(name)) def _validate_response_dict(self, response, name='object'): if not isinstance(response, dict): raise BadApiError("Bad {0} returned from the api (not a dict)" .format(name)) def _validate_response_list_of_dict(self, response, name='object'): if not isinstance(response, list): raise BadApiError("Bad {0} list returned from the api (not a list)" .format(name)) if not all(isinstance(x, dict) for x in response): raise BadApiError( "Bad {0} list returned from the api (an element " "is not a dict)".format(name)) # ============================================================ # Datasets # ============================================================
[docs] def list_datasets(self): """Return a list of all dataset ids""" path = '/api/2/rest/dataset' response = self.request('GET', path) data = response.json() self._validate_response_idlist(data) return data
[docs] def iter_datasets(self): """ Generator yielding dataset objects, iterating over the whole database. """ for ds_id in self.list_datasets(): yield self.get_dataset(ds_id)
[docs] def get_dataset(self, dataset_id): """ Get a dataset, using API v2 :param dataset_id: ID of the requested dataset :return: a dict containing the data as returned from the API :rtype: dict """ path = '/api/2/rest/dataset/{0}'.format(dataset_id) response = self.request('GET', path) data = response.json() self._validate_response_dict(data) return data
[docs] def post_dataset(self, dataset): """ POST a dataset, using API v2 (usually for creation) :param dict dataset: a dict containing data to be sent to Ckan. Should not already contain an id :return: a dict containing the data as returned from the API :rtype: dict """ path = '/api/2/rest/dataset' response = self.request('POST', path, data=dataset) data = response.json() self._validate_response_dict(data) return data
[docs] def put_dataset(self, dataset): """ PUT a dataset, using API v2 (usually for update) :param dict dataset: a dict containing data to be sent to Ckan. Must contain an id, that will be used to build the URL :return: a dict containing the updated dataset as returned from the API :rtype: dict """ path = '/api/2/rest/dataset/{0}'.format(dataset['id']) response = self.request('PUT', path, data=dataset) data = response.json() self._validate_response_dict(data) return data
[docs] def delete_dataset(self, dataset_id, ignore_404=True): """ DELETE a dataset, using API v2 :param dataset_id: if of the dataset to be deleted :param bool ignore_404: if ``True`` (the default), will simply ignore http 404 errors from the API """ ign404 = SuppressExceptionIf( lambda e: ignore_404 and (isinstance(e, HTTPError) and e.status_code == 404)) path = '/api/2/rest/dataset/{0}'.format(dataset_id) with ign404: self.request('DELETE', path, data={'id': dataset_id})
# ============================================================ # Groups # ============================================================ # =====[!!]=========== IMPORTANT NOTE ===============[!!]===== # BEWARE! API v2 only considers actual groups, organizations # are not handled / returned by this one! # ============================================================
[docs] def list_groups(self): path = '/api/2/rest/group' response = self.request('GET', path) data = response.json() self._validate_response_idlist(data) return data
[docs] def iter_groups(self): all_groups = self.list_groups() for group_id in all_groups: yield self.get_group(group_id)
[docs] def get_group(self, group_id): path = '/api/2/rest/group/{0}'.format(group_id) response = self.request('GET', path) data = response.json() self._validate_response_dict(data) return data
[docs] def post_group(self, group): path = '/api/2/rest/group' response = self.request('POST', path, data=group) data = response.json() self._validate_response_dict(data) return data
[docs] def put_group(self, group): path = '/api/2/rest/group/{0}'.format(group['id']) response = self.request('PUT', path, data=group) data = response.json() self._validate_response_dict(data) return data
[docs] def delete_group(self, group_id, ignore_404=True): ign404 = SuppressExceptionIf( lambda e: ignore_404 and (isinstance(e, HTTPError) and e.status_code == 404)) path = '/api/2/rest/group/{0}'.format(group_id) with ign404: self.request('DELETE', path) path = '/api/3/action/group_purge' with ign404: self.request('POST', path, data={'id': group_id})
# ============================================================ # Organizations # ============================================================ # --- [!!] NOTE --------------------------------------------- # We need to fallback to api v3 here, as v2 doesn't support # doing things with organizations.. # ------------------------------------------------------------
[docs] def list_organizations(self): path = '/api/3/action/organization_list' response = self.request('GET', path) data = response.json()['result'] self._validate_response_idlist(data) return data
[docs] def iter_organizations(self): for org_id in self.list_organizations(): yield self.get_organization(org_id)
[docs] def get_organization(self, id): path = '/api/3/action/organization_show?id={0}'.format(id) response = self.request('GET', path) data = response.json()['result'] self._validate_response_dict(data) # API v3 returns the whole objects here, but we just # want the ids.. if 'groups' in data: data['groups'] = [g['id'] for g in data['groups']] return data
[docs] def post_organization(self, organization): path = '/api/3/action/organization_create' response = self.request('POST', path, data=organization) data = response.json()['result'] self._validate_response_dict(data) return data
[docs] def put_organization(self, organization): """Warning! with api v3 we need to use POST!""" path = '/api/3/action/organization_update' response = self.request('POST', path, data=organization) data = response.json()['result'] self._validate_response_dict(data) return data
[docs] def delete_organization(self, id, ignore_404=True): ign404 = SuppressExceptionIf( lambda e: ignore_404 and (isinstance(e, HTTPError) and e.status_code == 404)) path = '/api/3/action/organization_delete' with ign404: self.request('PUT', path, data={'id': id}) path = '/api/3/action/organization_purge' with ign404: self.request('POST', path, data={'id': id})
# ============================================================ # Licenses # ============================================================
[docs] def list_licenses(self): path = '/api/2/rest/licenses' response = self.request('GET', path) data = response.json() self._validate_response_list_of_dict(data) return data