Source code for pyPreservica.webHooksAPI

"""
pyPreservica WebHooksAPI module definition

A client library for the Preservica Repository web services Webhook API
https://us.preservica.com/api/webhook/documentation.html

author:     James Carr
licence:    Apache License 2.0

"""
import json
from http.server import BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import hmac
from pyPreservica.common import *

logger = logging.getLogger(__name__)

BASE_ENDPOINT = '/api/webhook'


class WebHookHandler(BaseHTTPRequestHandler):
    """
    A sample web hook web server which provides handshake verification
    The shared secret key is passed in via the HTTPServer

    Extend the class and implement do_WORK() method
    The JSON document is passed into do_WORK()

    """

    def hmac(self, key, message):
        return hmac.new(key=bytes(key, 'latin-1'), msg=bytes(message, 'latin-1'), digestmod=hashlib.sha256).hexdigest()

    def do_POST(self):
        result = urlparse(self.path)
        q = parse_qs(result.query)
        if 'challengeCode' in q:
            code = q['challengeCode'][0]
            signature = self.hmac(self.server.secret_key, code)
            response = f'{{ "challengeCode": "{code}",     "challengeResponse": "{signature}" }}'
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(bytes(response.encode('utf-8')))
            self.log_message(f"Handshake Completed. {response.encode('utf-8')}")
        else:
            verif_sig = self.headers.get("Preservica-Signature", None)
            if "chunked" in self.headers.get("Transfer-Encoding", "") and (verif_sig is not None):
                payload = ""
                while True:
                    line = self.rfile.readline().strip()
                    chunk_length = int(line, 16)
                    if chunk_length != 0:
                        chunk = self.rfile.read(chunk_length)
                        payload = payload + chunk.decode("utf-8")
                    self.rfile.readline()
                    if chunk_length == 0:
                        verify_body = f"preservica-webhook-auth{payload}"
                        signature = self.hmac(self.server.secret_key, verify_body)
                        if signature == verif_sig:
                            self.log_message("Signature Verified. Doing Work...")
                            self.log_message(payload)
                            self.send_response(200)
                            self.end_headers()
                            self.do_WORK(json.loads(payload))
                        break


[docs] class TriggerType(Enum): """ Enumeration of the Web hooks Trigger Types """ MOVED = "MOVED" INDEXED = "FULL_TEXT_INDEXED" SECURITY_CHANGED = "CHANGED_SECURITY_DESCRIPTOR"
[docs] class WebHooksAPI(AuthenticatedAPI): """ Class to register new webhook endpoints """
[docs] def subscriptions(self): """ Return all the current active web hook subscriptions as a json document :return: list of web hooks """ self._check_if_user_has_manager_role() headers = {HEADER_TOKEN: self.token} response = self.session.get(f'{self.protocol}://{self.server}{BASE_ENDPOINT}/subscriptions', headers=headers) if response.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.subscriptions() if response.status_code == requests.codes.ok: json_response = str(response.content.decode('utf-8')) doc = json.loads(json_response) return doc else: exception = HTTPException("", response.status_code, response.url, "subscriptions", response.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def unsubscribe_all(self): """ Unsubscribe from all webhooks. :return: """ self._check_if_user_has_manager_role() subscriptions = self.subscriptions() for sub in subscriptions: self.unsubscribe(sub['id'])
[docs] def unsubscribe(self, subscription_id: str): """ Unsubscribe from the provided webhook. :param subscription_id: :return: """ self._check_if_user_has_manager_role() headers = {HEADER_TOKEN: self.token} response = self.session.delete( f'{self.protocol}://{self.server}{BASE_ENDPOINT}/subscriptions/{subscription_id}', headers=headers) if response.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.unsubscribe(subscription_id) if response.status_code == requests.codes.no_content: json_response = str(response.content.decode('utf-8')) logger.debug(json_response) return json_response else: exception = HTTPException(str(subscription_id), response.status_code, response.url, "unsubscribe", response.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def subscribe(self, url: str, triggerType: TriggerType, secret: str): """ Subscribe to a new web hook :param url: :param triggerType: :param secret: :return: json_response """ self._check_if_user_has_manager_role() headers = {HEADER_TOKEN: self.token, 'Accept': 'application/json', 'Content-Type': 'application/json'} json_payload = f'{{"url": "{url}", "triggerType": "{triggerType.value}", "secret": "{secret}", ' \ f'"includeIdentifiers": "true"}}' response = self.session.post(f'{self.protocol}://{self.server}{BASE_ENDPOINT}/subscriptions', headers=headers, data=json.dumps(json.loads(json_payload))) if response.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.subscribe(url, triggerType, secret) if response.status_code == requests.codes.ok: json_response = str(response.content.decode('utf-8')) logger.debug(json_response) return json_response else: exception = HTTPException(str(url), response.status_code, response.url, "subscribe", response.content.decode('utf-8')) logger.error(response.content.decode('utf-8')) raise exception