# Source code for pyPreservica.workflowAPI

"""
pyPreservica WorkflowAPI module definition

A client library for the Preservica Repository web services Workflow API
https://us.preservica.com/sdb/rest/workflow/documentation.html

author:     James Carr
licence:    Apache License 2.0

"""

import uuid
import datetime
from typing import Callable, Generator
from xml.etree import ElementTree

from pyPreservica.common import *

logger = logging.getLogger(__name__)


class WorkflowInstance:
    """
    A single executed workflow.

    A workflow instance is what a workflow context becomes once it has been run.
    Every attribute apart from ``instance_id`` starts out as None and is filled in
    by the WorkflowAPI methods that parse the server's XML responses.
    """

    def __init__(self, instance_id: int):
        self.instance_id = instance_id
        # Timing information
        self.started = self.finished = None
        # Status information
        self.state = self.display_state = None
        # Identifiers linking the instance to its process, group, context and definition
        self.archival_process_id = self.workflow_group_id = None
        self.workflow_context_id = self.workflow_context_name = None
        self.workflow_definition_id = None
        # Raw XML the instance was built from, when the caller stored it
        self.xml_response = None

    def __str__(self):
        return "Workflow Instance ID: {}".format(self.instance_id)

    def __repr__(self):
        return str(self)
class WorkflowContext:
    """
    A pre-defined workflow that is ready to run.

    :param workflow_id: identifier of the workflow context
    :param workflow_name: display name of the workflow context
    """

    def __init__(self, workflow_id: str, workflow_name: str):
        self.workflow_id, self.workflow_name = workflow_id, workflow_name

    def __str__(self):
        return " Workflow ID: {} Workflow Name: {} ".format(self.workflow_id, self.workflow_name)

    def __repr__(self):
        return str(self)
class Process:
    """
    Defines an Ingest Process.
    """

    def __init__(self, process_id: str, process_name: str):
        self.process_id = process_id
        self.name = process_name
        self.description = ""
        self.type = None
        self.active: bool = False
        self.trigger_type = None

    def __str__(self):
        return f""" Process ID: {self.process_id} Process Name: {self.name} Process description: {self.description} Process type: {self.type} Process active: {self.active} Process trigger type: {self.trigger_type} """

    def __repr__(self):
        return self.__str__()


class ProcessAPI(AuthenticatedAPI):
    """
    API for starting processes, and retrieving and updating configuration for processes.

    https://demo.preservica.com/api/process/documentation.html
    """

    def __init__(self, username: str|None = None, password: str|None = None, tenant: str|None = None,
                 server: str|None = None, use_shared_secret: bool = False, two_fa_secret_key: str|None = None,
                 protocol: str = "https", request_hook: Callable|None = None,
                 credentials_path: str = 'credentials.properties'):
        super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key,
                         protocol, request_hook, credentials_path)
        self.base_url = "api/process"

    def __set_status__(self, process_id: str, state: str) -> bool:
        """
        Set the ``active`` flag of an ingest process configuration.

        :param process_id: The id of the process
        :param state: 'true' or 'false', sent as the request body
        :return: the ``active`` value reported back by the server
        :rtype: bool
        :raises RuntimeError: if the server does not answer 200 OK
        """
        # Label the error with the public operation that was actually requested.
        operation = "reactivate_process" if str(state) == "true" else "deactivate_process"
        headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/json'}
        url = f'{self.protocol}://{self.server}/{self.base_url}/ingest/configs/{process_id}/active'
        with self.session.put(url, headers=headers, data=str(state)) as request:
            if request.status_code == requests.codes.ok:
                config = json.loads(request.content.decode("utf-8"))
                return bool(config['active'])
            logger.error(request.content)
            raise RuntimeError(request.status_code, operation)

    def deactivate_process(self, process_id: str) -> bool:
        """
        Deactivate an ingest process.

        :param process_id: The id of the process
        :type process_id: str
        :return: the ``active`` value reported by the server after the update
        :rtype: bool
        """
        return self.__set_status__(process_id, 'false')

    def reactivate_process(self, process_id: str) -> bool:
        """
        Reactivate an ingest process.

        :param process_id: The id of the process
        :type process_id: str
        :return: the ``active`` value reported by the server after the update
        :rtype: bool
        """
        return self.__set_status__(process_id, 'true')

    def ingest_process(self, ingest_types=None) -> list:
        """
        Return all the ingest processes.

        :param ingest_types: optional filter, sent to the server as the ``types`` query parameter
        :return: list of Processes; an empty list if the request does not succeed
        :rtype: list
        """
        headers = {HEADER_TOKEN: self.token}
        params = {}
        if ingest_types is not None:
            params['types'] = ingest_types
        url = f'{self.protocol}://{self.server}/{self.base_url}/ingest/configs'
        with self.session.get(url, headers=headers, params=params) as request:
            if request.status_code != requests.codes.ok:
                # Best-effort listing: keep returning an empty list, but make the failure visible.
                logger.error(request.content)
                return []
            json_dict = json.loads(request.content.decode("utf-8"))
            results = []
            for entry in json_dict.get('configs', []):
                process = Process(entry['apiId'], entry['name'])
                # Optional fields: tolerate their absence instead of failing the whole listing.
                process.description = entry.get('description', "")
                process.type = entry.get('type')
                process.active = bool(entry.get('active', False))
                process.trigger_type = (entry.get('trigger') or {}).get('type')
                results.append(process)
            return results
