Source code for pyPreservica.entityAPI

"""
pyPreservica EntityAPI module definition

A client library for the Preservica Repository web services Entity API
https://us.preservica.com/api/entity/documentation.html

author:     James Carr
licence:    Apache License 2.0

"""

import os.path
import uuid
import xml.etree.ElementTree
from datetime import timedelta, timezone
from io import BytesIO
from time import sleep
from typing import Any, Generator, Tuple, Iterable, Union, Callable


from pyPreservica.common import *

logger = logging.getLogger(__name__)


[docs] class EntityAPI(AuthenticatedAPI): """ A class for the Preservica Repository web services Entity API https://us.preservica.com/api/entity/documentation.html The EntityAPI allows users to interact with the Preservica repository """ def __init__(self, username: str|None = None, password: str|None = None, tenant: str|None = None, server: str|None = None, use_shared_secret: bool = False, two_fa_secret_key: str|None = None, protocol: str = "https", request_hook: Callable|None = None, credentials_path: str = 'credentials.properties'): super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key, protocol, request_hook, credentials_path) xml.etree.ElementTree.register_namespace("oai_dc", "http://www.openarchives.org/OAI/2.0/oai_dc/") xml.etree.ElementTree.register_namespace("ead", "urn:isbn:1-931666-22-9")
[docs] def user_security_tags(self, with_permissions: bool = False) -> dict: """ Return security tags available for the current user :param with_permissions: Return the permissions for each security tag :type with_permissions: bool :return: dict of security tags :rtype: dict """ return self.security_tags_base(with_permissions=with_permissions)
[docs] def bitstream_chunks(self, bitstream: Bitstream, chunk_size: int = CHUNK_SIZE) -> Generator: """ Generator function to return bitstream chunks, allows the clients to process chunks as they are downloaded. :param bitstream: A bitstream object :type bitstream: Bitstream :param chunk_size: Optional size of the chunks to be downloaded :type chunk_size: int :return: Iterator :rtype: Generator """ if not isinstance(bitstream, Bitstream): logger.error("bitstream_content argument is not a Bitstream object") raise RuntimeError("bitstream_bytes argument is not a Bitstream object") with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token, 'X-STREAM-No-Retry': 'true'}, stream=True) as request: if request.status_code == requests.codes.unauthorized: self.token = self.__token__() yield from self.bitstream_chunks(bitstream) elif request.status_code == requests.codes.ok: for chunk in request.iter_content(chunk_size=chunk_size): yield chunk else: exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_chunks", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def bitstream_bytes(self, bitstream: Bitstream, chunk_size: int = CHUNK_SIZE) -> BytesIO: """ Download a file represented as a Bitstream to a byteIO array Returns the byteIO Returns None if the file does not contain the correct number of bytes (default 2k) :param chunk_size: The buffer copy chunk size in bytes default :param bitstream: A Bitstream object :type bitstream: Bitstream :return: The file in bytes :rtype: byteIO """ if not isinstance(bitstream, Bitstream): logger.error("bitstream_content argument is not a Bitstream object") raise RuntimeError("bitstream_bytes argument is not a Bitstream object") with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token, 'X-STREAM-No-Retry': 'true'}, stream=True) as response: if response.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.bitstream_bytes(bitstream) elif response.status_code == requests.codes.ok: file_bytes = BytesIO() for chunk in response.iter_content(chunk_size=chunk_size): file_bytes.write(chunk) file_bytes.seek(0) if file_bytes.getbuffer().nbytes == bitstream.length: logger.debug(f"Downloaded {bitstream.length} bytes from {bitstream.filename}") return file_bytes else: logger.error("Downloaded file size did not match the Preservica held value") raise RuntimeError("Downloaded file size did not match the Preservica held value") else: exception = HTTPException(bitstream.filename, response.status_code, response.url, "bitstream_bytes", response.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def bitstream_location(self, bitstream: Bitstream) -> list: """" Retrieves information about a bitstreams storage locations :param Bitstream bitstream: The bitstream object :return: A list of strings containing all the storage locations of this bitstream :rtype: list """ if not isinstance(bitstream, Bitstream): logger.error("bitstream argument is not a Bitstream object") raise RuntimeError("bitstream argument is not a Bitstream object") storage_locations = [] url: str = f'{self.protocol}://{self.server}/api/entity/content-objects/{bitstream.co_ref}/generations/{bitstream.gen_index}/bitstreams/{bitstream.bs_index}/storage-locations' with self.session.get(url, headers={HEADER_TOKEN: self.token}) as request: if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) logger.debug(xml_response) locations = entity_response.find(f'.//{{{self.entity_ns}}}StorageLocation') if locations is not None: for adapter in locations: storage_locations.append(adapter.attrib['name']) return storage_locations else: exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_location", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def bitstream_content(self, bitstream: Bitstream, filename: str, chunk_size: int = CHUNK_SIZE) -> int: """ Download a file represented as a Bitstream to a local filename Returns the number of bytes written to the file Returns None if the file does not contain the correct number of bytes :param chunk_size: The buffer copy chunk size in bytes default :param bitstream: A Bitstream object :type bitstream: Bitstream :param filename: The filename to write the bytes to :type filename: str :return: The size of the file in bytes :rtype: int """ if not isinstance(bitstream, Bitstream): logger.error("bitstream_content argument is not a Bitstream object") raise RuntimeError("bitstream_content argument is not a Bitstream object") with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token, 'X-STREAM-No-Retry': 'true'}, stream=True) as request: if request.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.bitstream_content(bitstream, filename) elif request.status_code == requests.codes.ok: with open(filename, 'wb') as file: for chunk in request.iter_content(chunk_size=chunk_size): file.write(chunk) file.flush() if os.path.getsize(filename) == bitstream.length: logger.debug(f"Downloaded {bitstream.length} bytes into {filename}") return bitstream.length else: logger.error("Download file size did not match the Preservica held value") os.remove(filename) raise RuntimeError("Downloaded file size did not match the Preservica held value") else: exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_content", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def download_opex(self, pid: str) -> str: """ Download a completed OPEX export using the workflow process ID :param pid: A process id which identifiers the export workflow :type pid: str :return: The downloaded zip file name :rtype: str """ headers = {HEADER_TOKEN: self.__token__(), 'Content-Type': 'application/xml;charset=UTF-8', 'X-STREAM-No-Retry': 'true'} download = self.session.get(f'{self.protocol}://{self.server}/api/entity/actions/exports/{pid}/content', stream=True, headers=headers) if download.status_code == requests.codes.ok: with open(f'{pid}.zip', 'wb') as file: for chunk in download.iter_content(chunk_size=CHUNK_SIZE): file.write(chunk) file.flush() logger.debug(f"Downloaded open package into {pid}.zip") return f'{pid}.zip' elif download.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.download_opex(pid) else: exception = HTTPException(pid, download.status_code, download.url, "download_opex", download.content.decode('utf-8')) logger.error(exception) raise exception
def __export_opex_start__(self, entity: Entity, **kwargs) -> str: """ Initiates export of the entity and downloads the opex package By default, includes content, metadata with the latest active generations and the parent hierarchy. Arguments are kwargs map IncludeContent IncludeMetadata IncludedGenerations IncludeParentHierarchy """ if self.major_version < 7 and self.minor_version < 2: raise RuntimeError("export_opex API is only available when connected to a v6.2 system or above") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} include_content_options = ("Content", "NoContent") include_metadata_options = ("Metadata", "NoMetadata", "MetadataWithEvents") include_generation_options = ("LatestActive", "AllActive", "All") include_parent_options = ("true", "false") include_content = "Content" if 'IncludeContent' in kwargs: value = str(kwargs.get("IncludeContent")).strip() if value.casefold() in map(str.casefold, include_content_options): include_content = value include_metadata = "Metadata" if 'IncludeMetadata' in kwargs: value = str(kwargs.get("IncludeMetadata")).strip() if value.casefold() in map(str.casefold, include_metadata_options): include_metadata = value include_generation = "All" if 'IncludedGenerations' in kwargs: value = str(kwargs.get("IncludedGenerations")).strip() if value.casefold() in map(str.casefold, include_generation_options): include_generation = value include_parent = "true" if 'IncludeParentHierarchy' in kwargs: value = str(kwargs.get("IncludeParentHierarchy")).strip() if value.casefold() in map(str.casefold, include_parent_options): include_parent = value xml_object = xml.etree.ElementTree.Element('ExportAction', {"xmlns": self.entity_ns}) xml.etree.ElementTree.SubElement(xml_object, "IncludeContent").text = include_content xml.etree.ElementTree.SubElement(xml_object, "IncludeMetadata").text = include_metadata xml.etree.ElementTree.SubElement(xml_object, "IncludedGenerations").text = include_generation xml.etree.ElementTree.SubElement(xml_object, "IncludeParentHierarchy").text = include_parent.lower() xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) request = self.session.post( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/exports', headers=headers, data=xml_request) if request.status_code == requests.codes.accepted: return str(request.content.decode('utf-8')) else: exception = HTTPException(entity.reference, request.status_code, request.url, "__export_opex_start__", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def export_opex_async(self, entity: Entity, **kwargs) -> str: """ Initiates export of the entity returns an id to track progress """ return self.__export_opex_start__(entity, **kwargs)
[docs] def export_opex_sync(self, entity: Entity, **kwargs) -> str: """ Initiates export of the entity and downloads the opex package Blocks until the package is downloaded By default, includes content, metadata with the latest active generations and the parent hierarchy. Arguments are kwargs map IncludeContent IncludeMetadata IncludedGenerations IncludeParentHierarchy """ return self.export_opex(entity, **kwargs)
[docs] def export_opex(self, entity: Entity, **kwargs) -> str: """ Initiates export of the entity and downloads the opex package Blocks until the package is downloaded By default, includes content, metadata with the latest active generations and the parent hierarchy. Arguments are kwargs map IncludeContent IncludeMetadata IncludedGenerations IncludeParentHierarchy :param Entity entity: The entity to export Asset or Folder :param str IncludeContent: "Content", "NoContent" :param str IncludeMetadata: "Metadata", "NoMetadata", "MetadataWithEvents" :param str IncludedGenerations: "LatestActive", "AllActive", "All" :param str IncludeParentHierarchy: "true", "false" :return: The path to the opex ZIP file :rtype: str """ status = "ACTIVE" pid = self.__export_opex_start__(entity, **kwargs) while status == "ACTIVE": status = self.get_async_progress(pid) logger.debug(status) if status == "COMPLETED": return self.download_opex(pid) else: logger.error(status) raise RuntimeError(f"export progress failed {status}")
[docs] def download(self, entity: Entity, filename: str) -> str: """ Download the first generation of the access representation of an asset :param Entity entity: The entity :param str filename: The file the image is written to :return: The filename :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream', 'X-STREAM-No-Retry': 'true'} params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}'} with self.session.get(f'{self.protocol}://{self.server}/api/content/download', params=params, headers=headers, stream=True) as request: if request.status_code == requests.codes.ok: with open(filename, 'wb') as file: for chunk in request.iter_content(chunk_size=CHUNK_SIZE): file.write(chunk) file.flush() return filename elif request.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.download(entity, filename) else: exception = HTTPException(entity.reference, request.status_code, request.url, "download", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def has_thumbnail(self, entity: Entity) -> bool: """ Does the entity have a thumbnail image attached Returns false if the entity has no thumbnail :param entity: The entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'} params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}', 'size': f'{Thumbnail.SMALL.value}'} with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params, headers=headers) as request: if request.status_code == requests.codes.ok: return True if request.status_code == requests.codes.not_found: return False else: exception = HTTPException(entity.reference, request.status_code, request.url, "has_thumbnail", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def thumbnail(self, entity: Entity, filename: str, size=Thumbnail.LARGE): """ Get the thumbnail image for an asset or folder :param Entity entity: The entity :param str filename: The file the image is written to :param Thumbnail size: The size of the thumbnail image :return: The filename :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream', 'X-STREAM-No-Retry': 'true'} params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}', 'size': f'{size.value}'} with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params, headers=headers, stream=True) as request: if request.status_code == requests.codes.ok: with open(filename, 'wb') as file: for chunk in request.iter_content(chunk_size=CHUNK_SIZE): file.write(chunk) file.flush() return filename elif request.status_code == requests.codes.not_found: return None elif request.status_code == requests.codes.unauthorized: self.token = self.__token__() return self.thumbnail(entity, filename, size=size) else: exception = HTTPException(entity.reference, request.status_code, request.url, "thumbnail", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def delete_identifiers(self, entity: Entity, identifier_type: str|None = None, identifier_value: str|None = None) -> Entity: """ Delete identifiers on an Entity object Setting identifier_type and identifier_value to None acts a wildcard deleting all identifiers on the entity :param Entity entity: The entity the identifiers are deleted from :param str identifier_type: The identifier type :param str identifier_value: The identifier value :return: entity :rtype: Entity """ if (self.major_version < 6) or (self.major_version == 6 and self.minor_version < 1): raise RuntimeError("delete_identifiers API call is not available when connected to a v6.0 System") headers = {HEADER_TOKEN: self.token} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier') for identifier_element in identifier_list: _ref = _type = _value = _aipid = None for identifier in identifier_element: if identifier.tag.endswith("Entity"): _ref = identifier.text if identifier.tag.endswith("Type") and identifier_type is not None: _type = identifier.text if identifier.tag.endswith("Value") and identifier_value is not None: _value = identifier.text if identifier.tag.endswith("ApiId"): _aipid = identifier.text if _ref == entity.reference and _type == identifier_type and _value == identifier_value: del_req = self.session.delete( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers/{_aipid}', headers=headers) if del_req.status_code == requests.codes.no_content: pass else: raise RuntimeError(del_req.status_code, "delete_identifier failed") return entity else: logger.error(request) raise RuntimeError(request.status_code, "delete_identifier failed")
[docs] def entity_identifiers(self, entity: Entity, external_identifier_type: str|None = None) -> set[ExternIdentifier]: """ Get all external identifiers on an entity Returns the set of external identifiers on the entity :param entity: The Entity (Asset or Folder) :param external_identifier_type: Optional identifier type to filter the results :type entity: Entity """ headers = {HEADER_TOKEN: self.token} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier') result = set() for identifier in identifier_list: identifier_value = identifier_type = identifier_id = "" for child in identifier: if child.tag.endswith("Type"): identifier_type = child.text if child.tag.endswith("Value"): identifier_value = child.text if child.tag.endswith("ApiId"): identifier_id = child.text if external_identifier_type is None: external_id: ExternIdentifier = ExternIdentifier(identifier_type, identifier_value) external_id.identifier_id = identifier_id result.add(external_id) else: if identifier_type == external_identifier_type: external_id: ExternIdentifier = ExternIdentifier(identifier_type, identifier_value) external_id.identifier_id = identifier_id result.add(external_id) return result else: exception = HTTPException(entity.reference, request.status_code, request.url, "entity_identifiers", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def identifiers_for_entity(self, entity: Entity) -> set[Tuple]: """ Return a set of identifiers which belong to the entity :param Entity entity: The entity :return: Set of identifiers as tuples :rtype: set(Tuple) """ headers = {HEADER_TOKEN: self.token} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier') result = set() for identifier in identifier_list: identifier_value = identifier_type = "" for child in identifier: if child.tag.endswith("Type"): identifier_type = child.text if child.tag.endswith("Value"): identifier_value = child.text result.add(tuple((identifier_type, identifier_value))) return result else: exception = HTTPException(entity.reference, request.status_code, request.url, "identifiers_for_entity", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def identifier(self, identifier_type: str, identifier_value: str) -> set[EntityT]: """ Return a set of entities with external identifiers which match the type and value :param str identifier_type: The identifier type :param str identifier_value: The identifier value :return: Set of entity objects which have a reference and title attribute :rtype: set(Entity) """ headers = {HEADER_TOKEN: self.token} payload = {'type': identifier_type, 'value': identifier_value} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/entities/by-identifier', params=payload, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) entity_list = entity_response.findall(f'.//{{{self.entity_ns}}}Entity') result = set() for entity in entity_list: if entity.attrib['type'] == EntityType.FOLDER.value: folder = Folder(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.add(folder) elif entity.attrib['type'] == EntityType.ASSET.value: asset = Asset(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.add(asset) elif entity.attrib['type'] == EntityType.CONTENT_OBJECT.value: co = ContentObject(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.add(co) return result else: exception = HTTPException(payload, request.status_code, request.url, "identifier", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def add_identifier(self, entity: Entity, identifier_type: str, identifier_value: str): """ Add a new external identifier to an Entity object :param Entity entity: The entity the identifier is added to :param str identifier_type: The identifier type :param str identifier_value: The identifier value :return: An internal id for this external identifier :rtype: str """ if self.major_version < 7 and self.minor_version < 1: raise RuntimeError("add_identifier API call is not available when connected to a v6.0 System") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('Identifier', {"xmlns": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "Type").text = identifier_type xml.etree.ElementTree.SubElement(xml_object, "Value").text = identifier_value xml.etree.ElementTree.SubElement(xml_object, "Entity").text = entity.reference end_point = f"/{entity.path}/{entity.reference}/identifiers" xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_string = str(request.content.decode("utf-8")) identifier_response = xml.etree.ElementTree.fromstring(xml_string) aip_id = identifier_response.find(f'.//{{{self.xip_ns}}}ApiId') if hasattr(aip_id, 'text'): return aip_id.text else: return None else: exception = HTTPException(entity.reference, request.status_code, request.url, "add_identifier", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def update_identifiers(self, entity: Entity, identifier_type: str|None = None, identifier_value: str|None = None): """ Update external identifiers based on Entity and Type Returns the internal identifier DB Key :param entity: The entity to delete identifiers from :param identifier_type: The type of the identifier to delete. :param identifier_value: The value of the identifier to delete. """ if (self.major_version < 7) and (self.minor_version < 1): raise RuntimeError("update_identifiers API call is not available when connected to a v6.0 System") headers = {HEADER_TOKEN: self.token} response = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers', headers=headers) if response.status_code == requests.codes.ok: xml_response = str(response.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier') for identifier_element in identifier_list: _ref = _type = _value = _aipid = None for identifier in identifier_element: if identifier.tag.endswith("Entity"): _ref = identifier.text if identifier.tag.endswith("Type") and identifier_type is not None: _type = identifier.text if identifier.tag.endswith("Value") and identifier_value is not None: _value = identifier.text if identifier.tag.endswith("ApiId"): _aipid = identifier.text if _ref == entity.reference and _type == identifier_type: headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('Identifier', {"xmlns": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "Type").text = identifier_type xml.etree.ElementTree.SubElement(xml_object, "Value").text = identifier_value xml.etree.ElementTree.SubElement(xml_object, "Entity").text = entity.reference xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') put_response = self.session.put( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers/{_aipid}', headers=headers, data=xml_request) if put_response.status_code == requests.codes.ok: xml_string = str(put_response.content.decode("utf-8")) identifier_response = xml.etree.ElementTree.fromstring(xml_string) aip_id = identifier_response.find(f'.//{{{self.xip_ns}}}ApiId') if hasattr(aip_id, 'text'): return aip_id.text else: return None if put_response.status_code == requests.codes.no_content: pass else: return None return entity else: logger.error(response) raise RuntimeError(response.status_code, "update_identifiers failed")
[docs] def delete_relationships(self, entity: Entity, relationship_type: str|None = None): """ Delete a relationship between two entities by its internal id This function only deletes the relationship FROM the specified entity to another entity It does not delete relationships TO this entity If relationship_type is not specified all relationships FROM this entity are deleted. :param entity: :type entity: Entity :param relationship_type: The relationship type to delete :type relationship_type: str """ if (self.major_version < 6) or ( (self.major_version < 7) and (self.minor_version < 4) ): raise RuntimeError("delete_relationships API call is only available with a Preservica v6.3.1 system or higher") for relationship in self.relationships(entity=entity): if relationship.direction == RelationshipDirection.FROM: assert relationship.this_ref == entity.reference if relationship_type is None: self.__delete_relationship(relationship) if relationship_type == relationship.relationship_type: self.__delete_relationship(relationship)
def __delete_relationship(self, relationship: Relationship): """ Delete a relationship between two entities by its internal id :param relationship: :return: """ headers = {HEADER_TOKEN: self.token} entity: Entity = self.entity(relationship.entity_type, relationship.this_ref) end_point = f"{entity.path}/{entity.reference}/links/{relationship.api_id}" request = self.session.delete(f'{self.protocol}://{self.server}/api/entity/{end_point}', headers=headers) if request.status_code == requests.codes.no_content: return None else: exception = HTTPException(entity.reference, request.status_code, request.url, "delete_relationships", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def relationships(self, entity: Entity, page_size: int = 25) -> Generator[Relationship, None, None]: """ List the relationship links between entities :param page_size: The number of items returned in a single server call :type: page_size: int :param entity: The Source Entity :type: entity: An Entity type such as Asset, Folder etc :return: Generator :rtype: Relationship """ paged_set = self.__relationships__(entity, maximum=page_size, next_page=None) for e in paged_set.results: yield e while paged_set.has_more: paged_set = self.__relationships__(entity, maximum=page_size, next_page=paged_set.next_page) for e in paged_set.results: yield e
def __relationships__(self, entity: Entity, maximum: int = 50, next_page: str|None = None) -> PagedSet: """ List the relationship links between entities :param next_page: URL to next page of results :type: next_page: str :param maximum: The number of items returned in a single server call :type: maximum: int :param entity: The Source Entity :type: from_entity: Entity :return: relationship links :rtype: list """ headers = {HEADER_TOKEN: self.token} end_point = f"{entity.path}/{entity.reference}/links" if next_page is None: params = {'start': '0', 'max': str(maximum)} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{end_point}', headers=headers, params=params) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) links = entity_response.findall(f'.//{{{self.entity_ns}}}Link') next_url = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}TotalResults') results = [] for link in links: link_type = link.attrib['linkType'] link_direction = link.attrib['linkDirection'] title = link.attrib['title'] other_ref = link.attrib['ref'] this_ref = entity.reference entity_type = link.attrib['type'] api_id = link.attrib['apiId'] results.append(Relationship(api_id, link_type, RelationshipDirection(link_direction), other_ref, title, EntityType(entity_type), this_ref, api_id)) has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(results, has_more, int(total_hits.text), url) else: exception = HTTPException(entity.reference, request.status_code, request.url, "relationships", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def add_relation(self, from_entity: Entity, relationship_type: str, to_entity: Entity): """ Add a new relationship link between two Assets or Folders :param from_entity: The Source entity to link from :type from_entity: Entity :param to_entity: The Target entity :type to_entity: Entity :param relationship_type: The Relationship type :type relationship_type: str :return: relationship_type :rtype: str """ if (self.major_version < 6) or (self.major_version < 7) and (self.minor_version < 4): raise RuntimeError("add_relation API call is only available with a Preservica v6.3.1 system or higher") assert from_entity.entity_type is not EntityType.CONTENT_OBJECT assert to_entity.entity_type is not EntityType.CONTENT_OBJECT headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('Link ', {"xmlns": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "Type").text = relationship_type xml.etree.ElementTree.SubElement(xml_object, "FromEntity").text = from_entity.reference xml.etree.ElementTree.SubElement(xml_object, "ToEntity").text = to_entity.reference end_point = f"/{from_entity.path}/{from_entity.reference}/links" xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_string = str(request.content.decode("utf-8")) logger.debug(xml_string) link_response = xml.etree.ElementTree.fromstring(xml_string) relation = link_response.find(f'.//{{{self.xip_ns}}}Link') relation_type = relation.find(f'.//{{{self.xip_ns}}}Type') return relation_type.text else: exception = HTTPException(from_entity.reference, request.status_code, request.url, "add_relation", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def delete_metadata(self, entity: Entity, schema: str) -> EntityT: """ Delete an existing descriptive XML document on an entity by its schema This call will delete all fragments with the same schema :param Entity entity: The entity to add the metadata to :param str schema: The metadata schema URI :return: The updated Entity :rtype: Entity """ headers = {HEADER_TOKEN: self.token} for url in entity.metadata: if schema == entity.metadata[url]: request = self.session.delete(url, headers=headers) if request.status_code == requests.codes.no_content: pass else: exception = HTTPException(entity.reference, request.status_code, request.url, "delete_metadata", request.content.decode('utf-8')) logger.error(exception) raise exception return self.entity(entity.entity_type, entity.reference)
[docs] def add_group_metadata(self, csv_file: str) -> str: """ Perform bulk additions of metadata with a CSV file. This is designed for metadata which populates a New Gen Metadata Group Returns The process ID which will track the updates Requires DataManagement permission :param str csv_file: The path of the CSV metadata file :return: The process ID :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/csv;charset=UTF-8'} url = f'{self.protocol}://{self.server}/api/entity/actions/metadata-csv-edits' with open(csv_file, 'rb') as fd: with self.session.post(url, headers=headers, data=fd) as request: if request.status_code == requests.codes.accepted: return str(request.content.decode('utf-8')) else: exception = HTTPException(None, request.status_code, request.url, "add_group_metadata", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def update_metadata(self, entity: EntityT, schema: str, data: Any) -> EntityT: """ Update an existing descriptive XML document on an entity :param Entity entity: The entity to add the metadata to :param str schema: The metadata schema URI :param data data: The XML document as a string or as a file bytes :return: The updated Entity :rtype: Entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} if schema not in entity.metadata.values(): raise RuntimeError("Only existing schema's can be updated.") for url in entity.metadata: if schema == entity.metadata[url]: mref = url[url.rfind(f"{entity.reference}/metadata/") + len(f"{entity.reference}/metadata/"):] xml_object = xml.etree.ElementTree.Element('xip:MetadataContainer', {"schemaUri": schema, "xmlns:xip": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "xip:Ref").text = mref xml.etree.ElementTree.SubElement(xml_object, "xip:Entity").text = entity.reference content = xml.etree.ElementTree.SubElement(xml_object, "xip:Content") if isinstance(data, str): ob = xml.etree.ElementTree.fromstring(data) content.append(ob) elif hasattr(data, "read"): tree = xml.etree.ElementTree.parse(data) content.append(tree.getroot()) else: raise RuntimeError("Unknown data type") xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8').decode("utf-8") logger.debug(xml_request) request = self.session.put(url, data=xml_request, headers=headers) if request.status_code == requests.codes.ok: pass else: exception = HTTPException(entity.reference, request.status_code, request.url, "update_metadata", request.content.decode('utf-8')) logger.error(exception) raise exception return self.entity(entity.entity_type, entity.reference)
[docs] def add_metadata_as_fragment(self, entity: EntityT, schema: str, xml_fragment: str) -> EntityT: """ Add a metadata fragment with a given namespace URI to an Entity Don't parse the xml fragment which may add extra namespaces etc Returns The updated Entity :param str xml_fragment: The new XML as a string :param Entity entity: The entity to update :param str schema: The schema URI of the XML document :rtype: Entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_doc = f"""<xip:MetadataContainer xmlns="{schema}" schemaUri="{schema}" xmlns:xip="{self.xip_ns}"> <xip:Entity>{entity.reference}</xip:Entity> <xip:Content> {xml_fragment} </xip:Content> </xip:MetadataContainer>""" end_point = f"/{entity.path}/{entity.reference}/metadata" logger.debug(xml_doc) request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_doc, headers=headers) if request.status_code == requests.codes.ok: return self.entity(entity_type=entity.entity_type, reference=entity.reference) else: exception = HTTPException(entity.reference, request.status_code, request.url, "add_metadata_as_fragment", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def add_metadata(self, entity: EntityT, schema: str, data) -> EntityT: """ Add a new descriptive XML document to an existing entity :param Entity entity: The entity to add the metadata to :param str schema: The metadata schema URI :param data data: The XML document as a string or as file bytes :return: The updated entity with the new metadata :rtype: Entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('xip:MetadataContainer', {"schemaUri": schema, "xmlns:xip": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "xip:Entity").text = entity.reference content = xml.etree.ElementTree.SubElement(xml_object, "xip:Content") if isinstance(data, str): ob = xml.etree.ElementTree.fromstring(data) content.append(ob) elif hasattr(data, "read"): tree = xml.etree.ElementTree.parse(data) content.append(tree.getroot()) else: raise RuntimeError("Unknown data type") xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') end_point = f"/{entity.path}/{entity.reference}/metadata" logger.debug(xml_request) request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: return self.entity(entity_type=entity.entity_type, reference=entity.reference) else: exception = HTTPException(entity.reference, request.status_code, request.url, "add_metadata", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def save(self, entity: EntityT) -> EntityT: """ Updates the title and description of an entity The security tag and parent are not saved via this method call :param entity: The entity (asset, folder, content_object) to be updated :type entity: Entity :return: The updated entity :rtype: Entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element(entity.tag, {"xmlns": self.xip_ns}) xml.etree.ElementTree.SubElement(xml_object, "Ref").text = entity.reference xml.etree.ElementTree.SubElement(xml_object, "Title").text = entity.title xml.etree.ElementTree.SubElement(xml_object, "Description").text = entity.description xml.etree.ElementTree.SubElement(xml_object, "SecurityTag").text = entity.security_tag if entity.custom_type is not None: xml.etree.ElementTree.SubElement(xml_object, "CustomType").text = entity.custom_type if entity.parent is not None: xml.etree.ElementTree.SubElement(xml_object, "Parent").text = entity.parent xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) request = self.session.put(f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) response = self.entity_from_string(xml_response) if isinstance(entity, Asset): asset = Asset(response['reference'], response['title'], response['description'], response['security_tag'], response['parent'], response['metadata']) if 'CustomType' in response: asset.custom_type = response['CustomType'] return asset elif isinstance(entity, Folder): folder = Folder(response['reference'], response['title'], response['description'], response['security_tag'], response['parent'], response['metadata']) if 'CustomType' in response: folder.custom_type = response['CustomType'] return folder elif isinstance(entity, ContentObject): content_object = ContentObject(response['reference'], response['title'], response['description'], response['security_tag'], response['parent'], response['metadata']) if 'CustomType' in response: content_object.custom_type = response['CustomType'] return content_object return None else: exception = HTTPException(entity.reference, request.status_code, request.url, "save", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def move_async(self, entity: Entity, dest_folder: Folder) -> str: """ Move an Entity (Asset or Folder) to a new Folder If dest_folder is None then the entity must be a Folder and will be moved to the root of the repository Returns a process ID asynchronous (without blocking) Returns The updated Entity :param Entity entity: The entity to move either asset or folder :param Entity dest_folder: The new destination folder. This can be None to move a folder to the root of the repository :return: Progress ID token :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'} if isinstance(entity, Asset) and dest_folder is None: raise RuntimeError(entity.reference, "Only folders can be moved to the root of the repository") if dest_folder is not None: data = dest_folder.reference else: data = "@root@" request = self.session.put( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/parent-ref', data=data, headers=headers) if request.status_code == requests.codes.accepted: return request.content.decode() else: exception = HTTPException(entity.reference, request.status_code, request.url, "move_async", request.content.decode('utf-8')) logger.error(exception) raise exception
def get_progress(self, pid: str) -> AsyncProgress: return AsyncProgress[self.get_async_progress(pid)]
[docs] def get_async_progress(self, pid: str) -> str: """ Return the status of a running process :param pid: The progress ID :return: Workflow status :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'} request = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{pid}", headers=headers) if request.status_code == requests.codes.ok: entity_response = xml.etree.ElementTree.fromstring(request.content.decode("utf-8")) status = entity_response.find(".//{http://status.preservica.com}Status") if hasattr(status, 'text'): return status.text else: return "UNKNOWN" else: exception = HTTPException(pid, request.status_code, request.url, "get_async_progress", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def move_sync(self, entity: EntityT, dest_folder: Folder) -> EntityT: """ Move an entity (asset or folder) to a new folder This call blocks until the move is complete :param Entity entity: The entity to move either asset or folder :param Entity dest_folder: The new destination folder. This can be None to move a folder to the root of the repository :return: The updated entity :rtype: Entity """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'} if isinstance(entity, Asset) and dest_folder is None: raise RuntimeError(entity.reference, "Only folders can be moved to the root of the repository") if dest_folder is not None: data = dest_folder.reference else: data = "@root@" request = self.session.put( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/parent-ref', data=data, headers=headers) if request.status_code == requests.codes.accepted: sleep_sec = 1 while True: status = self.get_async_progress(request.content.decode("utf-8")) if status != "ACTIVE": return self.entity(entity.entity_type, entity.reference) else: sleep(sleep_sec) sleep_sec = sleep_sec + 1 else: exception = HTTPException(entity, request.status_code, request.url, "move_sync", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def move(self, entity: EntityT, dest_folder: Folder) -> EntityT: """ Move an entity (asset or folder) to a new folder This call is an alias for the move_sync (blocking) method. :param Entity entity: The entity to move either asset or folder :param Entity dest_folder: The new destination folder. This can be None to move a folder to the root of the repository :return: The updated entity :rtype: Entity """ return self.move_sync(entity, dest_folder)
[docs] def create_folder(self, title: str, description: str, security_tag: str, parent: str = None) -> Folder: """ Create a new Folder in the repository If parent is None then the new Folder will be created at the root of the repository Returns The updated Entity :param title: The title of the new folder :param description: The description of the new folder :param security_tag: The security_tag of the new folder :param parent: The parent of the new folder, Can be None to create a root Folder """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} structural_object = xml.etree.ElementTree.Element('StructuralObject', {"xmlns": self.xip_ns}) xml.etree.ElementTree.SubElement(structural_object, "Ref").text = str(uuid.uuid4()) xml.etree.ElementTree.SubElement(structural_object, "Title").text = title xml.etree.ElementTree.SubElement(structural_object, "Description").text = description xml.etree.ElementTree.SubElement(structural_object, "SecurityTag").text = security_tag if parent is not None: xml.etree.ElementTree.SubElement(structural_object, "Parent").text = parent xml_request = xml.etree.ElementTree.tostring(structural_object, encoding='utf-8') logger.debug(xml_request) request = self.session.post(f'{self.protocol}://{self.server}/api/entity/structural-objects', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity = self.entity_from_string(xml_response) return Folder(entity['reference'], entity['title'], entity['description'], entity['security_tag'], entity['parent'], entity['metadata']) else: exception = HTTPException(title, request.status_code, request.url, "create_folder", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def all_metadata(self, entity: Entity) -> Generator[Tuple[str, str], None, None]: """ Retrieve all metadata fragments on an entity Returns XML documents in a tuple :param Entity entity: The entity with the metadata :return: A list of Tuples, the first value is the schmea and the second is the metadata :rtype: Generator[Tuple[str, str]] """ for uri, schema in entity.metadata.items(): yield tuple((str(schema), self.metadata(uri)))
[docs] def metadata_for_entity(self, entity: Entity, schema: str) -> Union[str, None]: """ Fetch the first metadata document which matches the schema URI from an entity :param Entity entity: The entity containing the metadata :param str schema: The metadata schema URI :return: The first XML document on the entity matching the schema URI :rtype: str """ # if the entity is a lightweight enum version request the full object if entity.metadata is None: entity = self.entity(entity.entity_type, entity.reference) for uri, schema_name in entity.metadata.items(): if schema == schema_name: return self.metadata(uri) return None
[docs] def metadata_tag_for_entity(self, entity: Entity, schema: str, tag: str, isXpath: bool = False) -> Union[str, None]: """ Retrieve the first value of the tag from a metadata template given by schema Returns XML document as a string :param isXpath: True if the tag name is a fully qualified xpath expression :param entity: The entity with the metadata :param schema: The schema URI :param tag: The tag name """ xml_doc = self.metadata_for_entity(entity, schema) if xml_doc: xml_object = xml.etree.ElementTree.fromstring(xml_doc) if not isXpath: return xml_object.find(f'.//{{*}}{tag}').text else: return xml_object.find(tag).text return None
[docs] def security_tag_sync(self, entity: EntityT, new_tag: str) -> EntityT: """ Change the security tag of an asset or folder This is a blocking call which returns after all entities have been updated. :param entity: The entity (asset, folder) to be updated :type entity: Entity :param new_tag: The new security tag to be set on the entity :type new_tag: str :return: The updated entity :rtype: Entity """ self.token = self.__token__() headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'} end_point = f"/{entity.path}/{entity.reference}/security-descriptor" request = self.session.put(f'{self.protocol}://{self.server}/api/entity{end_point}?includeDescendants=false', data=new_tag, headers=headers) if request.status_code == requests.codes.accepted: sleep_sec = 1 while True: status = self.get_async_progress(request.content.decode("utf-8")) if status != "ACTIVE": return self.entity(entity.entity_type, entity.reference) else: sleep(sleep_sec) sleep_sec = sleep_sec + 1 else: exception = HTTPException(entity.reference, request.status_code, request.url, "security_tag_sync", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def security_tag_async(self, entity: Entity, new_tag: str): """ Change the security tag of an asset or folder This is a non blocking call which returns immediately. :param entity: The entity (asset, folder) to be updated :type entity: Entity :param new_tag: The new security tag to be set on the entity :type new_tag: str :return: A progress id which can be used to monitor the workflow :rtype: str """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'} end_point = f"/{entity.path}/{entity.reference}/security-descriptor" request = self.session.put(f'{self.protocol}://{self.server}/api/entity{end_point}?includeDescendants=false', data=new_tag, headers=headers) if request.status_code == requests.codes.accepted: return request.content.decode("utf-8") else: exception = HTTPException(entity.reference, request.status_code, request.url, "security_tag_async", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def metadata(self, uri: str) -> str: """ Fetch the metadata document by its identifier, this is the key from the entity metadata map :param str uri: The metadata identifier :return: An XML document as a string :rtype: str """ request = self.session.get(uri, headers={HEADER_TOKEN: self.token}) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) content = entity_response.find(f'.//{{{self.xip_ns}}}Content') return xml.etree.ElementTree.tostring(content[0], encoding='utf-8', method='xml').decode('utf-8') else: exception = HTTPException(uri, request.status_code, request.url, "metadata", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def entity(self, entity_type: EntityType, reference: str) -> EntityT: """ Returns a generic entity based on its reference identifier :param entity_type: The type of entity :type entity_type: EntityType :param reference: The unique identifier for the entity :type reference: str :return: The entity either Asset, Folder or ContentObject :rtype: Entity :raises RuntimeError: if the identifier is incorrect """ if entity_type is EntityType.CONTENT_OBJECT: return self.content_object(reference) if entity_type is EntityType.FOLDER: return self.folder(reference) if entity_type is EntityType.ASSET: return self.asset(reference) return None
[docs] def add_physical_asset(self, title: str, description: str, parent: Folder, security_tag: str = "open") -> Asset: """ Create a new asset which represents a physical object Returns Asset :param str title: The title of the new Asset :param str description: The description of the new Asset :param Folder parent: The parent folder :param str security_tag: The security tag, defaults to open :return: The new physical object :rtype: Asset """ if (self.major_version < 7) and (self.minor_version < 4): raise RuntimeError( "add_physical_asset API call is only available with a Preservica v6.4.0 system or higher") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xip_object = xml.etree.ElementTree.Element('XIP ', {"xmlns": self.xip_ns}) io_object = xml.etree.ElementTree.SubElement(xip_object, "InformationObject") xml.etree.ElementTree.SubElement(io_object, "Title").text = str(title) xml.etree.ElementTree.SubElement(io_object, "Description").text = str(description) xml.etree.ElementTree.SubElement(io_object, "SecurityTag").text = str(security_tag) xml.etree.ElementTree.SubElement(io_object, "Parent").text = parent.reference rep_object = xml.etree.ElementTree.SubElement(xip_object, "Representation") xml.etree.ElementTree.SubElement(rep_object, "Type").text = "Physical" xml_request = xml.etree.ElementTree.tostring(xip_object, encoding='utf-8') request = self.session.post(f'{self.protocol}://{self.server}/api/entity/{IO_PATH}', data=xml_request, headers=headers) if request.status_code == requests.codes.ok: xml_string = str(request.content.decode("utf-8")) entity = self.entity_from_string(xml_string) return Asset(entity['reference'], entity['title'], entity['description'], entity['security_tag'], entity['parent'], entity['metadata']) else: exception = HTTPException(title, request.status_code, request.url, "add_physical_asset", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def merge_assets(self, assets: list[Asset], title: str, description: str) -> str: """ Create a new Asset with the content from each Asset in supplied list This call will create a new multipart Asset which contains all the content from list of Assets. The return value is the progress status of the merge operation. """ headers = { HEADER_TOKEN: self.token, "Content-Type": "application/xml;charset=UTF-8", "accept": "text/plain;charset=UTF-8", } merge_object = xml.etree.ElementTree.Element("MergeAction", {"xmlns": self.entity_ns, "xmlns:xip": self.xip_ns}) xml.etree.ElementTree.SubElement(merge_object, "Title").text = str(title) xml.etree.ElementTree.SubElement(merge_object, "Description").text = str(description) for a in assets: xml.etree.ElementTree.SubElement(merge_object, "Entity", { "excludeIdentifiers": "true", "excludeLinks": "true", "excludeMetadata": "true", "ref": a.reference, "type": EntityType.ASSET.value} ) # order_object = xml.etree.ElementTree.SubElement(merge_object, "Order") # for a in assets: # xml.etree.ElementTree.SubElement(order_object, "Entity", { # "ref": a.reference, # "type": EntityType.CONTENT_OBJECT.value} # ) xml_request = xml.etree.ElementTree.tostring(merge_object, encoding="utf-8") print(xml_request) request = self.session.post( f"{self.protocol}://{self.server}/api/entity/actions/merges", data=xml_request, headers=headers) if request.status_code == requests.codes.accepted: return request.content.decode('utf-8') else: exception = HTTPException( "", request.status_code, request.url, "merge_assets", request.content.decode("utf-8"), ) logger.error(exception) raise exception
[docs] def merge_folder(self, folder: Folder) -> str: """ Create a new Asset with the content from each Asset in the Folder This call will create a new multipart Asset which contains all the content from the Folder. The new Asset which is created will have the same title, description and parent as the Folder. The return value is the progress status of the merge operation. """ headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8', 'accept': 'text/plain;charset=UTF-8'} payload = f"""<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <MergeAction xmlns="{self.entity_ns}" xmlns:xip="{self.xip_ns}"> <Title>{folder.title}</Title> <Description>{folder.description}</Description> <Entity excludeIdentifiers="true" excludeLinks="true" excludeMetadata="true" ref="{folder.reference}" type="SO"/> </MergeAction>""" request = self.session.post( f"{self.protocol}://{self.server}/api/entity/actions/merges", data=payload, headers=headers) if request.status_code == requests.codes.accepted: return request.content.decode('utf-8') else: exception = HTTPException( folder.reference, request.status_code, request.url, "merge_folder", request.content.decode("utf-8"), ) logger.error(exception) raise exception
[docs] def xml_asset(self, reference: str) -> str: """ Retrieve an Asset by its reference Returns an XML document of the full Asset :param reference: The unique identifier of the entity """ headers = {HEADER_TOKEN: self.token} params = {"expand": "structure"} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{IO_PATH}/{reference}', params=params, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) return xml_response elif request.status_code == requests.codes.not_found: exception = ReferenceNotFoundException(reference, request.status_code, request.url, "xml_asset") logger.error(exception) raise exception else: exception = HTTPException(reference, request.status_code, request.url, "xml_asset", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def asset(self, reference: str) -> Asset: """ Returns an asset object back by its internal reference identifier :param reference: The unique identifier for the asset usually its uuid :type reference: str :return: The Asset object :rtype: Asset :raises RuntimeError: if the identifier is incorrect """ headers = {HEADER_TOKEN: self.token} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{IO_PATH}/{reference}', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity = self.entity_from_string(xml_response) asset = Asset(entity['reference'], entity['title'], entity['description'], entity['security_tag'], entity['parent'], entity['metadata']) if 'CustomType' in entity: asset.custom_type = entity['CustomType'] return asset elif request.status_code == requests.codes.not_found: exception = ReferenceNotFoundException(reference, request.status_code, request.url, "asset") logger.error(exception) raise exception else: exception = HTTPException(reference, request.status_code, request.url, "asset", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def folder(self, reference: str) -> Folder: """ Returns a folder object back by its internal reference identifier :param reference: The unique identifier for the folder usually its uuid :type reference: str :return: The Folder object :rtype: Folder :raises RuntimeError: if the identifier is incorrect """ headers = {HEADER_TOKEN: self.token} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{SO_PATH}/{reference}', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity = self.entity_from_string(xml_response) folder = Folder(entity['reference'], entity['title'], entity['description'], entity['security_tag'], entity['parent'], entity['metadata']) if 'CustomType' in entity: folder.custom_type = entity['CustomType'] return folder elif request.status_code == requests.codes.not_found: exception = ReferenceNotFoundException(reference, request.status_code, request.url, "folder") logger.error(exception) raise exception else: exception = HTTPException(reference, request.status_code, request.url, "folder", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def content_object(self, reference: str) -> ContentObject: """ Returns a content object back by its internal reference identifier :param reference: The unique identifier for the content object usually its uuid :type reference: str :return: The content object :rtype: ContentObject :raises RuntimeError: if the identifier is incorrect """ headers = {HEADER_TOKEN: self.token} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{reference}', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity = self.entity_from_string(xml_response) content_object = ContentObject(entity['reference'], entity['title'], entity['description'], entity['security_tag'], entity['parent'], entity['metadata']) if 'CustomType' in entity: content_object.custom_type = entity['CustomType'] return content_object elif request.status_code == requests.codes.not_found: exception = ReferenceNotFoundException(reference, request.status_code, request.url, "content_object") logger.error(exception) raise exception else: exception = HTTPException(reference, request.status_code, request.url, "content_object", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def content_objects(self, representation: Representation) -> list[ContentObject]: """ Return a list of content objects for a representation :param representation: The representation :type representation: Representation :return: List of content objects :rtype: list(ContentObject) """ headers = {HEADER_TOKEN: self.token} if not isinstance(representation, Representation): logger.warning("representation is not of type Representation") return [] request = self.session.get(f'{representation.url}', headers=headers) if request.status_code == requests.codes.ok: results = [] xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) content_objects = entity_response.findall(f'.//{{{self.xip_ns}}}Representation/' f'{{{self.xip_ns}}}ContentObjects/{{{self.xip_ns}}}ContentObject') for co in content_objects: content_object = self.content_object(co.text) content_object.representation_type = representation.rep_type content_object.asset = representation.asset results.append(content_object) return results else: exception = HTTPException(representation.name, request.status_code, request.url, "content_objects", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def generation(self, url: str, content_ref: str = None) -> Generation: """ Retrieve a list of generation objects :param url: :param content_ref: :return: Generation :rtype: Generation """ headers = {HEADER_TOKEN: self.token} request = self.session.get(url, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) ge = entity_response.find(f'.//{{{self.xip_ns}}}Generation') format_group = entity_response.find(f'.//{{{self.xip_ns}}}FormatGroup') effective_date = entity_response.find(f'.//{{{self.xip_ns}}}EffectiveDate') formats = entity_response.findall(f'.//{{{self.xip_ns}}}Formats/{{{self.xip_ns}}}Format') formats_list = [] for tech_format in formats: format_dict = {'Valid': tech_format.attrib['valid']} puid = tech_format.find(f'.//{{{self.xip_ns}}}PUID') format_dict['PUID'] = puid.text if hasattr(puid, 'text') else None priority = tech_format.find(f'.//{{{self.xip_ns}}}Priority') format_dict['Priority'] = priority.text if hasattr(priority, 'text') else None method = tech_format.find(f'.//{{{self.xip_ns}}}IdentificationMethod') format_dict['IdentificationMethod'] = method.text if hasattr(method, 'text') else None name = tech_format.find(f'.//{{{self.xip_ns}}}FormatName') format_dict['FormatName'] = name.text if hasattr(name, 'text') else None version = tech_format.find(f'.//{{{self.xip_ns}}}FormatVersion') format_dict['FormatVersion'] = version.text if hasattr(version, 'text') else None formats_list.append(format_dict) index = int(url.rsplit("/", 1)[-1]) properties = entity_response.findall(f'.//{{{self.xip_ns}}}Properties/{{{self.xip_ns}}}Property') property_set = [] for tech_props in properties: tech_props_dict = {} puid = tech_props.find(f'.//{{{self.xip_ns}}}PUID') tech_props_dict['PUID'] = puid.text if hasattr(puid, 'text') else None name = tech_props.find(f'.//{{{self.xip_ns}}}PropertyName') tech_props_dict['PropertyName'] = name.text if hasattr(name, 'text') else None value = tech_props.find(f'.//{{{self.xip_ns}}}Value') tech_props_dict['Value'] = value.text if hasattr(value, 'text') else None property_set.append(tech_props_dict) bitstreams = entity_response.findall(f'./{{{self.entity_ns}}}Bitstreams/{{{self.entity_ns}}}Bitstream') bitstream_list = [] for bit in bitstreams: bs: Bitstream = self.bitstream(bit.text) bs.gen_index = index if content_ref is not None: bs.co_ref = content_ref bitstream_list.append(bs) generation = Generation(strtobool(ge.attrib['original']), strtobool(ge.attrib['active']), format_group.text if hasattr(format_group, 'text') else None, effective_date.text if hasattr(effective_date, 'text') else None, bitstream_list) generation.formats = formats_list generation.properties = property_set generation.gen_index = index return generation else: exception = HTTPException(url, request.status_code, request.url, "generation", request.content.decode('utf-8')) logger.error(exception) raise exception
def _integrity_checks(self, bitstream: Bitstream, maximum: int = 10, next_page: str = None): headers = {HEADER_TOKEN: self.token} if next_page is None: url = re.sub('content$', f'integrity-check-history', bitstream.content_url) params = {'start': '0', 'max': str(maximum)} request = self.session.get(url, headers=headers, params=params) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) histories = entity_response.findall(f'.//{{{self.entity_ns}}}IntegrityCheckHistory') next_url = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}TotalResults') results = [] for history in histories: xip_type = history.find(f'./{{{self.xip_ns}}}Type') xip_success = history.find(f'./{{{self.xip_ns}}}Success') xip_date = history.find(f'./{{{self.xip_ns}}}Date') xip_adapter_name = history.find(f'./{{{self.xip_ns}}}AdapterName') xip_fixed = history.find(f'./{{{self.xip_ns}}}Fixed') xip_reason = history.find(f'./{{{self.xip_ns}}}Reason') check = IntegrityCheck(xip_type.text if hasattr(xip_type, 'text') else None, xip_success.text if hasattr(xip_success, 'text') else None, xip_date.text if hasattr(xip_date, 'text') else None, xip_adapter_name.text if hasattr(xip_adapter_name, 'text') else None, xip_fixed.text if hasattr(xip_fixed, 'text') else None, xip_reason.text if hasattr(xip_reason, 'text') else None) results.append(check) has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(results, has_more, int(total_hits.text), url) else: exception = HTTPException(bitstream.filename, request.status_code, request.url, "_integrity_checks", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def integrity_checks(self, bitstream: Bitstream) -> Generator: """ Return integrity checks for a bitstream """ maximum = 10 paged_set = self._integrity_checks(bitstream=bitstream, maximum=maximum, next_page=None) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._integrity_checks(bitstream=bitstream, maximum=maximum, next_page=paged_set.next_page) for entity in paged_set.results: yield entity
[docs] def bitstream(self, url: str) -> Bitstream: """ Fetch a bitstream object from the server using its URL :param url: The URL to the bitstream :type url: str :return: a bitstream object :rtype: Bitstream """ headers = {HEADER_TOKEN: self.token} request = self.session.get(url, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) logger.debug(xml_response) filename = entity_response.find(f'.//{{{self.xip_ns}}}Filename') filesize = entity_response.find(f'.//{{{self.xip_ns}}}FileSize') fixity_values = entity_response.findall(f'.//{{{self.xip_ns}}}Fixity') content = entity_response.find(f'.//{{{self.entity_ns}}}Content') index = int(url.rsplit("/", 1)[-1]) fixity = {} for f in fixity_values: fixity[f[0].text] = f[1].text bitstream = Bitstream(filename.text if hasattr(filename, 'text') else None, int(filesize.text) if hasattr(filesize, 'text') else None, fixity, content.text if hasattr(content, 'text') else None) bitstream.bs_index = index return bitstream else: exception = HTTPException(url, request.status_code, request.url, "bitstream", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def replace_generation_sync(self, content_object: ContentObject, file_name, fixity_algorithm=None, fixity_value=None) -> str: """ Replace the last active generation of a content object with a new digital file. Starts the workflow and blocks until the workflow completes. :param ContentObject content_object: The content object to replace :param str file_name: The path to the new content object :param str fixity_algorithm: Optional fixity algorithm :param str fixity_value: Optional fixity value :return: Completed workflow status :rtype: str """ status = "ACTIVE" pid = self.replace_generation_async(content_object=content_object, file_name=file_name, fixity_algorithm=fixity_algorithm, fixity_value=fixity_value) while status == "ACTIVE": status = self.get_async_progress(pid) return status
[docs] def replace_generation_async(self, content_object: ContentObject, file_name, fixity_algorithm=None, fixity_value=None) -> str: """ Replace the last active generation of a content object with a new digital file. Starts the workflow and returns a process ID :param ContentObject content_object: The content object to replace :param str file_name: The path to the new content object :param str fixity_algorithm: Optional fixity algorithm :param str fixity_value: Optional fixity value :return: Process ID :rtype: str """ if (self.major_version < 7) and (self.minor_version < 2) and (self.patch_version < 1): raise RuntimeError("replace API call is only available when connected to a v6.2.1 System") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'} params = {"replaceType": "previous"} if fixity_algorithm is None: generation = self.generations(content_object=content_object).pop() bitstream = generation.bitstreams.pop() for algo, value in bitstream.fixity.items(): fixity_algorithm = algo if "MD5" in fixity_algorithm.upper(): fixity_value = FileHash(hashlib.md5)(file_name) if "SHA1" in fixity_algorithm.upper() or "SHA-1" in fixity_algorithm.upper(): fixity_value = FileHash(hashlib.sha1)(file_name) if "SHA256" in fixity_algorithm.upper() or "SHA-256" in fixity_algorithm.upper(): fixity_value = FileHash(hashlib.sha256)(file_name) if "SHA512" in fixity_algorithm.upper() or "SHA-512" in fixity_algorithm.upper(): fixity_value = FileHash(hashlib.sha512)(file_name) if fixity_algorithm and fixity_value: if "MD5" in fixity_algorithm.upper(): headers["Fixity-MD5"] = fixity_value if "SHA1" in fixity_algorithm.upper() or "SHA-1" in fixity_algorithm.upper(): headers["Fixity-SHA-1"] = fixity_value if "SHA256" in fixity_algorithm.upper() or "SHA-256" in fixity_algorithm.upper(): headers["Fixity-SHA-256"] = fixity_value if "SHA512" in fixity_algorithm.upper() or "SHA-512" in fixity_algorithm.upper(): headers["Fixity-SHA-512"] = fixity_value headers["Filename"] = os.path.basename(file_name) headers['Content-Length'] = str(os.path.getsize(file_name)) with open(file_name, 'rb') as f: request = self.session.post( f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{content_object.reference}/generations', params=params, data=f, headers=headers) if request.status_code == requests.codes.ok: return str(request.content.decode('utf-8')) else: exception = HTTPException(content_object.reference, request.status_code, request.url, "replace_generation_async", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def generations(self, content_object: ContentObject) -> list[Generation]: """ Return a list of Generation objects for a content object :param content_object: The content object :type content_object: ContentObject :return: list of generations :rtype: list(Generation) """ headers = {HEADER_TOKEN: self.token} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{content_object.reference}/generations', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) generations = entity_response.findall(f'.//{{{self.entity_ns}}}Generation') result = [] for g in generations: if hasattr(g, 'text'): generation = self.generation(g.text, content_object.reference) generation.asset = content_object.asset generation.content_object = content_object generation.representation_type = content_object.representation_type result.append(generation) return result else: exception = HTTPException(content_object.reference, request.status_code, request.url, "generations", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def bitstreams_for_asset(self, asset: Union[Asset, Entity]) -> Iterable[Bitstream]: """ Return all the active bitstreams within an asset. This includes all the representations and content objects :param asset: The asset :return: Iterable """ for representation in self.representations(asset): for content_object in self.content_objects(representation): for generation in self.generations(content_object): if generation.active: for bitstream in generation.bitstreams: bitstream.representation = representation bitstream.content_object = content_object bitstream.generation = generation yield bitstream
[docs] def representations(self, asset: Asset) -> set[Representation]: """ Return a set of representations for the asset Representations are used to define how the information object are composed in terms of technology and structure. :param asset: The asset containing the required representations :type asset: Asset :return: Set of Representation objects :rtype: set(Representation) """ headers = {HEADER_TOKEN: self.token} if not isinstance(asset, Asset): return set() request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{asset.path}/{asset.reference}/representations', headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) representations = entity_response.findall(f'.//{{{self.entity_ns}}}Representation') result = set() for r in representations: representation = Representation(asset, r.get('type'), r.get("name", None), r.text) result.add(representation) return result else: exception = HTTPException(asset.reference, request.status_code, request.url, "representations", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def remove_thumbnail(self, entity: Entity): """ Remove the thumbnail for the entity to the uploaded image :param entity: The entity with the thumbnail :type entity: Entity """ if self.major_version < 7 and self.minor_version < 2: raise RuntimeError("Thumbnail API is only available when connected to a v6.2 System") if isinstance(entity, ContentObject): raise RuntimeError("Thumbnails cannot be added to Content Objects") headers = {HEADER_TOKEN: self.token} request = self.session.delete( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/preview', headers=headers) if request.status_code == requests.codes.no_content: return str(request.content.decode('utf-8')) else: exception = HTTPException(entity.reference, request.status_code, request.url, "remove_thumbnail", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def add_access_representation(self, entity: Entity, access_file: str, name: str = "Access"): """ Add a new Access representation to an existing asset. :param Entity entity: The existing asset which will receive the new representation :param str access_file: The new digital file :param str name: The name of the new access representation defaults to "Access" :return: """ if self.major_version < 7 and self.minor_version < 12: raise RuntimeError("Add Representation API is only available when connected to a v6.12 System") if isinstance(entity, Folder) or isinstance(entity, ContentObject): raise RuntimeError("Add Representation cannot be added to Folders and Content Objects") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'} filename = os.path.basename(access_file) params = {'type': 'Access', 'name': name, 'filename': filename} with open(access_file, 'rb') as fd: request = self.session.post( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/representations', data=fd, headers=headers, params=params) if request.status_code == requests.codes.accepted: return str(request.content.decode('utf-8')) else: exception = HTTPException(entity.reference, request.status_code, request.url, "add_access_representation", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def add_thumbnail(self, entity: Entity, image_file: str): """ Set the thumbnail for the entity to the uploaded image Supported image formats are png, jpeg, tiff, gif and bmp. The image must be 10MB or less in size. :param Entity entity: The entity :param str image_file: The path to the image """ if self.major_version < 7 and self.minor_version < 2: raise RuntimeError("Thumbnail API is only available when connected to a v6.2 System") if isinstance(entity, ContentObject): raise RuntimeError("Thumbnails cannot be added to Content Objects") headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'} with open(image_file, 'rb') as fd: request = self.session.put( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/preview', data=fd, headers=headers) if request.status_code == requests.codes.no_content: return str(request.content.decode('utf-8')) else: exception = HTTPException(entity.reference, request.status_code, request.url, "add_thumbnail", request.content.decode('utf-8')) logger.error(exception) raise exception
def _event_actions(self, entity: Entity, maximum: int): """ event actions performed against this entity """ if self.major_version < 7 and self.minor_version < 1: logger.error("Entity events is only available when connected to a v6.1 System") raise RuntimeError("Entity events is only available when connected to a v6.1 System") headers = {HEADER_TOKEN: self.token} params = {'start': str(0), 'max': str(maximum)} request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/event-actions', params=params, headers=headers) if request.status_code == requests.codes.ok: return None else: exception = HTTPException(entity.reference, request.status_code, request.url, "_event_actions", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def all_descendants(self, folder: Union[Folder, str, Entity, None] = None) -> Generator[Entity, None, None]: """ Return all child entities recursively of a folder or repository down to the assets using a lazy iterator. The paging is done internally using a default page size of 100 elements. Callers can iterate over the result to get all children with a single call. :param str folder: The parent folder reference, None for the children of root folders :return: A set of entity objects (Folders and Assets) :rtype: set(Entity) """ for entity in self.descendants(folder=folder): yield entity if entity.entity_type == EntityType.FOLDER: yield from self.all_descendants(folder=entity)
[docs] def descendants(self, folder: Union[Folder, str, Entity, None] = None) -> Generator[Entity, None, None]: """ Return the immediate child entities of a folder using a lazy iterator. The paging is done internally using a default page size of 100 elements. Callers can iterate over the result to get all children with a single call. :param str folder: The parent folder reference, None for the children of root folders :return: A set of entity objects (Folders and Assets) :rtype: set(Entity) """ maximum = 100 paged_set = self.children(folder, maximum=maximum, next_page=None) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self.children(folder, maximum=maximum, next_page=paged_set.next_page) for entity in paged_set.results: yield entity
[docs] def children(self, folder: Union[str, Folder, None] = None, maximum: int = 100, next_page: str = None) -> PagedSet: """ Return the child entities of a folder one page at a time. The caller is responsible for requesting the next page of results. This function is deprecated, use descendants instead as the paging is automatic :param str folder: The parent folder reference, None for the children of root folders :param int maximum: The maximum size of the result set in each page :param str next_page: A URL for the next page of results :return: A set of entity objects :rtype: set(Entity) """ headers = {HEADER_TOKEN: self.token} data = {'start': str(0), 'max': str(maximum)} if isinstance(folder, Folder): folder_reference = folder.reference else: folder_reference = folder if next_page is None: if folder_reference is None: request = self.session.get(f'{self.protocol}://{self.server}/api/entity/root/children', params=data, headers=headers) else: if hasattr(folder, "reference"): folder_reference = folder.reference request = self.session.get( f'{self.protocol}://{self.server}/api/entity/structural-objects/{folder_reference}/children', params=data, headers=headers) else: request = self.session.get(next_page, headers=headers) logger.debug(request.url) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) children = entity_response.findall(f'.//{{{self.entity_ns}}}Child') result = set() next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults') for child in children: if child.attrib['type'] == EntityType.FOLDER.value: folder = Folder(child.attrib['ref'], child.attrib['title'], None, None, folder_reference, None) result.add(folder) else: asset = Asset(child.attrib['ref'], child.attrib['title'], None, None, folder_reference, None) result.add(asset) has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result, has_more, int(total_hits.text), url) else: exception = HTTPException(folder_reference, request.status_code, request.url, "children", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def all_ingest_events(self, previous_days: int = 1) -> Generator: """ Returns a list of ingest only events for the user's tenancy This method uses a generator function to make repeated calls to the server for every page of results. :param int previous_days: The number of days to look back for events :return: A generator of events :rtype: Generator """ self.token = self.__token__() previous = datetime.utcnow() - timedelta(days=previous_days) from_date = previous.replace(tzinfo=timezone.utc).isoformat() to_date = datetime.utcnow().replace(tzinfo=timezone.utc).isoformat() paged_set = self._all_events_page(maximum=25, next_page=None, type="Ingest", from_date=from_date, to_date=to_date) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._all_events_page(maximum=25, next_page=paged_set.next_page, type="Ingest", from_date=from_date, to_date=to_date) for entity in paged_set.results: yield entity
[docs] def all_events(self) -> Generator: """ Returns a list of events for the user's tenancy This method uses a generator function to make repeated calls to the server for every page of results. :return: A generator of events :rtype: Generator """ self.token = self.__token__() paged_set = self._all_events_page() for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._all_events_page(next_page=paged_set.next_page) for entity in paged_set.results: yield entity
def _entity_from_event_page(self, event_id: str, maximum: int = 25, next_page: str = None): headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} if next_page is None: url = f'{self.protocol}://{self.server}/api/entity/events/{event_id}/event-actions' response = requests.get(url, params={'start': 0, 'max': maximum}, headers=headers) else: response = requests.get(next_page, headers=headers) if response.status_code == 200: xml_response = str(response.content.decode('utf-8')) entity_response = xml.etree.ElementTree.fromstring(xml_response) actions = entity_response.findall(f'.//{{{self.xip_ns}}}EventAction') result_list = [] for action in actions: item: dict = {} event = action.find(f'.//{{{self.xip_ns}}}Event') event_type = event.attrib["type"] item['EventType'] = event_type entity_date = action.find(f'.//{{{self.xip_ns}}}Date') item['Date'] = entity_date.text entity_ref = action.find(f'.//{{{self.xip_ns}}}Entity') item['Entity'] = entity_ref.text result_list.append(item) next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults') has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result_list, has_more, int(total_hits.text), url) return None
[docs] def entity_from_event(self, event_id: str) -> Generator: """ Returns an entity from the user's tenancy :rtype: Generator """ self.token = self.__token__() paged_set = self._entity_from_event_page(event_id, 25, None) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._entity_from_event_page(event_id, 25, next_page=paged_set.next_page) for entity in paged_set.results: yield entity
def _all_events_page(self, maximum: int = 25, next_page: str = None, **kwargs) -> PagedSet: """ event actions performed against this repository """ headers = {HEADER_TOKEN: self.token} params = {'start': str(0), 'max': str(maximum)} if "type" in kwargs: params["types"] = kwargs.get("type") if "from_date" in kwargs: params["from"] = kwargs.get("from_date") if "to_date" in kwargs: params["to"] = kwargs.get("to_date") if "username" in kwargs: params["username"] = kwargs.get("username") if next_page is None: request = self.session.get(f'{self.protocol}://{self.server}/api/entity/events', params=params, headers=headers) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) events = entity_response.findall(f'.//{{{self.xip_ns}}}Event') result_list = [] for event in events: result = {'eventType': event.attrib['type']} date_node = event.find(f'.//{{{self.xip_ns}}}Date') result['Date'] = date_node.text if hasattr(date_node, 'text') else None user_node = event.find(f'.//{{{self.xip_ns}}}User') result['User'] = user_node.text if hasattr(user_node, 'text') else None ref_node = event.find(f'.//{{{self.xip_ns}}}Ref') result['Ref'] = ref_node.text if hasattr(ref_node, 'text') else None workflow_name = event.find(f'.//{{{self.xip_ns}}}WorkflowName') if workflow_name is not None: result['WorkflowName'] = workflow_name.text workflow_instance_id = event.find(f'.//{{{self.xip_ns}}}WorkflowInstanceId') if workflow_instance_id is not None: result['WorkflowInstanceId'] = workflow_instance_id.text serialised_command = event.find(f'.//{{{self.xip_ns}}}SerialisedCommand') if serialised_command is not None: result['SerialisedCommand'] = serialised_command.text result_list.append(result) next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults') has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result_list, has_more, int(total_hits.text), url) else: exception = HTTPException("", request.status_code, request.url, "_all_events_page", request.content.decode('utf-8')) logger.error(exception) raise exception def _entity_events_page(self, entity: Entity, maximum: int = 25, next_page: str = None) -> PagedSet: """ event actions performed against this entity """ if self.major_version < 7 and self.minor_version < 1: logger.error("Entity events is only available when connected to a v6.1 System") raise RuntimeError("Entity events is only available when connected to a v6.1 System") headers = {HEADER_TOKEN: self.token} params = {'start': str(0), 'max': str(maximum)} if next_page is None: request = self.session.get( f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/event-actions', params=params, headers=headers) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) event_actions = entity_response.findall(f'.//{{{self.xip_ns}}}EventAction') result_list = [] for event_action in event_actions: result = {'commandType': ''} if 'commandType' in event_action.attrib: result['commandType'] = event_action.attrib['commandType'] event = event_action.find(f'.//{{{self.xip_ns}}}Event') if 'type' in event_action.attrib: result['eventType'] = event.attrib['type'] date_node = event.find(f'.//{{{self.xip_ns}}}Date') if date_node is not None: result['Date'] = date_node.text user_node = event.find(f'.//{{{self.xip_ns}}}User') if user_node is not None: result['User'] = user_node.text ref_node = event.find(f'.//{{{self.xip_ns}}}Ref') if ref_node is not None: result['Ref'] = ref_node.text workflow_name = event.find(f'.//{{{self.xip_ns}}}WorkflowName') if workflow_name is not None: result['WorkflowName'] = workflow_name.text workflow_instance_id = event.find(f'.//{{{self.xip_ns}}}WorkflowInstanceId') if workflow_instance_id is not None: result['WorkflowInstanceId'] = workflow_instance_id.text serialised_command = event_action.find(f'.//{{{self.xip_ns}}}SerialisedCommand') if serialised_command is not None: result['SerialisedCommand'] = serialised_command.text result_list.append(result) next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults') has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result_list, has_more, int(total_hits.text), url) else: exception = HTTPException(entity.reference, request.status_code, request.url, "_all_events_page", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def entity_events(self, entity: Entity) -> Generator: """ Returns a list of event actions performed against this entity This method uses a generator function to make repeated calls to the server for every page of results. :param Entity entity: The entity :return: A list of events :rtype: list """ self.token = self.__token__() paged_set = self._entity_events_page(entity) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._entity_events_page(entity, next_page=paged_set.next_page) for entity in paged_set.results: yield entity
[docs] def updated_entities(self, previous_days: int = 1) -> Generator: """ Fetch a list of entities which have changed (been updated) over the previous n days. This method uses a generator function to make repeated calls to the server for every page of results. :param int previous_days: The number of days to check for changes. :return: A list of entities :rtype: list """ self.token = self.__token__() maximum = 25 paged_set = self._updated_entities_page(previous_days=previous_days, maximum=maximum, next_page=None) for entity in paged_set.results: yield entity while paged_set.has_more: paged_set = self._updated_entities_page(previous_days=previous_days, maximum=maximum, next_page=paged_set.next_page) for entity in paged_set.results: yield entity
def _updated_entities_page(self, previous_days: int = 1, maximum: int = 50, next_page: str = None) -> PagedSet: headers = {HEADER_TOKEN: self.token} x = datetime.utcnow() - timedelta(days=previous_days) today = x.replace(tzinfo=timezone.utc).isoformat() if next_page is None: params = {'date': today, 'start': '0', 'max': str(maximum)} request = self.session.get(f'{self.protocol}://{self.server}/api/entity/entities/updated-since', headers=headers, params=params) else: request = self.session.get(next_page, headers=headers) if request.status_code == requests.codes.ok: xml_response = str(request.content.decode('utf-8')) logger.debug(xml_response) entity_response = xml.etree.ElementTree.fromstring(xml_response) entities = entity_response.findall(f'.//{{{self.entity_ns}}}Entity') result = [] for entity in entities: if 'type' in entity.attrib: if entity.attrib['type'] == EntityType.FOLDER.value: folder = Folder(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.append(folder) elif entity.attrib['type'] == EntityType.ASSET.value: asset = Asset(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.append(asset) elif entity.attrib['type'] == EntityType.CONTENT_OBJECT.value: co = ContentObject(entity.attrib['ref'], entity.attrib['title'], None, None, None, None) result.append(co) next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next') total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults') has_more = True url = None if next_url is None: has_more = False else: url = next_url.text return PagedSet(result, has_more, int(total_hits.text), url) else: exception = HTTPException(previous_days, request.status_code, request.url, "_updated_entities_page", request.content.decode('utf-8')) logger.error(exception) raise exception
[docs] def delete_asset(self, asset: Asset, operator_comment: str, supervisor_comment: str, credentials_path: str = "credentials.properties"): """ Initiate and approve the deletion of an asset. :param credentials_path: :type credentials_path: :param Asset asset: The asset to delete :param str operator_comment: The comments from the operator which are added to the logs :param str supervisor_comment: The comments from the supervisor which are added to the logs :return: The asset reference :rtype: str """ if isinstance(asset, Asset): return self._delete_entity(asset, operator_comment, supervisor_comment, credentials_path) else: raise RuntimeError("delete_asset only deletes assets")
[docs] def delete_folder(self, folder: Folder, operator_comment: str, supervisor_comment: str, credentials_path: str = "credentials.properties"): """ Initiate and approve the deletion of a folder. :param Folder folder: The folder to delete :param str operator_comment: The comments from the operator which are added to the logs :param str supervisor_comment: The comments from the supervisor which are added to the logs :return: The folder reference :rtype: str """ if isinstance(folder, Folder): return self._delete_entity(folder, operator_comment, supervisor_comment, credentials_path) else: raise RuntimeError("delete_folder only deletes folders")
def _delete_entity(self, entity: Entity, operator_comment: str, supervisor_comment: str, credentials_path: str = "credentials.properties"): """ Delete an asset from the repository :param entity: The entity :param operator_comment: The comment on the deletion """ # check manager password is available: config = configparser.ConfigParser() config.read(credentials_path, encoding='utf-8') try: manager_username = config['credentials']['manager.username'] manager_password = config['credentials']['manager.password'] self.manager_token(manager_username, manager_password) except KeyError: raise RuntimeError("No manager password set in credentials.properties") self.token = self.__token__() headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('DeletionAction', {"xmlns:xip": self.xip_ns, "xmlns": self.entity_ns}) submission_el = xml.etree.ElementTree.SubElement(xml_object, "Submission") comment_el = xml.etree.ElementTree.SubElement(submission_el, "Comment") comment_el.text = operator_comment xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) request = self.session.delete(f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}', data=xml_request, headers=headers) logger.debug(request.content.decode("utf-8")) if request.status_code == requests.codes.accepted: progress = request.content.decode("utf-8") req = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{progress}", headers=headers) while True: if req.status_code == requests.codes.ok: entity_response = xml.etree.ElementTree.fromstring(req.content.decode("utf-8")) status = entity_response.find(".//{http://status.preservica.com}Status") if hasattr(status, 'text'): if status.text == "COMPLETED": return entity.reference if status.text == "PENDING": headers = {HEADER_TOKEN: self.manager_token(manager_username, manager_password), 'Content-Type': 'application/xml;charset=UTF-8'} xml_object = xml.etree.ElementTree.Element('DeletionAction ', {"xmlns:xip": self.xip_ns, "xmlns": self.entity_ns}) approval_el = xml.etree.ElementTree.SubElement(xml_object, "Approval") xml.etree.ElementTree.SubElement(approval_el, "Approved").text = "true" xml.etree.ElementTree.SubElement(approval_el, "Comment").text = supervisor_comment xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8') logger.debug(xml_request) approve = self.session.put( f"{self.protocol}://{self.server}/api/entity/actions/deletions/{progress}", data=xml_request, headers=headers) if approve.status_code == requests.codes.accepted: return entity.reference else: logger.error(approve.content.decode('utf-8')) raise RuntimeError(approve.status_code, "delete_asset failed during approval") sleep(2.0) req = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{progress}", headers=headers) if request.status_code == requests.codes.unprocessable: logger.error(request.content.decode('utf-8')) raise RuntimeError(request.status_code, "no active workflow context for full deletion exists in the system") if request.status_code == requests.codes.forbidden: logger.error(request.content.decode('utf-8')) raise RuntimeError(request.status_code, "User doesn't have deletion rights on the " "entity or the required operator role to evaluate a deletion") exception = HTTPException(entity.reference, request.status_code, request.url, "_delete_entity", request.content.decode('utf-8')) logger.error(exception) raise exception