# Source code for pyPreservica.workflowAPI

"""
pyPreservica WorkflowAPI module definition

A client library for the Preservica Repository web services Workflow API
https://us.preservica.com/sdb/rest/workflow/documentation.html

author:     James Carr
licence:    Apache License 2.0

"""

import uuid
import datetime
from xml.etree import ElementTree

from pyPreservica.common import *

# Module-level logger named after this module. `logging` is not imported by name in this file;
# presumably it arrives through the star import from pyPreservica.common -- TODO confirm.
logger = logging.getLogger(__name__)


class WorkflowInstance:
    """
    Record describing one workflow instance, i.e. a workflow context which has been executed.

    Only ``instance_id`` is supplied at construction time. Every other attribute
    starts out as ``None`` and is expected to be filled in by whoever creates the object.
    """

    def __init__(self, instance_id: int):
        """
        Create an otherwise empty record for the given instance id.

        :param instance_id: The workflow instance id
        :type instance_id: int
        """
        self.instance_id = instance_id
        # All remaining fields are created up front (in this order) and default to None.
        unset_fields = (
            "started",
            "finished",
            "state",
            "display_state",
            "archival_process_id",
            "workflow_group_id",
            "workflow_context_id",
            "workflow_context_name",
            "workflow_definition_id",
            "xml_response",
        )
        for field_name in unset_fields:
            setattr(self, field_name, None)

    def __str__(self):
        """Return a short label containing the instance id."""
        return "Workflow Instance ID: {}".format(self.instance_id)

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return str(self)
class WorkflowContext:
    """
    Record describing a workflow context, i.e. a pre-defined workflow which is ready to run.

    Holds nothing more than the context id and its name.
    """

    def __init__(self, workflow_id, workflow_name: str):
        """
        Store the identifying pair for a workflow context.

        :param workflow_id: The workflow context id
        :param workflow_name: The workflow context name
        :type workflow_name: str
        """
        self.workflow_id, self.workflow_name = workflow_id, workflow_name

    def __str__(self):
        """Return a text summary containing the context id and name."""
        summary = f""" Workflow ID: {self.workflow_id} Workflow Name: {self.workflow_name} """
        return summary

    def __repr__(self):
        """Return the same text as ``__str__``."""
        return str(self)