class WorkflowAPI(AuthenticatedAPI):
    """
    A class for calling the Preservica Workflow API

    This API can be used to programmatically manage the Preservica Workflows.

    https://demo.preservica.com/sdb/rest/workflow/documentation.html
    """

    workflow_states = ['Aborted', 'Active', 'Completed', 'Finished_Mixed_Outcome', 'Pending', 'Suspended',
                       'Unknown', 'Failed']
    workflow_types = ['Ingest', 'Access', 'Transformation', 'DataManagement']

    # Layouts tried, in order, when parsing Started/Finished timestamps.
    # The first is the layout the API parsing has always used; the second tolerates missing fractions.
    _timestamp_formats = ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ')

    def __init__(self, username: str|None = None, password: str|None = None, tenant: str|None = None,
                 server: str|None = None, use_shared_secret: bool = False, two_fa_secret_key: str|None = None,
                 protocol: str = "https", request_hook: Callable|None = None,
                 credentials_path: str = 'credentials.properties'):
        super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key,
                         protocol, request_hook, credentials_path)
        self.base_url = "sdb/rest/workflow"

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _element_text(parent, tag: str):
        """Return the text of the first ``tag`` (NS_WORKFLOW) below ``parent``, or None if there is none."""
        element = parent.find(f".//{{{NS_WORKFLOW}}}{tag}")
        return element.text if element is not None else None

    @classmethod
    def _parse_timestamp(cls, value):
        """Convert a workflow timestamp string to ``datetime.datetime``; missing/empty text gives None."""
        if not value:
            return None
        for fmt in cls._timestamp_formats:
            try:
                return datetime.datetime.strptime(value, fmt)
            except ValueError:
                continue
        raise ValueError(f"Unrecognised workflow timestamp: {value}")

    def _populate_instance(self, element, workflow_instance: WorkflowInstance) -> WorkflowInstance:
        """Copy the workflow fields found below ``element`` onto ``workflow_instance`` and return it."""
        workflow_instance.started = self._parse_timestamp(self._element_text(element, "Started"))
        workflow_instance.finished = self._parse_timestamp(self._element_text(element, "Finished"))
        workflow_instance.state = self._element_text(element, "State")
        workflow_instance.display_state = self._element_text(element, "DisplayState")
        workflow_instance.archival_process_id = self._element_text(element, "ArchivalProcessId")
        workflow_instance.workflow_group_id = self._element_text(element, "WorkflowGroupId")
        workflow_instance.workflow_context_id = self._element_text(element, "WorkflowContextId")
        workflow_instance.workflow_context_name = self._element_text(element, "WorkflowContextName")
        workflow_instance.workflow_definition_id = self._element_text(element, "WorkflowDefinitionTextId")
        return workflow_instance

    def _fetch_workflow_contexts(self, params: dict, operation: str) -> list[WorkflowContext]:
        """
        Query ``/contexts`` with ``params`` and return the contexts that have both an Id and a Name.

        :raises RuntimeError: (status_code, operation) if the server does not answer 200 OK
        """
        headers = {HEADER_TOKEN: self.token}
        request = self.session.get(f'{self.protocol}://{self.server}/{self.base_url}/contexts',
                                   headers=headers, params=params)
        if request.status_code != requests.codes.ok:
            logger.error(request.content)
            raise RuntimeError(request.status_code, operation)
        entity_response = ElementTree.fromstring(request.content.decode('utf-8'))
        workflow_contexts: list[WorkflowContext] = []
        for context in entity_response.findall(f".//{{{NS_WORKFLOW}}}WorkflowContext"):
            workflow_id = self._element_text(context, "Id")
            name = self._element_text(context, "Name")
            # Skip incomplete entries rather than failing the whole listing.
            if workflow_id is not None and name is not None:
                workflow_contexts.append(WorkflowContext(workflow_id, name))
        return workflow_contexts

    # --------------------------------------------------------------- public API

    def get_workflow_contexts_by_type(self, workflow_type: str) -> list[WorkflowContext]:
        """
        Return a list of Workflow Contexts which have the same Workflow type

        :param workflow_type: The Workflow type Ingest, Access, Transformation or DataManagement
        :type workflow_type: str
        :return: List of Workflow Contexts
        :rtype: list
        :raises RuntimeError: if the server does not answer 200 OK
        """
        return self._fetch_workflow_contexts({"type": workflow_type}, "get_workflow_contexts_by_type")

    def get_workflow_contexts(self, definition: str) -> list[WorkflowContext]:
        """
        Return a list of Workflow Contexts which have the same Workflow Definition

        :param definition: The Workflow Definition ID
        :type definition: str
        :return: List of Workflow Contexts
        :rtype: list
        :raises RuntimeError: if the server does not answer 200 OK
        """
        return self._fetch_workflow_contexts({"workflowDefinitionId": definition}, "get_workflow_contexts")

    def start_workflow_instance(self, workflow_context: WorkflowContext, **kwargs):
        """
        Start a workflow context

        Returns a Correlation Id which is used to monitor the workflow progress

        :param workflow_context: The workflow context to start
        :type workflow_context: WorkflowContext
        :param kwargs: Key/Values to pass to the workflow instance; non-string values are sent as ``str(value)``
        :return: correlation_id
        :rtype: str
        :raises RuntimeError: if the server does not answer 201 Created
        """
        headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
        correlation_id = str(uuid.uuid4())
        request_payload = ElementTree.Element('StartWorkflowRequest', {"xmlns": "http://workflow.preservica.com"})
        ElementTree.SubElement(request_payload, "WorkflowContextId").text = workflow_context.workflow_id
        ElementTree.SubElement(request_payload, "WorkflowContextName").text = workflow_context.workflow_name
        for key, value in kwargs.items():
            parameter = ElementTree.SubElement(request_payload, "Parameter")
            ElementTree.SubElement(parameter, "Key").text = key
            # ElementTree can only serialise str text; coerce other values, keep None as an empty element.
            ElementTree.SubElement(parameter, "Value").text = None if value is None else str(value)
        ElementTree.SubElement(request_payload, "CorrelationId").text = correlation_id
        xml_request = ElementTree.tostring(request_payload, encoding='utf-8')
        request = self.session.post(f'{self.protocol}://{self.server}/{self.base_url}/instances',
                                    headers=headers, data=xml_request)
        if request.status_code == requests.codes.created:
            return correlation_id
        logger.error(request.content)
        raise RuntimeError(request.status_code, "start_workflow_instance failed")

    def terminate_workflow_instance(self, instance_ids):
        """
        Terminate a workflow by its instance id

        :param instance_ids: The Workflow instance
        :type instance_ids: int or a list/tuple of int
        :raises RuntimeError: if the server does not answer 202 Accepted
        """
        if isinstance(instance_ids, (list, tuple)):
            param_string = ",".join(str(int(e)) for e in instance_ids)
        else:
            param_string = str(int(instance_ids))
        headers = {HEADER_TOKEN: self.token}
        params = {"workflowInstanceIds": param_string}
        request = self.session.post(f'{self.protocol}://{self.server}/{self.base_url}/instances/terminate',
                                    headers=headers, params=params)
        if request.status_code == requests.codes.accepted:
            return
        logger.error(request.content)
        raise RuntimeError(request.status_code, "terminate_workflow_instance")

    def workflow_instance(self, instance_id: int) -> WorkflowInstance:
        """
        Return a workflow instance by its Id

        :param instance_id: The Workflow instance
        :type instance_id: int
        :return: workflow_instance, with ``started``/``finished`` as ``datetime.datetime`` (or None)
        :rtype: WorkflowInstance
        :raises RuntimeError: if the request fails or the response describes a different instance
        """
        headers = {HEADER_TOKEN: self.token}
        params = {"includeErrors": "true"}
        request = self.session.get(f'{self.protocol}://{self.server}/{self.base_url}/instances/{instance_id}',
                                   headers=headers, params=params)
        if request.status_code != requests.codes.ok:
            logger.error(request.content)
            raise RuntimeError(request.status_code, "workflow_instance")
        xml_response = request.content.decode('utf-8')
        logger.debug(xml_response)
        entity_response = ElementTree.fromstring(xml_response)
        # Explicit check instead of `assert`, which is stripped when Python runs with -O.
        returned_id = self._element_text(entity_response, "Id")
        if returned_id is None or int(returned_id) != int(instance_id):
            raise RuntimeError("workflow_instance",
                               f"response is for instance {returned_id}, expected {instance_id}")
        workflow_instance = self._populate_instance(entity_response, WorkflowInstance(int(instance_id)))
        workflow_instance.xml_response = xml_response
        return workflow_instance

    def workflow_instances(self, workflow_state: str, workflow_type: str,
                           **kwargs) -> Generator[WorkflowInstance, None, None]:
        """
        Return a list of Workflow instances

        :param workflow_state: The Workflow state Aborted, Active, Completed, Finished_Mixed_Outcome, Pending,
                               Suspended, Unknown, or Failed
        :param workflow_type: The Workflow type Ingest, Access, Transformation or DataManagement
        :param kwargs: optional filters: contextId, creator, from_date, to_date
        :return: generator of WorkflowInstance, fetched page by page
        """
        start_value = 0
        maximum = 100
        total_count = maximum
        while total_count > start_value:
            total_count, count, page = self.__workflow_instances__(workflow_state, workflow_type, maximum=maximum,
                                                                   start_value=start_value, **kwargs)
            yield from page
            # An empty page would never advance the offset; stop instead of looping forever.
            if count <= 0:
                break
            start_value += count

    def __workflow_instances__(self, workflow_state: str, workflow_type: str, maximum: int = 25,
                               start_value: int = 0, **kwargs):
        """
        Fetch one page of Workflow instances

        :param workflow_state: The Workflow state: Aborted, Active, Completed, Finished_Mixed_Outcome, Pending,
                               Suspended, Unknown, or Failed
        :param workflow_type: The Workflow type: Ingest, Access, Transformation or DataManagement
        :param maximum: page size sent as ``max``
        :param start_value: offset sent as ``start``
        :param kwargs: optional filters: contextId, creator, from_date, to_date
        :return: tuple of (TotalCount, Count, list of WorkflowInstance)
        :raises RuntimeError: on an invalid state/type or if the server does not answer 200 OK
        """
        headers = {HEADER_TOKEN: self.token}
        if workflow_state not in self.workflow_states:
            logger.error("Invalid Workflow State")
            raise RuntimeError("Invalid Workflow State")
        if workflow_type not in self.workflow_types:
            logger.error("Invalid Workflow Type")
            raise RuntimeError("Invalid Workflow Type")
        params = {"type": workflow_type, "state": workflow_state}
        if "contextId" in kwargs:
            params["contextId"] = kwargs["contextId"]
        if "creator" in kwargs:
            params["creator"] = kwargs["creator"]
        if "from_date" in kwargs:
            params["from"] = parse_date_to_iso(kwargs["from_date"])
        if "to_date" in kwargs:
            params["to"] = parse_date_to_iso(kwargs["to_date"])
        params["start"] = int(start_value)
        params["max"] = int(maximum)
        request = self.session.get(f'{self.protocol}://{self.server}/{self.base_url}/instances',
                                   headers=headers, params=params)
        if request.status_code != requests.codes.ok:
            logger.error(request.content)
            raise RuntimeError(request.status_code, "workflow_instances")
        xml_response = request.content.decode('utf-8')
        logger.debug(xml_response)
        entity_response = ElementTree.fromstring(xml_response)
        total_count = int(self._element_text(entity_response, "TotalCount"))
        count = int(self._element_text(entity_response, "Count"))
        workflow_instances = []
        for element in entity_response.findall(f".//{{{NS_WORKFLOW}}}WorkflowInstance"):
            instance_id = int(self._element_text(element, "Id"))
            # Same parsing as workflow_instance(), so started/finished are datetime objects here too.
            workflow_instances.append(self._populate_instance(element, WorkflowInstance(instance_id)))
        return total_count, count, workflow_instances