"""
pyPreservica EntityAPI module definition
A client library for the Preservica Repository web services Entity API
https://us.preservica.com/api/entity/documentation.html
author: James Carr
licence: Apache License 2.0
"""
import hashlib
import os.path
import uuid
import xml.etree.ElementTree
from datetime import datetime, timedelta, timezone
from io import BytesIO
from time import sleep
from typing import Any, Generator, Tuple, Iterable, Union
from pyPreservica.common import *
logger = logging.getLogger(__name__)
[docs]
class EntityAPI(AuthenticatedAPI):
"""
A class for the Preservica Repository web services Entity API
https://us.preservica.com/api/entity/documentation.html
The EntityAPI allows users to interact with the Preservica repository
"""
def __init__(self, username: str = None, password: str = None, tenant: str = None, server: str = None,
use_shared_secret: bool = False, two_fa_secret_key: str = None, protocol: str = "https"):
super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key, protocol)
xml.etree.ElementTree.register_namespace("oai_dc", "http://www.openarchives.org/OAI/2.0/oai_dc/")
xml.etree.ElementTree.register_namespace("ead", "urn:isbn:1-931666-22-9")
def user_security_tags(self, with_permissions: bool = False) -> dict:
"""
Return security tags available for the current user
:param with_permissions: Return the permissions for each security tag
:type with_permissions: bool
:return: dict of security tags
:rtype: dict
"""
return self.security_tags_base(with_permissions=with_permissions)
def bitstream_chunks(self, bitstream: Bitstream, chunk_size: int = CHUNK_SIZE) -> Generator:
"""
Generator function to return bitstream chunks
:param bitstream: The bitstream
:param chunk_size: The chunk size to return
:return: A chunk of the requested bitstream content
"""
if not isinstance(bitstream, Bitstream):
logger.error("bitstream_content argument is not a Bitstream object")
raise RuntimeError("bitstream_bytes argument is not a Bitstream object")
with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token}, stream=True) as request:
if request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.bitstream_chunks(bitstream)
elif request.status_code == requests.codes.ok:
for chunk in request.iter_content(chunk_size=chunk_size):
yield chunk
else:
exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_content",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def bitstream_bytes(self, bitstream: Bitstream, chunk_size: int = CHUNK_SIZE) -> Union[BytesIO, None]:
"""
Download a file represented as a Bitstream to a byteIO array
Returns the byteIO
Returns None if the file does not contain the correct number of bytes (default 2k)
:param chunk_size: The buffer copy chunk size in bytes default
:param bitstream: A Bitstream object
:type bitstream: Bitstream
:return: The file in bytes
:rtype: byteIO
"""
if not isinstance(bitstream, Bitstream):
logger.error("bitstream_content argument is not a Bitstream object")
raise RuntimeError("bitstream_bytes argument is not a Bitstream object")
with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token}, stream=True) as request:
if request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.bitstream_bytes(bitstream)
elif request.status_code == requests.codes.ok:
file_bytes = BytesIO()
for chunk in request.iter_content(chunk_size=chunk_size):
file_bytes.write(chunk)
file_bytes.seek(0)
if file_bytes.getbuffer().nbytes == bitstream.length:
logger.debug(f"Downloaded {bitstream.length} bytes from {bitstream.filename}")
return file_bytes
else:
logger.error("Downloaded file size did not match the Preservica held value")
return None
else:
exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_content",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def bitstream_content(self, bitstream: Bitstream, filename: str, chunk_size: int = CHUNK_SIZE) -> Union[int, None]:
"""
Download a file represented as a Bitstream to a local filename
Returns the number of bytes written to the file
Returns None if the file does not contain the correct number of bytes
:param chunk_size: The buffer copy chunk size in bytes default
:param bitstream: A Bitstream object
:type bitstream: Bitstream
:param filename: The filename to write the bytes to
:type filename: str
:return: The size of the file in bytes
:rtype: int
"""
if not isinstance(bitstream, Bitstream):
logger.error("bitstream_content argument is not a Bitstream object")
raise RuntimeError("bitstream_content argument is not a Bitstream object")
with self.session.get(bitstream.content_url, headers={HEADER_TOKEN: self.token}, stream=True) as request:
if request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.bitstream_content(bitstream, filename)
elif request.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in request.iter_content(chunk_size=chunk_size):
file.write(chunk)
file.flush()
if os.path.getsize(filename) == bitstream.length:
logger.debug(f"Downloaded {bitstream.length} bytes into {filename}")
return bitstream.length
else:
logger.error("Download file size did not match the Preservica held value")
os.remove(filename)
return None
else:
exception = HTTPException(bitstream.filename, request.status_code, request.url, "bitstream_content",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def download_opex(self, pid: str) -> str:
"""
Download a completed OPEX export using the workflow process ID
:param pid: A process id which identifiers the export workflow
:type pid: str
:return: The downloaded zip file name
:rtype: str
"""
headers = {HEADER_TOKEN: self.__token__(), 'Content-Type': 'application/xml;charset=UTF-8'}
download = self.session.get(f'{self.protocol}://{self.server}/api/entity/actions/exports/{pid}/content',
stream=True, headers=headers)
if download.status_code == requests.codes.ok:
with open(f'{pid}.zip', 'wb') as file:
for chunk in download.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
logger.debug(f"Downloaded open package into {pid}.zip")
return f'{pid}.zip'
elif download.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.download_opex(pid)
else:
exception = HTTPException(pid, download.status_code, download.url, "download_opex",
download.content.decode('utf-8'))
logger.error(exception)
raise exception
def __export_opex_start__(self, entity: Entity, **kwargs) -> str:
"""
Initiates export of the entity and downloads the opex package
By default, includes content, metadata with the latest active generations
and the parent hierarchy.
Arguments are kwargs map
IncludeContent
IncludeMetadata
IncludedGenerations
IncludeParentHierarchy
"""
if self.major_version < 7 and self.minor_version < 2:
raise RuntimeError("export_opex API is only available when connected to a v6.2 system or above")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
include_content_options = ("Content", "NoContent")
include_metadata_options = ("Metadata", "NoMetadata", "MetadataWithEvents")
include_generation_options = ("LatestActive", "AllActive", "All")
include_parent_options = ("true", "false")
include_content = "Content"
if 'IncludeContent' in kwargs:
value = str(kwargs.get("IncludeContent")).strip()
if value.casefold() in map(str.casefold, include_content_options):
include_content = value
include_metadata = "Metadata"
if 'IncludeMetadata' in kwargs:
value = str(kwargs.get("IncludeMetadata")).strip()
if value.casefold() in map(str.casefold, include_metadata_options):
include_metadata = value
include_generation = "All"
if 'IncludedGenerations' in kwargs:
value = str(kwargs.get("IncludedGenerations")).strip()
if value.casefold() in map(str.casefold, include_generation_options):
include_generation = value
include_parent = "true"
if 'IncludeParentHierarchy' in kwargs:
value = str(kwargs.get("IncludeParentHierarchy")).strip()
if value.casefold() in map(str.casefold, include_parent_options):
include_parent = value
xml_object = xml.etree.ElementTree.Element('ExportAction', {"xmlns": self.entity_ns})
xml.etree.ElementTree.SubElement(xml_object, "IncludeContent").text = include_content
xml.etree.ElementTree.SubElement(xml_object, "IncludeMetadata").text = include_metadata
xml.etree.ElementTree.SubElement(xml_object, "IncludedGenerations").text = include_generation
xml.etree.ElementTree.SubElement(xml_object, "IncludeParentHierarchy").text = include_parent.lower()
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.post(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/exports',
headers=headers, data=xml_request)
if request.status_code == requests.codes.accepted:
return str(request.content.decode('utf-8'))
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.__export_opex_start__(entity, IncludeContent=include_content,
IncludeMetadata=include_metadata, IncludedGenerations=include_generation,
IncludeParentHierarchy=include_parent)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "__export_opex_start__",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def export_opex_async(self, entity: Entity, **kwargs) -> str:
"""
Initiates export of the entity returns an id to track progress
"""
return self.__export_opex_start__(entity, **kwargs)
def export_opex_sync(self, entity: Entity, **kwargs) -> str:
"""
Initiates export of the entity and downloads the opex package
Blocks until the package is downloaded
By default, includes content, metadata with the latest active generations
and the parent hierarchy.
Arguments are kwargs map
IncludeContent
IncludeMetadata
IncludedGenerations
IncludeParentHierarchy
"""
return self.export_opex(entity, **kwargs)
[docs]
def export_opex(self, entity: Entity, **kwargs) -> str:
"""
Initiates export of the entity and downloads the opex package
Blocks until the package is downloaded
By default, includes content, metadata with the latest active generations
and the parent hierarchy.
Arguments are kwargs map
IncludeContent
IncludeMetadata
IncludedGenerations
IncludeParentHierarchy
"""
status = "ACTIVE"
pid = self.__export_opex_start__(entity, **kwargs)
while status == "ACTIVE":
status = self.get_async_progress(pid)
logger.debug(status)
if status == "COMPLETED":
return self.download_opex(pid)
else:
logger.error(status)
raise RuntimeError(f"export progress failed {status}")
[docs]
def download(self, entity: Entity, filename: str) -> str:
"""
Download a file from an asset
Returns the filename of the new file
:param entity: The entity containing the file
:param filename: The filename to write the bytes to
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/download', params=params, headers=headers,
stream=True) as request:
if request.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in request.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
return filename
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.download(entity, filename)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "download",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def has_thumbnail(self, entity: Entity) -> bool:
"""
Does the entity have a thumbnail image attached
Returns false if the entity has no thumbnail
:param entity: The entity
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}', 'size': f'{Thumbnail.SMALL.value}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params,
headers=headers) as request:
if request.status_code == requests.codes.ok:
return True
if request.status_code == requests.codes.not_found:
return False
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.has_thumbnail(entity)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "has_thumbnail",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def thumbnail(self, entity: Entity, filename: str, size=Thumbnail.LARGE):
"""
Download the thumbnail of an asset or folder
Returns the filename of the new thumbnail file or None if the entity has no thumbnail
:param entity: The entity containing the file
:param filename: The filename to write the bytes to
:param size: The size of the thumbnail
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
params = {'id': f'sdb:{entity.entity_type.value}|{entity.reference}', 'size': f'{size.value}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params,
headers=headers, stream=True) as request:
if request.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in request.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
return filename
elif request.status_code == requests.codes.not_found:
return None
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.thumbnail(entity, filename, size=size)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "thumbnail",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def delete_identifiers(self, entity: Entity, identifier_type: str = None, identifier_value: str = None):
"""
Delete external identifiers from an entity
Returns the entity
:param entity: The entity to delete identifiers from
:param identifier_type: The type of the identifier to delete.
:param identifier_value: The value of the identifier to delete.
"""
if (self.major_version < 7) and (self.minor_version < 1):
raise RuntimeError("delete_identifiers API call is not available when connected to a v6.0 System")
headers = {HEADER_TOKEN: self.token}
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers',
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity_response = xml.etree.ElementTree.fromstring(xml_response)
identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier')
for identifier_element in identifier_list:
_ref = _type = _value = _aipid = None
for identifier in identifier_element:
if identifier.tag.endswith("Entity"):
_ref = identifier.text
if identifier.tag.endswith("Type") and identifier_type is not None:
_type = identifier.text
if identifier.tag.endswith("Value") and identifier_value is not None:
_value = identifier.text
if identifier.tag.endswith("ApiId"):
_aipid = identifier.text
if _ref == entity.reference and _type == identifier_type and _value == identifier_value:
del_req = self.session.delete(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers/{_aipid}',
headers=headers)
if del_req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_identifiers(entity, identifier_type, identifier_value)
if del_req.status_code == requests.codes.no_content:
pass
else:
return None
return entity
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_identifiers(entity, identifier_type, identifier_value)
else:
logger.error(request)
raise RuntimeError(request.status_code, "delete_identifier failed")
[docs]
def identifiers_for_entity(self, entity: Entity) -> set[Tuple]:
"""
Get all external identifiers on an entity
Returns the set of external identifiers on the entity
:param entity: The Entity (Asset or Folder)
:type entity: Entity
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/identifiers',
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
identifier_list = entity_response.findall(f'.//{{{self.xip_ns}}}Identifier')
result = set()
for identifier in identifier_list:
identifier_value = identifier_type = ""
for child in identifier:
if child.tag.endswith("Type"):
identifier_type = child.text
if child.tag.endswith("Value"):
identifier_value = child.text
result.add(tuple((identifier_type, identifier_value)))
return result
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.identifiers_for_entity(entity)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "identifiers_for_entity",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def identifier(self, identifier_type: str, identifier_value: str) -> set[Entity]:
"""
Get all entities which have the external identifier
Returns the set of entities which have the external identifier
:param identifier_type: The identifier type
:param identifier_value: The identifier value
"""
headers = {HEADER_TOKEN: self.token}
payload = {'type': identifier_type, 'value': identifier_value}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/entities/by-identifier', params=payload,
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
entity_list = entity_response.findall(f'.//{{{self.entity_ns}}}Entity')
result = set()
for entity in entity_list:
if entity.attrib['type'] == EntityType.FOLDER.value:
folder = Folder(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.add(folder)
elif entity.attrib['type'] == EntityType.ASSET.value:
asset = Asset(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.add(asset)
elif entity.attrib['type'] == EntityType.CONTENT_OBJECT.value:
co = ContentObject(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.add(co)
return result
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.identifier(identifier_type, identifier_value)
else:
exception = HTTPException(payload, request.status_code, request.url, "identifier",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def add_identifier(self, entity: Entity, identifier_type: str, identifier_value: str):
"""
Add a new identifier to an entity
Returns the internal identifier DB key
:param entity: The Entity
:param identifier_type: The identifier type
:param identifier_value: The identifier value
"""
if self.major_version < 7 and self.minor_version < 1:
raise RuntimeError("add_identifier API call is not available when connected to a v6.0 System")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('Identifier', {"xmlns": self.xip_ns})
xml.etree.ElementTree.SubElement(xml_object, "Type").text = identifier_type
xml.etree.ElementTree.SubElement(xml_object, "Value").text = identifier_value
xml.etree.ElementTree.SubElement(xml_object, "Entity").text = entity.reference
end_point = f"/{entity.path}/{entity.reference}/identifiers"
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_request,
headers=headers)
if request.status_code == requests.codes.ok:
xml_string = str(request.content.decode("utf-8"))
identifier_response = xml.etree.ElementTree.fromstring(xml_string)
aip_id = identifier_response.find(f'.//{{{self.xip_ns}}}ApiId')
if hasattr(aip_id, 'text'):
return aip_id.text
else:
return None
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_identifier(entity, identifier_type, identifier_value)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "add_identifier",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def delete_relationships(self, entity: Entity, relationship_type: str = None):
"""
Delete a relationship between two entities by its internal id
This function only deletes the relationship FROM the specified entity to another entity
It does not delete relationships TO this entity
If relationship_type is not specified all relationships FROM this entity are deleted.
:param entity:
:type entity: Entity
:param relationship_type: The relationship type to delete
:type relationship_type: str
"""
if (self.major_version < 7) and (self.minor_version < 4) and (self.patch_version < 1):
raise RuntimeError("add_relation API call is only available with a Preservica v6.3.1 system or higher")
for relationship in self.relationships(entity=entity):
if relationship.direction == RelationshipDirection.FROM:
assert relationship.this_ref == entity.reference
if relationship_type is None:
self.__delete_relationship(relationship)
if relationship_type == relationship.relationship_type:
self.__delete_relationship(relationship)
def __delete_relationship(self, relationship: Relationship):
"""
Delete a relationship between two entities by its internal id
:param relationship:
:return:
"""
headers = {HEADER_TOKEN: self.token}
entity = self.entity(relationship.entity_type, relationship.this_ref)
end_point = f"{entity.path}/{entity.reference}/links/{relationship.api_id}"
request = self.session.delete(f'{self.protocol}://{self.server}/api/entity/{end_point}', headers=headers)
if request.status_code == requests.codes.no_content:
print(relationship)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.__delete_relationship(relationship)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "delete_relationships",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def relationships(self, entity: Entity, page_size: int = 25) -> Generator[Relationship, None, None]:
"""
List the relationship links between entities
:param page_size: The number of items returned in a single server call
:type: page_size: int
:param entity: The Source Entity
:type: entity: Entity
:return: Generator
:rtype: Relationship
"""
paged_set = self.__relationships__(entity, maximum=page_size, next_page=None)
for e in paged_set.results:
yield e
while paged_set.has_more:
paged_set = self.__relationships__(entity, maximum=page_size, next_page=paged_set.next_page)
for e in paged_set.results:
yield e
def __relationships__(self, entity: Entity, maximum: int = 50, next_page: str = None) -> PagedSet:
"""
List the relationship links between entities
:param next_page: URL to next page of results
:type: next_page: str
:param maximum: The number of items returned in a single server call
:type: maximum: int
:param entity: The Source Entity
:type: from_entity: Entity
:return: relationship links
:rtype: list
"""
headers = {HEADER_TOKEN: self.token}
end_point = f"{entity.path}/{entity.reference}/links"
if next_page is None:
params = {'start': '0', 'max': str(maximum)}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{end_point}', headers=headers,
params=params)
else:
request = self.session.get(next_page, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
links = entity_response.findall(f'.//{{{self.entity_ns}}}Link')
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}TotalResults')
results = []
for link in links:
link_type = link.attrib['linkType']
link_direction = link.attrib['linkDirection']
title = link.attrib['title']
other_ref = link.attrib['ref']
this_ref = entity.reference
entity_type = link.attrib['type']
api_id = link.attrib['apiId']
results.append(Relationship(api_id, link_type, RelationshipDirection(link_direction), other_ref, title,
EntityType(entity_type), this_ref, api_id))
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(results, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.__relationships__(entity=entity, maximum=maximum, next_page=next_page)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "relationships",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def add_relation(self, from_entity: Entity, relationship_type: str, to_entity: Entity):
"""
Add a new relationship link between two Assets or Folders
:param from_entity: The Source Entity
:type from_entity: Entity
:param to_entity: The Target Entity
:type to_entity: Entity
:param relationship_type: The Relationship type
:type relationship_type: str
:return: relationship_type
:rtype: str
"""
if (self.major_version < 7) and (self.minor_version < 4) and (self.patch_version < 1):
raise RuntimeError("add_relation API call is only available with a Preservica v6.3.1 system or higher")
assert from_entity.entity_type is not EntityType.CONTENT_OBJECT
assert to_entity.entity_type is not EntityType.CONTENT_OBJECT
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('Link ', {"xmlns": self.xip_ns})
xml.etree.ElementTree.SubElement(xml_object, "Type").text = relationship_type
xml.etree.ElementTree.SubElement(xml_object, "FromEntity").text = from_entity.reference
xml.etree.ElementTree.SubElement(xml_object, "ToEntity").text = to_entity.reference
end_point = f"/{from_entity.path}/{from_entity.reference}/links"
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.post(f'{self.protocol}://{self.server}/api/entity{end_point}', data=xml_request,
headers=headers)
if request.status_code == requests.codes.ok:
xml_string = str(request.content.decode("utf-8"))
logger.debug(xml_string)
link_response = xml.etree.ElementTree.fromstring(xml_string)
relation = link_response.find(f'.//{{{self.xip_ns}}}Link')
relation_type = relation.find(f'.//{{{self.xip_ns}}}Type')
return relation_type.text
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_relation(from_entity, relationship_type, to_entity)
else:
exception = HTTPException(from_entity.reference, request.status_code, request.url, "add_relation",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def save(self, entity: Entity) -> Entity:
"""
Save the title and description of an entity
Returns The updated Entity
:param entity: The Entity to update
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element(entity.tag, {"xmlns": self.xip_ns})
xml.etree.ElementTree.SubElement(xml_object, "Ref").text = entity.reference
xml.etree.ElementTree.SubElement(xml_object, "Title").text = entity.title
xml.etree.ElementTree.SubElement(xml_object, "Description").text = entity.description
xml.etree.ElementTree.SubElement(xml_object, "SecurityTag").text = entity.security_tag
if entity.custom_type is not None:
xml.etree.ElementTree.SubElement(xml_object, "CustomType").text = entity.custom_type
if entity.parent is not None:
xml.etree.ElementTree.SubElement(xml_object, "Parent").text = entity.parent
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.put(f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}',
data=xml_request, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
response = self.entity_from_string(xml_response)
if isinstance(entity, Asset):
asset = Asset(response['reference'], response['title'], response['description'],
response['security_tag'],
response['parent'], response['metadata'])
if 'CustomType' in response:
asset.custom_type = response['CustomType']
return asset
elif isinstance(entity, Folder):
folder = Folder(response['reference'], response['title'], response['description'],
response['security_tag'],
response['parent'], response['metadata'])
if 'CustomType' in response:
folder.custom_type = response['CustomType']
return folder
elif isinstance(entity, ContentObject):
content_object = ContentObject(response['reference'], response['title'],
response['description'],
response['security_tag'],
response['parent'], response['metadata'])
if 'CustomType' in response:
content_object.custom_type = response['CustomType']
return content_object
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.save(entity)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "save",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def move_async(self, entity: Entity, dest_folder: Folder) -> str:
"""
Move an Entity (Asset or Folder) to a new Folder
If dest_folder is None then the entity must be a Folder and will be moved to the root of the repository
Returns a process ID asynchronous (without blocking)
Returns The updated Entity
:param entity: The Entity to update
:param dest_folder: The Folder which will become the new parent of this entity
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'}
if isinstance(entity, Asset) and dest_folder is None:
raise RuntimeError(entity.reference, "Only folders can be moved to the root of the repository")
if dest_folder is not None:
data = dest_folder.reference
else:
data = "@root@"
request = self.session.put(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/parent-ref',
data=data, headers=headers)
if request.status_code == requests.codes.accepted:
return request.content.decode()
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.move_async(entity, dest_folder)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "move_async",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def get_async_progress(self, pid: str) -> str:
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'}
request = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{pid}", headers=headers)
if request.status_code == requests.codes.ok:
entity_response = xml.etree.ElementTree.fromstring(request.content.decode("utf-8"))
status = entity_response.find(".//{http://status.preservica.com}Status")
if hasattr(status, 'text'):
return status.text
else:
return "UNKNOWN"
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.get_async_progress(pid)
else:
exception = HTTPException(pid, request.status_code, request.url, "get_async_progress",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def move_sync(self, entity: Entity, dest_folder: Folder) -> Entity:
"""
Move an Entity (Asset or Folder) to a new Folder
If dest_folder is None then the entity must be a Folder and will be moved to the root of the repository
Returns The updated Entity.
Blocks until the move is complete.
:param entity: The Entity to update
:param dest_folder: The Folder which will become the new parent of this entity
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'}
if isinstance(entity, Asset) and dest_folder is None:
raise RuntimeError(entity.reference, "Only folders can be moved to the root of the repository")
if dest_folder is not None:
data = dest_folder.reference
else:
data = "@root@"
request = self.session.put(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/parent-ref',
data=data, headers=headers)
if request.status_code == requests.codes.accepted:
sleep_sec = 1
while True:
status = self.get_async_progress(request.content.decode("utf-8"))
if status != "ACTIVE":
return self.entity(entity.entity_type, entity.reference)
else:
sleep(sleep_sec)
sleep_sec = sleep_sec + 1
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.move_sync(entity, dest_folder)
else:
exception = HTTPException(entity, request.status_code, request.url, "move_sync",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def move(self, entity: Entity, dest_folder: Folder) -> Entity:
"""
Move an Entity (Asset or Folder) to a new Folder
If dest_folder is None then the entity must be a Folder and will be moved to the root of the repository
Returns The updated Entity
:param entity: The Entity to update
:param dest_folder: The Folder which will become the new parent of this entity
"""
return self.move_sync(entity, dest_folder)
[docs]
def create_folder(self, title: str, description: str, security_tag: str, parent: str = None) -> Folder:
"""
Create a new Folder in the repository
If parent is None then the new Folder will be created at the root of the repository
Returns The updated Entity
:param title: The title of the new folder
:param description: The description of the new folder
:param security_tag: The security_tag of the new folder
:param parent: The parent of the new folder, Can be None to create a root Folder
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
structural_object = xml.etree.ElementTree.Element('StructuralObject', {"xmlns": self.xip_ns})
xml.etree.ElementTree.SubElement(structural_object, "Ref").text = str(uuid.uuid4())
xml.etree.ElementTree.SubElement(structural_object, "Title").text = title
xml.etree.ElementTree.SubElement(structural_object, "Description").text = description
xml.etree.ElementTree.SubElement(structural_object, "SecurityTag").text = security_tag
if parent is not None:
xml.etree.ElementTree.SubElement(structural_object, "Parent").text = parent
xml_request = xml.etree.ElementTree.tostring(structural_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.post(f'{self.protocol}://{self.server}/api/entity/structural-objects', data=xml_request,
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity = self.entity_from_string(xml_response)
return Folder(entity['reference'], entity['title'], entity['description'],
entity['security_tag'],
entity['parent'],
entity['metadata'])
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.create_folder(title, description, security_tag, parent=parent)
else:
exception = HTTPException(title, request.status_code, request.url, "create_folder",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def all_metadata(self, entity: Entity) -> Tuple:
"""
Retrieve all metadata fragments on an entity
Returns XML documents in a tuple
:param entity: The entity with the metadata
"""
for uri, schema in entity.metadata.items():
yield tuple((str(schema), self.metadata(uri)))
def metadata_tag_for_entity(self, entity: Entity, schema: str, tag: str, isXpath: bool = False) -> str:
"""
Retrieve the first value of the tag from a metadata template given by schema
Returns XML document as a string
:param isXpath: True if the tag name is a fully qualified xpath expression
:param entity: The entity with the metadata
:param schema: The schema URI
:param tag: The tag name
"""
xml_doc = self.metadata_for_entity(entity, schema)
if xml_doc:
xml_object = xml.etree.ElementTree.fromstring(xml_doc)
if isXpath is False:
return xml_object.find(f'.//{{*}}{tag}').text
else:
return xml_object.find(tag).text
[docs]
def security_tag_sync(self, entity: Entity, new_tag: str):
"""
Change the security tag for a folder or asset
Returns the updated entity after the security tag has been updated.
:param entity: The entity to change
:param new_tag: The new security tag
"""
self.token = self.__token__()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'}
end_point = f"/{entity.path}/{entity.reference}/security-descriptor"
request = self.session.put(f'{self.protocol}://{self.server}/api/entity{end_point}?includeDescendants=false',
data=new_tag, headers=headers)
if request.status_code == requests.codes.accepted:
sleep_sec = 1
while True:
status = self.get_async_progress(request.content.decode("utf-8"))
if status != "ACTIVE":
return self.entity(entity.entity_type, entity.reference)
else:
sleep(sleep_sec)
sleep_sec = sleep_sec + 1
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.security_tag_sync(entity, new_tag)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "security_tag_sync",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def security_tag_async(self, entity: Entity, new_tag: str):
"""
Change the security tag for a folder or asset
Returns a process ID asynchronous (without blocking)
:param entity: The entity to change
:param new_tag: The new security tag
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain'}
end_point = f"/{entity.path}/{entity.reference}/security-descriptor"
request = self.session.put(f'{self.protocol}://{self.server}/api/entity{end_point}?includeDescendants=false',
data=new_tag, headers=headers)
if request.status_code == requests.codes.accepted:
return request.content.decode("utf-8")
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.security_tag_async(entity, new_tag)
else:
exception = HTTPException(entity.reference, request.status_code, request.url, "security_tag_async",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def entity(self, entity_type: EntityType, reference: str) -> Entity:
"""
Retrieve an entity by its type and reference
Returns Entity (Asset, Folder, ContentObject)
:param entity_type: The type of entity to fetch
:param reference: The unique identifier of the entity
"""
if entity_type is EntityType.CONTENT_OBJECT:
return self.content_object(reference)
if entity_type is EntityType.FOLDER:
return self.folder(reference)
if entity_type is EntityType.ASSET:
return self.asset(reference)
def add_physical_asset(self, title: str, description: str, parent: Folder, security_tag: str = "open") -> Asset:
"""
Create a new asset which represents a physical object
Returns Asset
:param title: The title of the new Asset
:param description: The description of the new Asset
:param parent: The parent folder
:param security_tag: The security setting
"""
if (self.major_version < 7) and (self.minor_version < 4):
raise RuntimeError(
"add_physical_asset API call is only available with a Preservica v6.4.0 system or higher")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xip_object = xml.etree.ElementTree.Element('XIP ', {"xmlns": self.xip_ns})
io_object = xml.etree.ElementTree.SubElement(xip_object, "InformationObject")
xml.etree.ElementTree.SubElement(io_object, "Title").text = str(title)
xml.etree.ElementTree.SubElement(io_object, "Description").text = str(description)
xml.etree.ElementTree.SubElement(io_object, "SecurityTag").text = str(security_tag)
xml.etree.ElementTree.SubElement(io_object, "Parent").text = parent.reference
rep_object = xml.etree.ElementTree.SubElement(xip_object, "Representation")
xml.etree.ElementTree.SubElement(rep_object, "Type").text = "Physical"
xml_request = xml.etree.ElementTree.tostring(xip_object, encoding='utf-8')
request = self.session.post(f'{self.protocol}://{self.server}/api/entity/{IO_PATH}', data=xml_request,
headers=headers)
if request.status_code == requests.codes.ok:
xml_string = str(request.content.decode("utf-8"))
entity = self.entity_from_string(xml_string)
return Asset(entity['reference'], entity['title'], entity['description'],
entity['security_tag'], entity['parent'],
entity['metadata'])
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_physical_asset(title, description, parent, security_tag)
else:
exception = HTTPException(title, request.status_code, request.url, "add_physical_asset",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def asset(self, reference: str) -> Asset:
"""
Retrieve an Asset by its reference
Returns Asset
:param reference: The unique identifier of the entity
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{IO_PATH}/{reference}', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity = self.entity_from_string(xml_response)
asset = Asset(entity['reference'], entity['title'], entity['description'],
entity['security_tag'], entity['parent'],
entity['metadata'])
if 'CustomType' in entity:
asset.custom_type = entity['CustomType']
return asset
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.asset(reference)
elif request.status_code == requests.codes.not_found:
exception = ReferenceNotFoundException(reference, request.status_code, request.url, "asset")
logger.error(exception)
raise exception
else:
exception = HTTPException(reference, request.status_code, request.url, "asset",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def folder(self, reference: str) -> Folder:
"""
Retrieve a Folder by its reference
Returns Folder
:param reference: The unique identifier of the entity
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{SO_PATH}/{reference}', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity = self.entity_from_string(xml_response)
folder = Folder(entity['reference'], entity['title'], entity['description'],
entity['security_tag'], entity['parent'],
entity['metadata'])
if 'CustomType' in entity:
folder.custom_type = entity['CustomType']
return folder
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.folder(reference)
elif request.status_code == requests.codes.not_found:
exception = ReferenceNotFoundException(reference, request.status_code, request.url, "folder")
logger.error(exception)
raise exception
else:
exception = HTTPException(reference, request.status_code, request.url, "folder",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def content_object(self, reference: str) -> ContentObject:
"""
Retrieve an ContentObject by its reference
Returns ContentObject
:param reference: The unique identifier of the entity
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{reference}', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity = self.entity_from_string(xml_response)
content_object = ContentObject(entity['reference'], entity['title'], entity['description'],
entity['security_tag'], entity['parent'],
entity['metadata'])
if 'CustomType' in entity:
content_object.custom_type = entity['CustomType']
return content_object
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.content_object(reference)
elif request.status_code == requests.codes.not_found:
exception = ReferenceNotFoundException(reference, request.status_code, request.url, "content_object")
logger.error(exception)
raise exception
else:
exception = HTTPException(reference, request.status_code, request.url, "content_object",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def content_objects(self, representation: Representation) -> list[ContentObject]:
"""
Retrieve a list of content objects within a representation
:param representation:
:returns list[ContentObject]
"""
headers = {HEADER_TOKEN: self.token}
if not isinstance(representation, Representation):
logger.warning("representation is not of type Representation")
return []
request = self.session.get(f'{representation.url}', headers=headers)
if request.status_code == requests.codes.ok:
results = []
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
content_objects = entity_response.findall(f'.//{{{self.xip_ns}}}Representation/'
f'{{{self.xip_ns}}}ContentObjects/{{{self.xip_ns}}}ContentObject')
for co in content_objects:
content_object = self.content_object(co.text)
content_object.representation_type = representation.rep_type
content_object.asset = representation.asset
results.append(content_object)
return results
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.content_objects(representation)
else:
exception = HTTPException(representation.name, request.status_code, request.url, "content_objects",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def generation(self, url: str) -> Generation:
"""
Retrieve a list of generation objects
:param url:
:returns Generation
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(url, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
ge = entity_response.find(f'.//{{{self.xip_ns}}}Generation')
format_group = entity_response.find(f'.//{{{self.xip_ns}}}FormatGroup')
effective_date = entity_response.find(f'.//{{{self.xip_ns}}}EffectiveDate')
formats = entity_response.findall(f'.//{{{self.xip_ns}}}Formats/{{{self.xip_ns}}}Format')
formats_list = []
for tech_format in formats:
format_dict = {'Valid': tech_format.attrib['valid']}
puid = tech_format.find(f'.//{{{self.xip_ns}}}PUID')
format_dict['PUID'] = puid.text if hasattr(puid, 'text') else None
priority = tech_format.find(f'.//{{{self.xip_ns}}}Priority')
format_dict['Priority'] = priority.text if hasattr(priority, 'text') else None
method = tech_format.find(f'.//{{{self.xip_ns}}}IdentificationMethod')
format_dict['IdentificationMethod'] = method.text if hasattr(method, 'text') else None
name = tech_format.find(f'.//{{{self.xip_ns}}}FormatName')
format_dict['FormatName'] = name.text if hasattr(name, 'text') else None
version = tech_format.find(f'.//{{{self.xip_ns}}}FormatVersion')
format_dict['FormatVersion'] = version.text if hasattr(version, 'text') else None
formats_list.append(format_dict)
index = int(url.rsplit("/", 1)[-1])
properties = entity_response.findall(f'.//{{{self.xip_ns}}}Properties/{{{self.xip_ns}}}Property')
property_set = []
for tech_props in properties:
tech_props_dict = {}
puid = tech_props.find(f'.//{{{self.xip_ns}}}PUID')
tech_props_dict['PUID'] = puid.text if hasattr(puid, 'text') else None
name = tech_props.find(f'.//{{{self.xip_ns}}}PropertyName')
tech_props_dict['PropertyName'] = name.text if hasattr(name, 'text') else None
value = tech_props.find(f'.//{{{self.xip_ns}}}Value')
tech_props_dict['Value'] = value.text if hasattr(value, 'text') else None
property_set.append(tech_props_dict)
bitstreams = entity_response.findall(f'./{{{self.entity_ns}}}Bitstreams/{{{self.entity_ns}}}Bitstream')
bitstream_list = []
for bit in bitstreams:
bitstream_list.append(self.bitstream(bit.text))
generation = Generation(strtobool(ge.attrib['original']), strtobool(ge.attrib['active']),
format_group.text if hasattr(format_group, 'text') else None,
effective_date.text if hasattr(effective_date, 'text') else None,
bitstream_list)
generation.formats = formats_list
generation.properties = property_set
generation.gen_index = index
return generation
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.generation(url)
else:
exception = HTTPException(url, request.status_code, request.url, "generation",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def _integrity_checks(self, bitstream: Bitstream, maximum: int = 10, next_page: str = None):
headers = {HEADER_TOKEN: self.token}
if next_page is None:
url = re.sub('content$', f'integrity-check-history', bitstream.content_url)
params = {'start': '0', 'max': str(maximum)}
request = self.session.get(url, headers=headers, params=params)
else:
request = self.session.get(next_page, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
histories = entity_response.findall(f'.//{{{self.entity_ns}}}IntegrityCheckHistory')
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}Paging/{{{self.entity_ns}}}TotalResults')
results = []
for history in histories:
xip_type = history.find(f'./{{{self.xip_ns}}}Type')
xip_success = history.find(f'./{{{self.xip_ns}}}Success')
xip_date = history.find(f'./{{{self.xip_ns}}}Date')
xip_adapter_name = history.find(f'./{{{self.xip_ns}}}AdapterName')
xip_fixed = history.find(f'./{{{self.xip_ns}}}Fixed')
xip_reason = history.find(f'./{{{self.xip_ns}}}Reason')
check = IntegrityCheck(xip_type.text if hasattr(xip_type, 'text') else None,
xip_success.text if hasattr(xip_success, 'text') else None,
xip_date.text if hasattr(xip_date, 'text') else None,
xip_adapter_name.text if hasattr(xip_adapter_name, 'text') else None,
xip_fixed.text if hasattr(xip_fixed, 'text') else None,
xip_reason.text if hasattr(xip_reason, 'text') else None)
results.append(check)
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(results, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._integrity_checks(bitstream, maximum, next_page)
else:
exception = HTTPException(bitstream.filename, request.status_code, request.url, "_integrity_checks",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
def integrity_checks(self, bitstream: Bitstream) -> Generator:
"""
Return integrity checks for a bitstream
"""
maximum = 10
paged_set = self._integrity_checks(bitstream=bitstream, maximum=maximum, next_page=None)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._integrity_checks(bitstream=bitstream, maximum=maximum, next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
def bitstream(self, url: str) -> Bitstream:
"""
Retrieve a bitstream by its url
Returns Bitstream
:param url:
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(url, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity_response = xml.etree.ElementTree.fromstring(xml_response)
logger.debug(xml_response)
filename = entity_response.find(f'.//{{{self.xip_ns}}}Filename')
filesize = entity_response.find(f'.//{{{self.xip_ns}}}FileSize')
fixity_values = entity_response.findall(f'.//{{{self.xip_ns}}}Fixity')
content = entity_response.find(f'.//{{{self.entity_ns}}}Content')
index = int(url.rsplit("/", 1)[-1])
fixity = {}
for f in fixity_values:
fixity[f[0].text] = f[1].text
bitstream = Bitstream(filename.text if hasattr(filename, 'text') else None,
int(filesize.text) if hasattr(filesize, 'text') else None, fixity,
content.text if hasattr(content, 'text') else None)
bitstream.bs_index = index
return bitstream
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.bitstream(url)
else:
exception = HTTPException(url, request.status_code, request.url, "bitstream",
request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def replace_generation_sync(self, content_object: ContentObject, file_name, fixity_algorithm=None,
fixity_value=None) -> str:
"""
Replace the last active generation of a content object with a new digital file.
Starts the workflow and blocks until the workflow completes.
"""
status = "ACTIVE"
pid = self.replace_generation_async(content_object=content_object, file_name=file_name,
fixity_algorithm=fixity_algorithm, fixity_value=fixity_value)
while status == "ACTIVE":
status = self.get_async_progress(pid)
return status
[docs]
def replace_generation_async(self, content_object: ContentObject, file_name, fixity_algorithm=None,
fixity_value=None) -> str:
"""
Replace the last active generation of a content object with a new digital file.
Starts the workflow and returns
"""
if (self.major_version < 7) and (self.minor_version < 2) and (self.patch_version < 1):
raise RuntimeError("replace API call is only available when connected to a v6.2.1 System")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
params = {"replaceType": "previous"}
if fixity_algorithm is None:
generation = self.generations(content_object=content_object).pop()
bitstream = generation.bitstreams.pop()
for algo, value in bitstream.fixity.items():
fixity_algorithm = algo
if "MD5" in fixity_algorithm.upper():
fixity_value = FileHash(hashlib.md5)(file_name)
if "SHA1" in fixity_algorithm.upper() or "SHA-1" in fixity_algorithm.upper():
fixity_value = FileHash(hashlib.sha1)(file_name)
if "SHA256" in fixity_algorithm.upper() or "SHA-256" in fixity_algorithm.upper():
fixity_value = FileHash(hashlib.sha256)(file_name)
if "SHA512" in fixity_algorithm.upper() or "SHA-512" in fixity_algorithm.upper():
fixity_value = FileHash(hashlib.sha512)(file_name)
if fixity_algorithm and fixity_value:
if "MD5" in fixity_algorithm.upper():
headers["Fixity-MD5"] = fixity_value
if "SHA1" in fixity_algorithm.upper() or "SHA-1" in fixity_algorithm.upper():
headers["Fixity-SHA-1"] = fixity_value
if "SHA256" in fixity_algorithm.upper() or "SHA-256" in fixity_algorithm.upper():
headers["Fixity-SHA-256"] = fixity_value
if "SHA512" in fixity_algorithm.upper() or "SHA-512" in fixity_algorithm.upper():
headers["Fixity-SHA-512"] = fixity_value
headers["Filename"] = os.path.basename(file_name)
headers['Content-Length'] = str(os.path.getsize(file_name))
with open(file_name, 'rb') as f:
request = self.session.post(
f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{content_object.reference}/generations',
params=params, data=f, headers=headers)
if request.status_code == requests.codes.ok:
return str(request.content.decode('utf-8'))
elif request.status_code == requests.codes.unauthorized:
return self.replace_generation_async(content_object=content_object, file_name=file_name,
fixity_algorithm=fixity_algorithm, fixity_value=fixity_value)
else:
exception = HTTPException(content_object.reference, request.status_code, request.url,
"replace_generation_async", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def generations(self, content_object: ContentObject) -> list[Generation]:
"""
Retrieve list of generations on a content object
Returns list
:param content_object:
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{CO_PATH}/{content_object.reference}/generations',
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity_response = xml.etree.ElementTree.fromstring(xml_response)
generations = entity_response.findall(f'.//{{{self.entity_ns}}}Generation')
result = []
for g in generations:
if hasattr(g, 'text'):
generation = self.generation(g.text)
generation.asset = content_object.asset
generation.content_object = content_object
generation.representation_type = content_object.representation_type
result.append(generation)
return result
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.generations(content_object)
else:
exception = HTTPException(content_object.reference, request.status_code, request.url,
"generations", request.content.decode('utf-8'))
logger.error(exception)
raise exception
def bitstreams_for_asset(self, asset: Union[Asset, Entity]) -> Iterable[Bitstream]:
"""
Return all the bitstreams within an asset.
This includes all the representations and content objects
:param asset: The asset
:return:
"""
for representation in self.representations(asset):
for content_object in self.content_objects(representation):
for generation in self.generations(content_object):
if generation.active:
for bitstream in generation.bitstreams:
bitstream.representation = representation
bitstream.content_object = content_object
bitstream.generation = generation
yield bitstream
[docs]
def representations(self, asset: Asset) -> set[Representation]:
"""
Retrieve set of representations on an Asset
:param asset: The asset
:returns set[Representation]
"""
headers = {HEADER_TOKEN: self.token}
if not isinstance(asset, Asset):
return set()
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{asset.path}/{asset.reference}/representations',
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
entity_response = xml.etree.ElementTree.fromstring(xml_response)
representations = entity_response.findall(f'.//{{{self.entity_ns}}}Representation')
result = set()
for r in representations:
representation = Representation(asset, r.get('type'), r.get("name", None), r.text)
result.add(representation)
return result
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.representations(asset)
else:
exception = HTTPException(asset.reference, request.status_code, request.url,
"representations", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def remove_thumbnail(self, entity: Entity):
"""
remove a thumbnail icon to a folder or asset
:param entity: The Entity
"""
if self.major_version < 7 and self.minor_version < 2:
raise RuntimeError("Thumbnail API is only available when connected to a v6.2 System")
if isinstance(entity, ContentObject):
raise RuntimeError("Thumbnails cannot be added to Content Objects")
headers = {HEADER_TOKEN: self.token}
request = self.session.delete(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/preview',
headers=headers)
if request.status_code == requests.codes.no_content:
return str(request.content.decode('utf-8'))
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.remove_thumbnail(entity)
else:
exception = HTTPException(entity.reference, request.status_code, request.url,
"remove_thumbnail", request.content.decode('utf-8'))
logger.error(exception)
raise exception
def add_access_representation(self, entity: Entity, access_file: str, name: str = "Access"):
"""
Add a new representation to an existing asset.
:param entity: The existing asset which will receive the new representation
:param access_file: The new digital file
:param name: The name of the new access representation defaults to "Access"
:return:
"""
if self.major_version < 7 and self.minor_version < 12:
raise RuntimeError("Add Representation API is only available when connected to a v6.12 System")
if isinstance(entity, Folder) or isinstance(entity, ContentObject):
raise RuntimeError("Add Representation cannot be added to Folders and Content Objects")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
filename = os.path.basename(access_file)
params = {'type': 'Access', 'name': name, 'filename': filename}
with open(access_file, 'rb') as fd:
request = self.session.post(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/representations',
data=fd, headers=headers, params=params)
if request.status_code == requests.codes.accepted:
return str(request.content.decode('utf-8'))
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_access_representation(entity, access_file, name)
else:
exception = HTTPException(entity.reference, request.status_code, request.url,
"add_access_representation", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def add_thumbnail(self, entity: Entity, image_file: str):
"""
add a thumbnail icon to a folder or asset
:param entity: The Entity
:param image_file: Path to image file
"""
if self.major_version < 7 and self.minor_version < 2:
raise RuntimeError("Thumbnail API is only available when connected to a v6.2 System")
if isinstance(entity, ContentObject):
raise RuntimeError("Thumbnails cannot be added to Content Objects")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
with open(image_file, 'rb') as fd:
request = self.session.put(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/preview',
data=fd, headers=headers)
if request.status_code == requests.codes.no_content:
return str(request.content.decode('utf-8'))
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_thumbnail(entity, image_file)
else:
exception = HTTPException(entity.reference, request.status_code, request.url,
"add_thumbnail", request.content.decode('utf-8'))
logger.error(exception)
raise exception
def _event_actions(self, entity: Entity, maximum: int):
"""
event actions performed against this entity
"""
if self.major_version < 7 and self.minor_version < 1:
logger.error("Entity events is only available when connected to a v6.1 System")
raise RuntimeError("Entity events is only available when connected to a v6.1 System")
headers = {HEADER_TOKEN: self.token}
params = {'start': str(0), 'max': str(maximum)}
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/event-actions',
params=params, headers=headers)
if request.status_code == requests.codes.ok:
pass
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._event_actions(entity, maximum=maximum)
else:
exception = HTTPException(entity.reference, request.status_code, request.url,
"_event_actions", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def all_descendants(self, folder: Union[Folder, Entity] = None) -> Generator[Entity, None, None]:
"""
Retrieve list of entities below a folder in the repository
Returns list
:param folder: The folder to find children of
"""
for entity in self.descendants(folder=folder):
yield entity
if entity.entity_type == EntityType.FOLDER:
yield from self.all_descendants(folder=entity)
[docs]
def descendants(self, folder: Union[str, Folder] = None) -> Generator[Entity, None, None]:
maximum = 100
paged_set = self.children(folder, maximum=maximum, next_page=None)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self.children(folder, maximum=maximum, next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
[docs]
def children(self, folder: Union[str, Folder] = None, maximum: int = 100, next_page: str = None) -> PagedSet:
headers = {HEADER_TOKEN: self.token}
data = {'start': str(0), 'max': str(maximum)}
folder_reference = folder
if next_page is None:
if folder_reference is None:
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/root/children', params=data,
headers=headers)
else:
if hasattr(folder, "reference"):
folder_reference = folder.reference
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/structural-objects/{folder_reference}/children',
params=data, headers=headers)
else:
request = self.session.get(next_page, headers=headers)
logger.debug(request.url)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
children = entity_response.findall(f'.//{{{self.entity_ns}}}Child')
result = set()
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults')
for child in children:
if child.attrib['type'] == EntityType.FOLDER.value:
folder = Folder(child.attrib['ref'], child.attrib['title'], None, None, folder_reference, None)
result.add(folder)
else:
asset = Asset(child.attrib['ref'], child.attrib['title'], None, None, folder_reference, None)
result.add(asset)
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(result, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.children(folder_reference, maximum=maximum, next_page=next_page)
else:
exception = HTTPException(folder.reference, request.status_code, request.url,
"children", request.content.decode('utf-8'))
logger.error(exception)
raise exception
def all_ingest_events(self, previous_days: int = 1) -> Generator:
self.token = self.__token__()
previous = datetime.utcnow() - timedelta(days=previous_days)
from_date = previous.replace(tzinfo=timezone.utc).isoformat()
to_date = datetime.utcnow().replace(tzinfo=timezone.utc).isoformat()
paged_set = self._all_events_page(maximum=25, next_page=None, type="Ingest", from_date=from_date,
to_date=to_date)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._all_events_page(maximum=25, next_page=paged_set.next_page, type="Ingest",
from_date=from_date, to_date=to_date)
for entity in paged_set.results:
yield entity
[docs]
def all_events(self) -> Generator:
self.token = self.__token__()
paged_set = self._all_events_page()
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._all_events_page(next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
def _entity_from_event_page(self, event_id: str, maximum: int = 25, next_page: str = None):
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
if next_page is None:
url = f'{self.protocol}://{self.server}/api/entity/events/{event_id}/event-actions'
response = requests.get(url, params={'start': 0, 'max': maximum}, headers=headers)
else:
response = requests.get(next_page, headers=headers)
if response.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._entity_from_event_page(event_id, maximum, next_page)
if response.status_code == 200:
xml_response = str(response.content.decode('utf-8'))
entity_response = xml.etree.ElementTree.fromstring(xml_response)
actions = entity_response.findall(f'.//{{{self.xip_ns}}}EventAction')
result_list = []
for action in actions:
entity_ref = action.findall(f'.//{{{self.xip_ns}}}Entity')
for refs in entity_ref:
result_list.append(refs.text)
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults')
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(result_list, has_more, int(total_hits.text), url)
def entity_from_event(self, event_id: str) -> Generator:
self.token = self.__token__()
paged_set = self._entity_from_event_page(event_id, 25, None)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._entity_from_event_page(event_id, 25, next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
def _all_events_page(self, maximum: int = 25, next_page: str = None, **kwargs) -> PagedSet:
"""
event actions performed against this repository
"""
headers = {HEADER_TOKEN: self.token}
params = {'start': str(0), 'max': str(maximum)}
if "type" in kwargs:
params["types"] = kwargs.get("type")
if "from_date" in kwargs:
params["from"] = kwargs.get("from_date")
if "to_date" in kwargs:
params["to"] = kwargs.get("to_date")
if next_page is None:
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/events', params=params,
headers=headers)
else:
request = self.session.get(next_page, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
events = entity_response.findall(f'.//{{{self.xip_ns}}}Event')
result_list = []
for event in events:
result = {'eventType': event.attrib['type']}
date_node = event.find(f'.//{{{self.xip_ns}}}Date')
result['Date'] = date_node.text if hasattr(date_node, 'text') else None
user_node = event.find(f'.//{{{self.xip_ns}}}User')
result['User'] = user_node.text if hasattr(user_node, 'text') else None
ref_node = event.find(f'.//{{{self.xip_ns}}}Ref')
result['Ref'] = ref_node.text if hasattr(ref_node, 'text') else None
workflow_name = event.find(f'.//{{{self.xip_ns}}}WorkflowName')
if workflow_name is not None:
result['WorkflowName'] = workflow_name.text
workflow_instance_id = event.find(f'.//{{{self.xip_ns}}}WorkflowInstanceId')
if workflow_instance_id is not None:
result['WorkflowInstanceId'] = workflow_instance_id.text
serialised_command = event.find(f'.//{{{self.xip_ns}}}SerialisedCommand')
if serialised_command is not None:
result['SerialisedCommand'] = serialised_command.text
result_list.append(result)
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults')
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(result_list, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._all_events_page(maximum, next_page, **kwargs)
else:
exception = HTTPException("", request.status_code, request.url,
"_all_events_page", request.content.decode('utf-8'))
logger.error(exception)
raise exception
def _entity_events_page(self, entity: Entity, maximum: int = 25, next_page: str = None) -> PagedSet:
"""
event actions performed against this entity
"""
if self.major_version < 7 and self.minor_version < 1:
logger.error("Entity events is only available when connected to a v6.1 System")
raise RuntimeError("Entity events is only available when connected to a v6.1 System")
headers = {HEADER_TOKEN: self.token}
params = {'start': str(0), 'max': str(maximum)}
if next_page is None:
request = self.session.get(
f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}/event-actions',
params=params, headers=headers)
else:
request = self.session.get(next_page, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
event_actions = entity_response.findall(f'.//{{{self.xip_ns}}}EventAction')
result_list = []
for event_action in event_actions:
result = {'commandType': ''}
if 'commandType' in event_action.attrib:
result['commandType'] = event_action.attrib['commandType']
event = event_action.find(f'.//{{{self.xip_ns}}}Event')
if 'type' in event_action.attrib:
result['eventType'] = event.attrib['type']
date_node = event.find(f'.//{{{self.xip_ns}}}Date')
if date_node is not None:
result['Date'] = date_node.text
user_node = event.find(f'.//{{{self.xip_ns}}}User')
if user_node is not None:
result['User'] = user_node.text
ref_node = event.find(f'.//{{{self.xip_ns}}}Ref')
if ref_node is not None:
result['Ref'] = ref_node.text
workflow_name = event.find(f'.//{{{self.xip_ns}}}WorkflowName')
if workflow_name is not None:
result['WorkflowName'] = workflow_name.text
workflow_instance_id = event.find(f'.//{{{self.xip_ns}}}WorkflowInstanceId')
if workflow_instance_id is not None:
result['WorkflowInstanceId'] = workflow_instance_id.text
serialised_command = event_action.find(f'.//{{{self.xip_ns}}}SerialisedCommand')
if serialised_command is not None:
result['SerialisedCommand'] = serialised_command.text
result_list.append(result)
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults')
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(result_list, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._entity_events_page(entity)
else:
exception = HTTPException(entity.reference, request.status_code, request.url,
"_all_events_page", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def entity_events(self, entity: Entity) -> Generator:
self.token = self.__token__()
paged_set = self._entity_events_page(entity)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._entity_events_page(entity, next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
[docs]
def updated_entities(self, previous_days: int = 1) -> Generator:
self.token = self.__token__()
maximum = 25
paged_set = self._updated_entities_page(previous_days=previous_days, maximum=maximum, next_page=None)
for entity in paged_set.results:
yield entity
while paged_set.has_more:
paged_set = self._updated_entities_page(previous_days=previous_days, maximum=maximum,
next_page=paged_set.next_page)
for entity in paged_set.results:
yield entity
def _updated_entities_page(self, previous_days: int = 1, maximum: int = 50, next_page: str = None) -> PagedSet:
headers = {HEADER_TOKEN: self.token}
x = datetime.utcnow() - timedelta(days=previous_days)
today = x.replace(tzinfo=timezone.utc).isoformat()
if next_page is None:
params = {'date': today, 'start': '0', 'max': str(maximum)}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/entities/updated-since',
headers=headers, params=params)
else:
request = self.session.get(next_page, headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
entities = entity_response.findall(f'.//{{{self.entity_ns}}}Entity')
result = []
for entity in entities:
if 'type' in entity.attrib:
if entity.attrib['type'] == EntityType.FOLDER.value:
folder = Folder(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.append(folder)
elif entity.attrib['type'] == EntityType.ASSET.value:
asset = Asset(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.append(asset)
elif entity.attrib['type'] == EntityType.CONTENT_OBJECT.value:
co = ContentObject(entity.attrib['ref'], entity.attrib['title'], None, None, None, None)
result.append(co)
next_url = entity_response.find(f'.//{{{self.entity_ns}}}Next')
total_hits = entity_response.find(f'.//{{{self.entity_ns}}}TotalResults')
has_more = True
url = None
if next_url is None:
has_more = False
else:
url = next_url.text
return PagedSet(result, has_more, int(total_hits.text), url)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._updated_entities_page(previous_days=previous_days, maximum=maximum,
next_page=next_page)
else:
exception = HTTPException(previous_days, request.status_code, request.url,
"_updated_entities_page", request.content.decode('utf-8'))
logger.error(exception)
raise exception
[docs]
def delete_asset(self, asset: Asset, operator_comment: str, supervisor_comment: str):
"""
Delete an asset from the repository
:param asset: The Asset
:param operator_comment: The operator comment on the deletion
:param supervisor_comment: The supervisor comment on the deletion
"""
if isinstance(asset, Asset):
return self._delete_entity(asset, operator_comment, supervisor_comment)
else:
raise RuntimeError("delete_asset only deletes assets")
[docs]
def delete_folder(self, folder: Folder, operator_comment: str, supervisor_comment: str):
"""
Delete an asset from the repository
:param folder: The Folder
:param operator_comment: The operator comment on the deletion
:param supervisor_comment: The supervisor comment on the deletion
"""
if isinstance(folder, Folder):
return self._delete_entity(folder, operator_comment, supervisor_comment)
else:
raise RuntimeError("delete_folder only deletes folders")
def _delete_entity(self, entity: Entity, operator_comment: str, supervisor_comment: str):
"""
Delete an asset from the repository
:param entity: The entity
:param operator_comment: The comment on the deletion
"""
# check manager password is available:
config = configparser.ConfigParser()
config.read('credentials.properties', encoding='utf-8')
try:
manager_username = config['credentials']['manager.username']
manager_password = config['credentials']['manager.password']
self.manager_token(manager_username, manager_password)
except KeyError:
raise RuntimeError("No manager password set in credentials.properties")
self.token = self.__token__()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('DeletionAction',
{"xmlns:xip": self.xip_ns, "xmlns": self.entity_ns})
submission_el = xml.etree.ElementTree.SubElement(xml_object, "Submission")
comment_el = xml.etree.ElementTree.SubElement(submission_el, "Comment")
comment_el.text = operator_comment
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.delete(f'{self.protocol}://{self.server}/api/entity/{entity.path}/{entity.reference}',
data=xml_request, headers=headers)
logger.debug(request.content.decode("utf-8"))
if request.status_code == requests.codes.accepted:
progress = request.content.decode("utf-8")
req = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{progress}", headers=headers)
while True:
if req.status_code == requests.codes.ok:
entity_response = xml.etree.ElementTree.fromstring(req.content.decode("utf-8"))
status = entity_response.find(".//{http://status.preservica.com}Status")
if hasattr(status, 'text'):
if status.text == "PENDING":
headers = {HEADER_TOKEN: self.manager_token(manager_username, manager_password),
'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('DeletionAction ', {"xmlns:xip": self.xip_ns,
"xmlns": self.entity_ns})
approval_el = xml.etree.ElementTree.SubElement(xml_object, "Approval")
xml.etree.ElementTree.SubElement(approval_el, "Approved").text = "true"
xml.etree.ElementTree.SubElement(approval_el, "Comment").text = supervisor_comment
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
approve = self.session.put(
f"{self.protocol}://{self.server}/api/entity/actions/deletions/{progress}",
data=xml_request, headers=headers)
if approve.status_code == requests.codes.accepted:
return entity.reference
else:
logger.error(approve.content.decode('utf-8'))
raise RuntimeError(approve.status_code, "delete_asset failed during approval")
sleep(2.0)
req = self.session.get(f"{self.protocol}://{self.server}/api/entity/progress/{progress}",
headers=headers)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._delete_entity(entity, operator_comment, supervisor_comment)
if request.status_code == requests.codes.unprocessable:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "no active workflow context for full deletion exists in the system")
if request.status_code == requests.codes.forbidden:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "User doesn't have deletion rights on the "
"entity or the required operator role to evaluate a deletion")
exception = HTTPException(entity.reference, request.status_code, request.url,
"_delete_entity", request.content.decode('utf-8'))
logger.error(exception)
raise exception