class WorkflowAPI(AuthenticatedAPI):
    """
    A class for calling the Preservica Workflow API

    This API can be used to programmatically manage the Preservica Workflows.

    https://preview.preservica.com/sdb/rest/workflow/documentation.html

    Every request sends the current access token in the ``HEADER_TOKEN`` header. When the
    server answers "unauthorized" the token is refreshed through ``self.__token__()`` and the
    same call is issued again; any other unexpected status is logged and raised as
    ``RuntimeError(status_code, operation_name)``.
    """

    workflow_states = ['Aborted', 'Active', 'Completed', 'Finished_Mixed_Outcome', 'Pending', 'Suspended',
                       'Unknown', 'Failed']
    workflow_types = ['Ingest', 'Access', 'Transformation', 'DataManagement']

    def __init__(self, username: str = None, password: str = None, tenant: str = None, server: str = None,
                 use_shared_secret: bool = False, two_fa_secret_key: str = None, protocol: str = "https"):
        """
        Forward the connection settings to the base class and set the REST path of the Workflow API.

        All arguments are passed unchanged, and in this order, to ``AuthenticatedAPI.__init__``.
        """
        super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key, protocol)
        self.base_url = "sdb/rest/workflow"

    # ------------------------------------------------------------------ helpers

    def _endpoint(self, path: str) -> str:
        """Return the absolute URL of the Workflow API resource ``path`` (relative to ``base_url``)."""
        return f'{self.protocol}://{self.server}/{self.base_url}/{path}'

    @staticmethod
    def _child_text(parent, tag: str):
        """
        Return the text of the first ``tag`` descendant (workflow namespace) of ``parent``.

        ``None`` is returned when the element is missing or has no text.
        """
        element = parent.find(f".//{{{NS_WORKFLOW}}}{tag}")
        # Compare against None explicitly: an Element without children is falsy,
        # so a plain truth test would treat every leaf element as missing.
        return None if element is None else element.text

    @staticmethod
    def _parse_timestamp(text):
        """
        Convert a workflow timestamp string into a naive ``datetime.datetime``.

        Accepts ``YYYY-MM-DDTHH:MM:SS.ffffffZ`` and the same form without fractional seconds.
        Returns ``None`` for empty input; a value in any other layout is logged as a warning
        and also yields ``None`` instead of raising.
        """
        if not text:
            return None
        # This file runs a star import after `import datetime`, which could rebind that name.
        # Binding the module locally keeps the helper independent of whatever the star import exports.
        import datetime as _datetime
        value = text.strip()
        for layout in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'):
            try:
                return _datetime.datetime.strptime(value, layout)
            except ValueError:
                continue
        logger.warning("Unrecognised workflow timestamp: %s", value)
        return None

    @classmethod
    def _parse_instance(cls, element) -> WorkflowInstance:
        """
        Build a ``WorkflowInstance`` from an XML element describing one workflow instance.

        ``Id`` is mandatory (``RuntimeError`` when it is absent); every other field is set to
        ``None`` when the corresponding element is missing.
        """
        id_text = cls._child_text(element, "Id")
        if id_text is None:
            raise RuntimeError("Workflow instance has no Id")
        instance = WorkflowInstance(int(id_text))
        instance.started = cls._parse_timestamp(cls._child_text(element, "Started"))
        instance.finished = cls._parse_timestamp(cls._child_text(element, "Finished"))
        instance.state = cls._child_text(element, "State")
        instance.display_state = cls._child_text(element, "DisplayState")
        instance.archival_process_id = cls._child_text(element, "ArchivalProcessId")
        instance.workflow_group_id = cls._child_text(element, "WorkflowGroupId")
        instance.workflow_context_id = cls._child_text(element, "WorkflowContextId")
        instance.workflow_context_name = cls._child_text(element, "WorkflowContextName")
        instance.workflow_definition_id = cls._child_text(element, "WorkflowDefinitionTextId")
        return instance

    def _request_contexts(self, params: dict, operation: str) -> list:
        """
        Fetch ``/contexts`` filtered by ``params`` and return the parsed ``WorkflowContext`` list.

        :param params: Query parameters sent with the request
        :param operation: Name of the public method, used as the second ``RuntimeError`` argument
        """
        headers = {HEADER_TOKEN: self.token}
        request = self.session.get(self._endpoint("contexts"), headers=headers, params=params)
        if request.status_code == requests.codes.ok:
            xml_response = str(request.content.decode('utf-8'))
            entity_response = ElementTree.fromstring(xml_response)
            return [WorkflowContext(self._child_text(context, "Id"), self._child_text(context, "Name"))
                    for context in entity_response.findall(f".//{{{NS_WORKFLOW}}}WorkflowContext")]
        if request.status_code == requests.codes.unauthorized:
            # Retry the very same query with a fresh token.
            self.token = self.__token__()
            return self._request_contexts(params, operation)
        logger.error(request.content)
        raise RuntimeError(request.status_code, operation)

    # --------------------------------------------------------------- public API

    def get_workflow_contexts_by_type(self, workflow_type: str):
        """
        Return a list of Workflow Contexts which have the same Workflow type

        :param workflow_type: The Workflow type Ingest, Access, Transformation or DataManagement
        :type workflow_type: str

        :return: List of Workflow Contexts
        :rtype: list

        :raises RuntimeError: if the server returns an unexpected status code
        """
        return self._request_contexts({"type": workflow_type}, "get_workflow_contexts_by_type")

    def get_workflow_contexts(self, definition: str):
        """
        Return a list of Workflow Contexts which have the same Workflow Definition

        :param definition: The Workflow Definition ID
        :type definition: str

        :return: List of Workflow Contexts
        :rtype: list

        :raises RuntimeError: if the server returns an unexpected status code
        """
        return self._request_contexts({"workflowDefinitionId": definition}, "get_workflow_contexts")

    def start_workflow_instance(self, workflow_context: WorkflowContext, **kwargs):
        """
        Start a workflow context

        Returns a Correlation Id which is used to monitor the workflow progress

        :param workflow_context: The workflow context to start
        :type workflow_context: WorkflowContext

        :param kwargs: Key/Values to pass to the workflow instance; values are converted to
                       strings before being written into the request (``None`` stays empty)

        :return: correlation_id
        :rtype: str

        :raises RuntimeError: if the server returns an unexpected status code
        """

        def as_text(value):
            # ElementTree can only serialise str (or None) as element text.
            return None if value is None else str(value)

        headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
        # A fresh random id is generated for every attempt, including the retry after a token refresh.
        correlation_id = str(uuid.uuid4())

        request_payload = ElementTree.Element('StartWorkflowRequest', {"xmlns": "http://workflow.preservica.com"})
        ElementTree.SubElement(request_payload, "WorkflowContextId").text = as_text(workflow_context.workflow_id)
        ElementTree.SubElement(request_payload, "WorkflowContextName").text = as_text(
            workflow_context.workflow_name)
        for key, value in kwargs.items():
            parameter = ElementTree.SubElement(request_payload, "Parameter")
            ElementTree.SubElement(parameter, "Key").text = as_text(key)
            ElementTree.SubElement(parameter, "Value").text = as_text(value)
        ElementTree.SubElement(request_payload, "CorrelationId").text = correlation_id

        xml_request = ElementTree.tostring(request_payload, encoding='utf-8')
        request = self.session.post(self._endpoint("instances"), headers=headers, data=xml_request)
        if request.status_code == requests.codes.created:
            return correlation_id
        if request.status_code == requests.codes.unauthorized:
            self.token = self.__token__()
            return self.start_workflow_instance(workflow_context, **kwargs)
        logger.error(request.content)
        raise RuntimeError(request.status_code, "start_workflow_instance failed")

    def terminate_workflow_instance(self, instance_ids):
        """
        Terminate a workflow by its instance id

        :param instance_ids: The Workflow instance
        :type instance_ids: int or a list (tuple and set are accepted as well) of int

        :raises RuntimeError: if the server returns an unexpected status code
        """
        if isinstance(instance_ids, (list, tuple, set, frozenset)):
            # int() rejects anything that is not a whole number before it reaches the query string.
            param_string = ",".join(str(int(e)) for e in instance_ids)
        else:
            param_string = str(int(instance_ids))

        headers = {HEADER_TOKEN: self.token}
        params = {"workflowInstanceIds": param_string}
        request = self.session.post(self._endpoint("instances/terminate"), headers=headers, params=params)
        if request.status_code == requests.codes.accepted:
            return
        if request.status_code == requests.codes.unauthorized:
            self.token = self.__token__()
            return self.terminate_workflow_instance(instance_ids)
        logger.error(request.content)
        raise RuntimeError(request.status_code, "terminate_workflow_instance")

    def workflow_instance(self, instance_id: int) -> WorkflowInstance:
        """
        Return a workflow instance by its Id

        The request asks the server to include errors (``includeErrors=true``); the raw XML of
        the answer is kept on the returned object as ``xml_response``.

        :param instance_id: The Workflow instance
        :type instance_id: int

        :return: workflow_instance
        :rtype: WorkflowInstance

        :raises RuntimeError: if the server returns an unexpected status code, or the instance
                              in the response does not carry the requested id
        """
        headers = {HEADER_TOKEN: self.token}
        params = {"includeErrors": "true"}
        request = self.session.get(self._endpoint(f"instances/{str(instance_id)}"), headers=headers,
                                   params=params)
        if request.status_code == requests.codes.ok:
            xml_response = str(request.content.decode('utf-8'))
            logger.debug(xml_response)
            entity_response = ElementTree.fromstring(xml_response)
            workflow_instance = self._parse_instance(entity_response)
            # Explicit check instead of `assert`, so it still runs under `python -O`
            # and also works when the id was passed as a numeric string.
            if workflow_instance.instance_id != int(instance_id):
                logger.error(request.content)
                raise RuntimeError(request.status_code, "workflow_instance")
            workflow_instance.xml_response = xml_response
            return workflow_instance
        if request.status_code == requests.codes.unauthorized:
            self.token = self.__token__()
            return self.workflow_instance(instance_id)
        logger.error(request.content)
        raise RuntimeError(request.status_code, "workflow_instance")

    def workflow_instances(self, workflow_state: str, workflow_type: str, **kwargs):
        """
        Return a generator of Workflow instances

        Pages of 25 instances are requested one after another until the offset reaches the
        total reported by the server; the instances are yielded one at a time.

        :param workflow_state: The Workflow state Aborted, Active, Completed,
                               Finished_Mixed_Outcome, Pending, Suspended, Unknown, or Failed
        :param workflow_type: The Workflow type Ingest, Access, Transformation or DataManagement
        :param kwargs: Optional filters ``contextId``, ``creator``, ``from`` and ``to``

        :raises RuntimeError: on an invalid state/type or an unexpected status code
        """
        start_value = 0
        maximum = 25
        total_count = maximum  # placeholder so the first page is always requested
        while total_count > start_value:
            total_count, count, page = self.__workflow_instances__(workflow_state, workflow_type,
                                                                   maximum=maximum, start_value=start_value,
                                                                   **kwargs)
            yield from page
            if count <= 0:
                # The offset can no longer advance; stop instead of requesting the same page forever.
                break
            start_value += count

    def __workflow_instances__(self, workflow_state: str, workflow_type: str, maximum: int = 25,
                               start_value: int = 0, **kwargs):
        """
        Return one page of Workflow instances

        :param workflow_state: The Workflow state: Aborted, Active, Completed,
                               Finished_Mixed_Outcome, Pending, Suspended, Unknown, or Failed
        :param workflow_type: The Workflow type: Ingest, Access, Transformation or DataManagement
        :param maximum: Page size sent as ``max``
        :param start_value: Offset of the first instance, sent as ``start``
        :param kwargs: Optional filters ``contextId``, ``creator``, ``from`` and ``to``;
                       other keys are ignored

        :return: ``(total_count, count, instances)`` where the two counts are read from the
                 ``TotalCount`` and ``Count`` elements of the response
        :rtype: tuple

        :raises RuntimeError: on an invalid state/type or an unexpected status code
        """
        headers = {HEADER_TOKEN: self.token}

        if workflow_state not in self.workflow_states:
            logger.error("Invalid Workflow State")
            raise RuntimeError("Invalid Workflow State")

        if workflow_type not in self.workflow_types:
            logger.error("Invalid Workflow Type")
            raise RuntimeError("Invalid Workflow Type")

        params = {"type": workflow_type, "state": workflow_state}
        for optional_filter in ("contextId", "creator", "from", "to"):
            if optional_filter in kwargs:
                params[optional_filter] = kwargs[optional_filter]
        params["start"] = int(start_value)
        params["max"] = int(maximum)

        request = self.session.get(self._endpoint("instances"), headers=headers, params=params)
        if request.status_code == requests.codes.ok:
            xml_response = str(request.content.decode('utf-8'))
            logger.debug(xml_response)
            entity_response = ElementTree.fromstring(xml_response)
            total_count = int(self._child_text(entity_response, "TotalCount"))
            count = int(self._child_text(entity_response, "Count"))
            instances = [self._parse_instance(element)
                         for element in entity_response.findall(f".//{{{NS_WORKFLOW}}}WorkflowInstance")]
            return total_count, count, instances
        if request.status_code == requests.codes.unauthorized:
            self.token = self.__token__()
            return self.__workflow_instances__(workflow_state, workflow_type, maximum, start_value, **kwargs)
        logger.error(request.content)
        raise RuntimeError(request.status_code, "workflow_instances")