Source code for pyPreservica.retentionAPI

"""
pyPreservica RetentionAPI module definition

A client library for the Preservica Repository web services Entity API
https://us.preservica.com/api/entity/documentation.html

author:     James Carr
licence:    Apache License 2.0

"""

import xml.etree.ElementTree
from typing import Set, Callable, Generator

from pyPreservica.common import *

logger = logging.getLogger(__name__)


[docs] class RetentionAssignment: def __init__(self, entity_reference: str, policy_reference: str, api_id: str, start_date, expired=False): self.entity_reference = entity_reference self.policy_reference = policy_reference self.api_id = api_id self.start_date = start_date self.expired = expired def __str__(self): return f"Entity Reference:\t\t\t{self.entity_reference}\n" \ f"Policy Reference:\t\t\t{self.policy_reference}\n" def __repr__(self): return self.__str__()
[docs] class RetentionPolicy: def __init__(self, name: str, reference: str): self.name = name self.reference = reference self.description = "" self.security_tag = "" self.start_date_field = "" self.period = "" self.expiry_action = "" self.assignable = True self.restriction = "" self.period_unit = "" def __str__(self): return f"Ref:\t\t\t{self.reference}\n" \ f"Name:\t\t\t{self.name}\n" \ f"Description:\t{self.description}\n" def __repr__(self): return self.__str__()
[docs] class RetentionAPI(AuthenticatedAPI): def __init__(self, username=None, password=None, tenant=None, server=None, use_shared_secret=False, two_fa_secret_key: str = None, protocol: str = "https", request_hook: Callable = None, credentials_path: str = 'credentials.properties'): super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key, protocol, request_hook, credentials_path) if self.major_version < 7 and self.minor_version < 2: raise RuntimeError("Retention API is only available when connected to a v6.2 System")
[docs] def policy(self, reference: str) -> RetentionPolicy: """ Return a retention policy by reference :param reference: The policy reference :type reference: str :return: The retention policy :rtype: RetentionPolicy """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/retention-policies/{reference}', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) ref_element = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Ref') ref: str = ref_element.text assert ref is not None assert ref == reference name = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Name').text rp = RetentionPolicy(name, ref) description = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Description') rp.description = description.text if description is not None else "" security_tag = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}SecurityTag').text rp.security_tag = security_tag start_date_field = entity_response.find( f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}StartDateField') if start_date_field is not None: rp.start_date_field = start_date_field.text else: rp.start_date_field = None period = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Period') if period is not None: rp.period = period.text else: rp.period = None period_unit = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}PeriodUnit') if period_unit is not None: rp.period_unit = period_unit.text else: rp.period_unit = None expiry_action = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}ExpiryAction') if expiry_action is not None: rp.expiry_action = expiry_action.text else: rp.expiry_action = None restriction = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Restriction') if restriction is not None: rp.restriction = restriction.text else: rp.restriction = None assignable = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy/{{{self.rm_ns}}}Assignable') rp.assignable = strtobool(assignable.text) return rp else: logger.error(f"policy failed with error code {request.status_code}") raise RuntimeError(request.status_code, "policy failed")
[docs] def assignable_policy(self, reference: str, status: bool): """ Make a policy assignable :param reference: The policy ID :type reference: str :param status: The assignable status :type status: bool :return: """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain;charset=UTF-8'} data = str(status) request = self.session.put( f'{self.protocol}://{self.server}/api/entity/retention-policies/{reference}/assignable', headers=headers, data=data) if request.status_code == requests.codes.ok: return None else: logger.error(f"assignable_policy failed with error code {request.status_code}") raise RuntimeError(request.status_code, "assignable_policy failed")
[docs] def update_policy(self, reference: str, **kwargs): """ Update an existing policy Arguments are kwargs map Name Description SecurityTag StartDateField Period PeriodUnit ExpiryAction ExpiryActionParameters Restriction Assignable """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} retention_policy = xml.etree.ElementTree.Element('RetentionPolicy ', {"xmlns": self.rm_ns}) if 'Name' in kwargs: name = kwargs.get("Name") else: raise RuntimeError("No Name specified in kwargs argument") if 'Description' in kwargs: description = kwargs.get("Description") else: raise RuntimeError("No Description specified in kwargs argument") if 'SecurityTag' in kwargs: security_tag = kwargs.get("SecurityTag") else: raise RuntimeError("No SecurityTag specified in kwargs argument") if 'StartDateField' in kwargs: start_date_field = kwargs.get("StartDateField") else: raise RuntimeError("No StartDateField specified in kwargs argument") if 'Period' in kwargs: period = kwargs.get("Period") else: raise RuntimeError("No Period specified in kwargs argument") if 'PeriodUnit' in kwargs: period_unit = kwargs.get("PeriodUnit") else: raise RuntimeError("No PeriodUnit specified in kwargs argument") if 'ExpiryAction' in kwargs: expiry_action = kwargs.get("ExpiryAction") else: raise RuntimeError("No ExpiryAction specified in kwargs argument") if 'ExpiryActionParameters' in kwargs: expiry_action_parameters = kwargs.get("ExpiryActionParameters") else: raise RuntimeError("No ExpiryActionParameters specified in kwargs argument") if 'Restriction' in kwargs: restriction = kwargs.get("Restriction") else: raise RuntimeError("No Restriction specified in kwargs argument") if 'Assignable' in kwargs: assignable = bool(kwargs.get("Assignable")) else: raise RuntimeError("No Assignable specified in kwargs argument") xml.etree.ElementTree.SubElement(retention_policy, "Ref").text = reference xml.etree.ElementTree.SubElement(retention_policy, "Name").text = name xml.etree.ElementTree.SubElement(retention_policy, "Description").text = description xml.etree.ElementTree.SubElement(retention_policy, "SecurityTag").text = security_tag xml.etree.ElementTree.SubElement(retention_policy, "StartDateField").text = start_date_field xml.etree.ElementTree.SubElement(retention_policy, "Period").text = period xml.etree.ElementTree.SubElement(retention_policy, "PeriodUnit").text = period_unit xml.etree.ElementTree.SubElement(retention_policy, "ExpiryAction").text = expiry_action xml.etree.ElementTree.SubElement(retention_policy, "ExpiryActionParameters").text = expiry_action_parameters xml.etree.ElementTree.SubElement(retention_policy, "Restriction").text = restriction xml.etree.ElementTree.SubElement(retention_policy, "Assignable").text = str(assignable) xml_request = xml.etree.ElementTree.tostring(retention_policy, encoding='utf-8') request = self.session.put(f'{self.protocol}://{self.server}/api/entity/retention-policies/{reference}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: return self.policy(reference) else: logger.error(str(request.content.decode('utf-8'))) raise RuntimeError(request.status_code, "update_policy failed " + str(request.content.decode('utf-8')))
[docs] def create_policy(self, **kwargs): """ Create a new policy Arguments are kwargs map Name Description SecurityTag StartDateField Period PeriodUnit ExpiryAction ExpiryActionParameters Restriction Assignable """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} retention_policy = xml.etree.ElementTree.Element('RetentionPolicy ', {"xmlns": self.rm_ns}) if 'Name' in kwargs: name = kwargs.get("Name") else: raise RuntimeError("No Name specified in kwargs argument") if 'Description' in kwargs: description = kwargs.get("Description") else: raise RuntimeError("No Description specified in kwargs argument") if 'SecurityTag' in kwargs: security_tag = kwargs.get("SecurityTag") else: raise RuntimeError("No SecurityTag specified in kwargs argument") if 'StartDateField' in kwargs: start_date_field = kwargs.get("StartDateField") else: raise RuntimeError("No StartDateField specified in kwargs argument") if 'Period' in kwargs: period = kwargs.get("Period") else: raise RuntimeError("No Period specified in kwargs argument") if 'PeriodUnit' in kwargs: period_unit = kwargs.get("PeriodUnit") else: raise RuntimeError("No PeriodUnit specified in kwargs argument") if 'ExpiryAction' in kwargs: expiry_action = kwargs.get("ExpiryAction") else: raise RuntimeError("No ExpiryAction specified in kwargs argument") if 'ExpiryActionParameters' in kwargs: expiry_action_parameters = kwargs.get("ExpiryActionParameters") else: raise RuntimeError("No ExpiryActionParameters specified in kwargs argument") if 'Restriction' in kwargs: restriction = kwargs.get("Restriction") else: raise RuntimeError("No Restriction specified in kwargs argument") if 'Assignable' in kwargs: assignable = bool(kwargs.get("Assignable")) else: raise RuntimeError("No Assignable specified in kwargs argument") xml.etree.ElementTree.SubElement(retention_policy, "Name").text = name xml.etree.ElementTree.SubElement(retention_policy, "Description").text = description xml.etree.ElementTree.SubElement(retention_policy, "SecurityTag").text = security_tag xml.etree.ElementTree.SubElement(retention_policy, "StartDateField").text = start_date_field xml.etree.ElementTree.SubElement(retention_policy, "Period").text = period xml.etree.ElementTree.SubElement(retention_policy, "PeriodUnit").text = period_unit xml.etree.ElementTree.SubElement(retention_policy, "ExpiryAction").text = expiry_action xml.etree.ElementTree.SubElement(retention_policy, "ExpiryActionParameters").text = expiry_action_parameters xml.etree.ElementTree.SubElement(retention_policy, "Restriction").text = restriction xml.etree.ElementTree.SubElement(retention_policy, "Assignable").text = str(assignable) xml_request = xml.etree.ElementTree.tostring(retention_policy, encoding='utf-8') request = self.session.post(f'{self.protocol}://{self.server}/api/entity/retention-policies', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) retention_policy = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy') ref = retention_policy.find(f'.//{{{self.rm_ns}}}Ref').text return self.policy(ref) else: logger.error(f'create_policy failed {request.status_code}') logger.error(str(request.content.decode('utf-8'))) raise RuntimeError(request.status_code, "create_policy failed")
[docs] def delete_policy(self, reference: str): """ Delete a retention policy :param reference: The policy reference :type reference: str """ headers = {HEADER_TOKEN: self.token} request = self.session.delete(f'{self.protocol}://{self.server}/api/entity/retention-policies/{reference}', headers=headers) if request.status_code == requests.codes.no_content: return None else: logger.error(f'delete_policy failed {request.status_code}') raise RuntimeError(request.status_code, "delete_policy failed")
[docs] def policy_by_name(self, name: str) -> RetentionPolicy: """ Return a retention policy by name :param name: The policy name :type name: str :return: The retention policy :rtype: RetentionPolicy """ for policy in self.policies(): if policy.name == name: return self.policy(reference=policy.reference) return None
[docs] def policies(self) -> Generator[RetentionPolicy, None, None]: """ Return a list of all retention policies Returns a maximum of 100 policies for each call to the server :return: Generator of retention policies :rtype: Generator[RetentionPolicy] """ paged_set = self._policies_set(maximum=100, next_page=None) for policy in paged_set.results: yield policy while paged_set.has_more: paged_set = self._policies_set(maximum=100, next_page=paged_set.next_page) for policy in paged_set.results: yield policy
def _policies_set(self, maximum: int = 250, next_page: str = None) -> PagedSet: """ Return a list of all retention policies Returns a maximum of 250 policies by default Internal helper function not part of the public API :return: Set of retention policies :rtype: Set[RetentionPolicy] """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} if next_page is None: params = {'start': '0', 'max': str(maximum)} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/retention-policies', params=params, headers=headers) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) logger.debug(xml_response) result = set() next_url = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}Next') total_results = int(entity_response.find(f'.//{{{self.entity_ns}}}TotalResults').text) for assignment in entity_response.findall(f'.//{{{self.entity_ns}}}RetentionPolicy'): ref = assignment.attrib['ref'] result.add(self.policy(reference=ref)) has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result, has_more, total_results, url) else: raise RuntimeError(request.status_code, "policies failed")
[docs] def add_assignments(self, entity: Entity, policy: RetentionPolicy) -> RetentionAssignment: """ Assign a retention policy to an Asset. :param entity: The Preservica Entity to assign a policy to :type entity: Entity :param policy: The RetentionAssignment :type policy: RetentionPolicy :return: The RetentionAssignment :rtype: RetentionAssignment """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} if not isinstance(entity, Asset): raise RuntimeError("Retention policies can only be assigned to Assets") assignment = xml.etree.ElementTree.Element('RetentionAssignment', {"xmlns": self.rm_ns}) xml.etree.ElementTree.SubElement(assignment, "RetentionPolicy").text = policy.reference xml_request = xml.etree.ElementTree.tostring(assignment, encoding='utf-8').decode('utf-8') logger.debug(xml_request) request = self.session.post( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/retention-assignments', headers=headers, data=xml_request) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) api_id = entity_response.find(f'.//{{{self.rm_ns}}}ApiId').text policy_ref = entity_response.find(f'.//{{{self.rm_ns}}}RetentionPolicy').text entity_ref = entity_response.find(f'.//{{{self.rm_ns}}}Entity').text start_date = entity_response.find(f'.//{{{self.rm_ns}}}StartDate') if start_date is not None: start_date = start_date.text else: start_date = None assert entity_ref == entity.reference assert policy_ref == policy.reference return RetentionAssignment(entity_ref, policy_ref, api_id, start_date) else: logger.debug(f"add_assignments failed {request.status_code}") logger.error(str(request.content.decode('utf-8'))) raise RuntimeError(request.status_code, "add_assignments failed")
[docs] def remove_assignments(self, retention_assignment: RetentionAssignment): """ Delete a retention policy from an asset :param retention_assignment: The Preservica Entity to assign a policy to :type retention_assignment: RetentionAssignment :return: The Asset Reference :rtype: str """ headers = {HEADER_TOKEN: self.token} request = self.session.delete( f'{self.protocol}://{self.server}/api/entity/information-objects/{retention_assignment.entity_reference}/retention' f'-assignments/{retention_assignment.api_id}', headers=headers) if request.status_code == requests.codes.no_content: return retention_assignment.entity_reference else: raise RuntimeError(request.status_code, "remove_assignments failed")
[docs] def assignments(self, entity: Entity) -> Generator[RetentionPolicy, None, None]: """ Return a list of retention policies for an entity. :param entity: The entity to fetch assignments for :type entity: class:`Entity` :return: Policy assignments :rtype: Generator[RetentionAssignment] """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/retention-assignments', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) for assignment in entity_response.findall(f'.//{{{self.rm_ns}}}RetentionAssignment'): entity_ref = assignment.find(f'.//{{{self.rm_ns}}}Entity').text assert entity_ref == entity.reference policy = assignment.find(f'.//{{{self.rm_ns}}}RetentionPolicy').text start_date = assignment.find(f'.//{{{self.rm_ns}}}StartDate') if start_date is not None: start_date = start_date.text else: start_date = None expired = bool(assignment.find(f'.//{{{self.rm_ns}}}Expired').text == 'true') api_id = assignment.find(f'.//{{{self.rm_ns}}}ApiId').text ra = RetentionAssignment(entity_ref, policy, api_id, start_date, expired) yield ra else: raise RuntimeError(request.status_code, "assignments failed")