Source code for pyPreservica.adminAPI
"""
pyPreservica AdminAPI module definition
A client library for the Preservica Repository web Administration and Management API
https://us.preservica.com/api/admin/documentation.html
author: James Carr
licence: Apache License 2.0
"""
import csv
import xml.etree.ElementTree
from typing import List, Any
from pyPreservica.common import *
# Module-level logger, named after this module through __name__.
logger = logging.getLogger(__name__)
[docs]
class AdminAPI(AuthenticatedAPI):
[docs]
def delete_system_role(self, role_name):
    """
    Delete a system role

    The role name is validated before any request is made, matching the
    empty-name check performed by add_system_role.

    :param role_name: The role to delete
    :type role_name: str
    :return: None once the server has answered with a "no content" status
    :rtype: None
    :raises RuntimeError: if the role name is empty or only whitespace, if the
        Preservica version is older than v6.5.0, or if the server does not
        answer with a "no content" status
    """
    # An empty name would make the URL below address the roles collection
    # itself instead of a single role, so reject it up front.
    if not role_name or not str(role_name).strip():
        raise RuntimeError("Invalid Role Name (Empty)")

    if (self.major_version < 6) or (self.major_version == 6 and self.minor_version < 5):
        raise RuntimeError(
            "delete_system_role API call is only available with a Preservica v6.5.0 system or higher")

    self._check_if_user_has_manager_role()
    headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    request = self.session.delete(f'{self.protocol}://{self.server}/api/admin/security/roles/{role_name}',
                                  headers=headers)
    if request.status_code == requests.codes.no_content:
        return None

    logger.error(request.content.decode('utf-8'))
    raise RuntimeError(request.status_code, "delete_system_role failed")
[docs]
def delete_security_tag(self, tag_name):
    """
    Delete a security tag

    The tag name is validated before any request is made, matching the
    empty-name check performed by add_security_tag.

    :param tag_name: The security tag to delete
    :type tag_name: str
    :return: None once the server has answered with a "no content" status
    :rtype: None
    :raises RuntimeError: if the tag name is empty or only whitespace, if the
        Preservica version is older than v6.4.0, or if the server does not
        answer with a "no content" status
    """
    # An empty name would make the URL below address the tags collection
    # itself instead of a single tag, so reject it up front.
    if not tag_name or not str(tag_name).strip():
        raise RuntimeError("Invalid Tag Name (Empty)")

    if (self.major_version < 6) or (self.major_version == 6 and self.minor_version < 4):
        raise RuntimeError(
            "delete_security_tag API call is only available with a Preservica v6.4.0 system or higher")

    self._check_if_user_has_manager_role()
    headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    request = self.session.delete(f'{self.protocol}://{self.server}/api/admin/security/tags/{tag_name}',
                                  headers=headers)
    if request.status_code == requests.codes.no_content:
        return None

    logger.error(request.content.decode('utf-8'))
    raise RuntimeError(request.status_code, "delete_security_tag failed")
[docs]
def add_system_role(self, role_name) -> str:
    """
    Create a new user role

    Surrounding whitespace is removed from the name before it is validated and
    sent, so a blank or whitespace-only name is rejected without posting
    anything to the server.

    :param role_name: The new role
    :type role_name: str
    :return: The role name contained in the server response
    :rtype: str
    :raises RuntimeError: if the role name is empty or only whitespace, if the
        Preservica version is older than v6.5.0, if the server does not answer
        with a "created" status, or if the response carries no role name
    """
    self._check_if_user_has_user_manager_role()

    # Validate the value that is actually transmitted: the stripped name.
    clean_name = str(role_name).strip() if role_name else ""
    if not clean_name:
        raise RuntimeError("Invalid Role Name (Empty)")

    if (self.major_version < 6) or ((self.major_version == 6) and (self.minor_version < 5)):
        raise RuntimeError("add_system_role API call is only available with a Preservica v6.5.0 system or higher")

    headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    xml_tag = xml.etree.ElementTree.Element('Role', {"xmlns": self.admin_ns})
    xml_tag.text = clean_name
    xml_request = xml.etree.ElementTree.tostring(xml_tag, encoding='utf-8')
    request = self.session.post(f'{self.protocol}://{self.server}/api/admin/security/roles', data=xml_request,
                                headers=headers)
    if request.status_code != requests.codes.created:
        logger.error(request.content.decode('utf-8'))
        raise RuntimeError(request.status_code, "add_system_role failed")

    xml_response = request.content.decode('utf-8')
    logger.debug(xml_response)
    entity_response = xml.etree.ElementTree.fromstring(xml_response)
    if not entity_response.text:
        raise RuntimeError("add_system_role returned an empty role name")
    return entity_response.text
[docs]
def add_security_tag(self, tag_name) -> str:
    """
    Create a new security tag

    Surrounding whitespace is removed from the name before it is validated and
    sent, so a blank or whitespace-only name is rejected without posting
    anything to the server.

    :param tag_name: The new security tag
    :type tag_name: str
    :return: The tag name contained in the server response
    :rtype: str
    :raises RuntimeError: if the tag name is empty or only whitespace, if the
        Preservica version is older than v6.4.0, if the server does not answer
        with a "created" status, or if the response carries no tag name
    """
    self._check_if_user_has_user_manager_role()

    # Validate the value that is actually transmitted: the stripped name.
    clean_name = str(tag_name).strip() if tag_name else ""
    if not clean_name:
        raise RuntimeError("Invalid Tag Name (Empty)")

    if (self.major_version < 6) or (self.major_version == 6 and self.minor_version < 4):
        raise RuntimeError("add_security_tag API call is only available with a Preservica v6.4.0 system or higher")

    headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    xml_tag = xml.etree.ElementTree.Element('Tag', {"xmlns": self.admin_ns})
    xml_tag.text = clean_name
    xml_request = xml.etree.ElementTree.tostring(xml_tag, encoding='utf-8')
    request = self.session.post(f'{self.protocol}://{self.server}/api/admin/security/tags', data=xml_request,
                                headers=headers)
    if request.status_code != requests.codes.created:
        logger.error(request.content.decode('utf-8'))
        raise RuntimeError(request.status_code, "add_security_tag failed")

    xml_response = request.content.decode('utf-8')
    logger.debug(xml_response)
    entity_response = xml.etree.ElementTree.fromstring(xml_response)
    if not entity_response.text:
        raise RuntimeError("add_security_tag returned an empty tag name")
    return entity_response.text
[docs]
def system_roles(self) -> list[str]:
    """
    Return the names of the user access roles defined in the system

    :return: the text of every non-empty Role element in the server response
    :rtype: list
    :raises RuntimeError: if the Preservica version is older than v6.5.0 or the
        server does not answer with an "ok" status
    """
    # Lexicographic comparison: any major below 6, or 6.x with x below 5.
    if (self.major_version, self.minor_version) < (6, 5):
        raise RuntimeError(
            "system_roles API call is only available with a Preservica v6.5.0 system or higher")

    self._check_if_user_has_manager_role()

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    endpoint = f'{self.protocol}://{self.server}/api/admin/security/roles'
    response = self.session.get(endpoint, headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "system_roles failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)

    role_names = []
    for element in root.findall(f'.//{{{self.admin_ns}}}Role'):
        if element.text is not None:
            role_names.append(element.text)
    return role_names
[docs]
def security_tags(self) -> list[str]:
    """
    Return the names of the security tags defined in the system

    :return: the text of every non-empty Tag element in the server response
    :rtype: list
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    self._check_if_user_has_manager_role()

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    endpoint = f'{self.protocol}://{self.server}/api/admin/security/tags'
    response = self.session.get(endpoint, headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "security_tags failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)

    tag_names = []
    for element in root.findall(f'.//{{{self.admin_ns}}}Tag'):
        if element.text is not None:
            tag_names.append(element.text)
    return tag_names
[docs]
def delete_user(self, username: str):
    """
    Remove a user account

    The account is disabled through disable_user() before the delete request
    is sent.

    :param username: email address of the preservica user
    :type username: str
    :return: None once the server has answered with a "no content" status
    :rtype: None
    :raises RuntimeError: if the server does not answer with a "no content" status
    """
    self._check_if_user_has_manager_role()
    self.disable_user(username)

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    endpoint = f'{self.protocol}://{self.server}/api/admin/users/{username}'
    response = self.session.delete(endpoint, headers=request_headers)

    if response.status_code != requests.codes.no_content:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "delete_user failed")
    return None
[docs]
def add_user(self, username: str, full_name: str, roles: list[str], externally_authenticated: bool = False) -> dict:
"""
Add a new user
A User XML element holding FullName, Email and a Roles list is built and
POSTed to the admin users endpoint; on success the new account is read back
through user_details().
:param externally_authenticated: when true, an externallyAuthenticated element with the text "true" is added to the request
:type externally_authenticated: bool
:param username: email address of the preservica user
:type username: str
:param full_name: Users real name
:type full_name: str
:param roles: List of roles assigned to the user
:type roles: list
:return: dictionary of user attributes
:rtype: dict
:raises RuntimeError: if the server does not answer with a "created" status
"""
self._check_if_user_has_user_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('User', {"xmlns": self.admin_ns})
xml.etree.ElementTree.SubElement(xml_object, "FullName").text = full_name
# The supplied username is also used as the Email value.
xml.etree.ElementTree.SubElement(xml_object, "Email").text = username
if externally_authenticated:
xml.etree.ElementTree.SubElement(xml_object, "externallyAuthenticated").text = "true"
# NOTE(review): indentation is not visible in this listing, so it cannot be told whether
# userName is sent only for externally authenticated users or for every user -- confirm
# against the original file before restructuring this method.
xml.etree.ElementTree.SubElement(xml_object, "userName").text = username
xml_roles = xml.etree.ElementTree.SubElement(xml_object, "Roles")
for role in roles:
xml.etree.ElementTree.SubElement(xml_roles, "Role").text = role
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
params = {"source": "UX2"}
request = self.session.post(f'{self.protocol}://{self.server}/api/admin/users', data=xml_request,
headers=headers, params=params)
if request.status_code == requests.codes.created:
# Return the account details as reported by the server rather than echoing the inputs.
return self.user_details(username)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_user failed")
[docs]
def change_user_display_name(self, username: str, new_display_name: str) -> dict:
    """
    Change the user display name

    The current user record is fetched, its FullName element is replaced with
    the new display name and the whole record is sent back with a PUT request.

    :param username: email address of the preservica user
    :type username: str
    :param new_display_name: Users real name
    :type new_display_name: str
    :return: dictionary of user attributes, as returned by user_details()
    :rtype: dict
    :raises RuntimeError: if either request does not answer with an "ok" status,
        or if the fetched user record contains no FullName element
    """
    self._check_if_user_has_user_manager_role()
    headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    user_url = f"{self.protocol}://{self.server}/api/admin/users/{username}"

    request = self.session.get(user_url, headers=headers)
    if request.status_code != requests.codes.ok:
        logger.error(request.content.decode('utf-8'))
        raise RuntimeError(request.status_code, "change_user_display_name failed")

    xml_response = request.content.decode('utf-8')
    logger.debug(xml_response)
    entity_response = xml.etree.ElementTree.fromstring(xml_response)

    fullname = entity_response.find(f'.//{{{self.admin_ns}}}FullName')
    if fullname is None:
        # find() returns None when nothing matches; report it with the same
        # exception type as every other failure here instead of letting an
        # AttributeError escape from the assignment below.
        raise RuntimeError("change_user_display_name failed: user record has no FullName element")
    fullname.text = new_display_name

    xml_request = xml.etree.ElementTree.tostring(entity_response, encoding='utf-8')
    logger.debug(xml_request)
    update_request = self.session.put(user_url, data=xml_request, headers=headers)
    if update_request.status_code != requests.codes.ok:
        logger.error(update_request.content.decode('utf-8'))
        raise RuntimeError(update_request.status_code, "change_user_display_name failed")

    return self.user_details(username)
[docs]
def current_user(self):
    """
    Fetch the details of the currently authenticated user

    :return: the JSON body of the user details response, parsed with json.loads
    :rtype: dict
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/json;charset=UTF-8'}
    endpoint = f"{self.protocol}://{self.server}/api/user/details"
    response = self.session.get(endpoint, headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "current_user failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    return json.loads(body)
[docs]
def user_details(self, username: str) -> dict:
    """
    Look up a single user by username

    The returned dictionary has the keys UserName, FullName, Email, Tenant,
    Enabled (a bool) and Roles (a list of role names).

    :param username: email address of the preservica user
    :type username: str
    :return: dictionary of user attributes
    :rtype: dict
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    self._check_if_user_has_manager_role()

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    endpoint = f"{self.protocol}://{self.server}/api/admin/users/{username}"
    response = self.session.get(endpoint, headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "user_details failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)
    namespace = self.admin_ns

    details = {}
    # Plain text attributes are copied across under the element's own name.
    for field in ('UserName', 'FullName', 'Email', 'Tenant'):
        details[field] = root.find(f'.//{{{namespace}}}{field}').text
    # The enabled flag arrives as text; only the literal "true" counts as enabled.
    details['Enabled'] = root.find(f'.//{{{namespace}}}Enabled').text == "true"
    details['Roles'] = [element.text
                        for element in root.findall(f'.//{{{namespace}}}Role')
                        if element.text is not None]
    return details
def _account_status_(self, username: str, status: str, name: str):
    """
    Send the enabled status for a user account

    A PUT request carrying ``userEnabledStatus`` is sent to the user's
    ``enabled`` endpoint.

    :param username: the user whose status is being set
    :type username: str
    :param status: the value sent as userEnabledStatus
    :type status: str
    :param name: name of the calling operation, used in the error message
    :type name: str
    :return: the response body decoded as UTF-8
    :rtype: str
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    endpoint = f"{self.protocol}://{self.server}/api/admin/users/{username}/enabled"
    response = self.session.put(
        endpoint,
        headers={HEADER_TOKEN: self.token, 'Content-Type': 'text/plain;charset=UTF-8'},
        data={"userEnabledStatus": status})

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, f"{name} failed")
    return response.content.decode("utf-8")
[docs]
def disable_user(self, username):
    """
    Disable a Preservica user account so that the user can no longer log in

    :param username: email address of the preservica user
    :type username: str
    :return: the value returned by _account_status_ for this request
    """
    self._check_if_user_has_manager_role()
    outcome = self._account_status_(username=username, status="false", name="disable_user")
    return outcome
[docs]
def enable_user(self, username):
    """
    Enable a Preservica user account

    :param username: email address of the preservica user
    :type username: str
    :return: the value returned by _account_status_ for this request
    """
    self._check_if_user_has_manager_role()
    outcome = self._account_status_(username=username, status="true", name="enable_user")
    return outcome
[docs]
def user_report(self, report_name="users.csv"):
    """
    Write a CSV report of all tenancy users

    The file gets a header row followed by one row per username returned by
    all_users(), each filled from user_details().

    :param report_name: path of the CSV file to write
    :type report_name: str
    :return:
    """
    self._check_if_user_has_manager_role()
    columns = ['UserName', 'FullName', 'Email', 'Tenant', 'Enabled', 'Roles']
    with open(report_name, mode="wt", newline='', encoding="utf-8") as report:
        report_writer = csv.DictWriter(report, fieldnames=columns)
        report_writer.writeheader()
        # Rows are produced lazily, so each user is looked up just before its row is written.
        report_writer.writerows(self.user_details(user) for user in self.all_users())
[docs]
def all_users(self) -> list[str]:
    """
    List every user in the system

    :return: the text of every non-empty User element in the server response
    :rtype: list
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    self._check_if_user_has_manager_role()

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    endpoint = f"{self.protocol}://{self.server}/api/admin/users"
    response = self.session.get(endpoint, headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "all_users failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)

    usernames = []
    for element in root.findall(f'.//{{{self.admin_ns}}}User'):
        if element.text is not None:
            usernames.append(element.text)
    return usernames
[docs]
def add_xml_schema(self, name: str, description: str, originalName: str, xml_data: Any):
    """
    Upload a new XSD schema document to Preservica

    :param name: Name for the XSD schema
    :type name: str
    :param description: Description for the XSD schema
    :type description: str
    :param originalName: The original file name for the schema on disk
    :type originalName: str
    :param xml_data: The xml schema as a UTF-8 string or a file like object
    :type xml_data: Any
    :return:
    :rtype: None
    :raises RuntimeError: if the server does not answer with a "created" status
    """
    self._check_if_user_has_config_manager_role()
    query = {"name": name, "description": description, "originalName": originalName}

    payload = xml_data
    if isinstance(payload, str):
        # Parse once so malformed XML is rejected locally; the tree itself is not used.
        xml.etree.ElementTree.fromstring(payload)
        payload = payload.encode("utf-8")
    # Any other value, such as a file like object, is handed to the session unchanged.

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.post(f"{self.protocol}://{self.server}/api/admin/schemas",
                                 headers=request_headers, params=query, data=payload)

    if response.status_code != requests.codes.created:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "add_xml_schema failed")
    return None
[docs]
def add_xml_document(self, name: str, xml_data: Any, document_type: str = "MetadataTemplate"):
    """
    Upload a new XML document to Preservica

    When no type is given the document is stored as a descriptive metadata
    template. The available types are:

    MetadataDropdownLists -> Authority Lists
    CustomIndexDefinition -> Custom Search Indexes
    MetadataTemplate -> Metadata Template
    UploadWizardConfigurationFile -> Upload Wizard Config
    ConfigurationFile -> Heritrix Config File

    :param name: The name of the xml document
    :type name: str
    :param xml_data: The xml document as a UTF-8 string or as a file like object
    :type xml_data: Any
    :param document_type: The type of the XML document, defaults to descriptive metadata templates
    :type document_type: str
    :return:
    :rtype: None
    :raises RuntimeError: if the server does not answer with a "created" status
    """
    self._check_if_user_has_config_manager_role()
    query = {"name": name, "type": document_type}

    payload = xml_data
    if isinstance(payload, str):
        # Parse once so malformed XML is rejected locally; the tree itself is not used.
        xml.etree.ElementTree.fromstring(payload)
        payload = payload.encode("utf-8")
    # Any other value, such as a file like object, is handed to the session unchanged.

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.post(f"{self.protocol}://{self.server}/api/admin/documents",
                                 headers=request_headers, params=query, data=payload)

    if response.status_code != requests.codes.created:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "add_xml_document failed")
    return None
[docs]
def delete_xml_document(self, uri: str):
    """
    Remove an XML document from Preservica's XML document store

    The first stored document whose SchemaUri equals the stripped URI is
    deleted; when no document matches nothing is sent and None is returned.

    :param uri: The URI of the xml document to delete
    :type uri: str
    :return:
    :rtype: None
    :raises RuntimeError: if the delete request does not answer with a "no content" status
    """
    self._check_if_user_has_manager_role()
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    match = next((entry for entry in self.xml_documents() if entry['SchemaUri'] == uri.strip()), None)
    if match is None:
        return None

    response = self.session.delete(
        f"{self.protocol}://{self.server}/api/admin/documents/{match['ApiId']}",
        headers=request_headers)
    if response.status_code != requests.codes.no_content:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "delete_xml_document failed")
    return None
[docs]
def delete_xml_schema(self, uri: str):
    """
    Remove an XML schema from Preservica

    The first stored schema whose SchemaUri equals the stripped URI is
    deleted; when no schema matches nothing is sent and None is returned.

    :param uri: The URI of the xml schema to delete
    :type uri: str
    :return:
    :rtype: None
    :raises RuntimeError: if the delete request does not answer with a "no content" status
    """
    self._check_if_user_has_manager_role()
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    for entry in self.xml_schemas():
        if entry['SchemaUri'] != uri.strip():
            continue
        endpoint = f"{self.protocol}://{self.server}/api/admin/schemas/{entry['ApiId']}"
        response = self.session.delete(endpoint, headers=request_headers)
        if response.status_code != requests.codes.no_content:
            logger.error(response.content.decode('utf-8'))
            raise RuntimeError(response.status_code, "delete_xml_schema failed")
        return None
    return None
[docs]
def xml_schema(self, uri: str) -> str | None:
    """
    Download a metadata schema XSD document, looked up by its URI

    :param uri: The URI of the xml schema
    :type uri: str
    :return: The XML schema as a string, or None when no stored schema has this URI
    :rtype: str
    :raises RuntimeError: if the content request does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    match = next((entry for entry in self.xml_schemas() if entry['SchemaUri'] == uri.strip()), None)
    if match is None:
        return None

    response = self.session.get(
        f"{self.protocol}://{self.server}/api/admin/schemas/{match['ApiId']}/content",
        headers=request_headers)
    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "xml_schema failed")
    return response.content.decode('utf-8')
[docs]
def xml_document(self, uri: str) -> str | None:
    """
    Download a metadata XML document, looked up by its URI

    :param uri: The URI of the xml document
    :type uri: str
    :return: The XML document as a string, or None when no stored document has this URI
    :rtype: str
    :raises RuntimeError: if the content request does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    for entry in self.xml_documents():
        if entry['SchemaUri'] != uri.strip():
            continue
        endpoint = f"{self.protocol}://{self.server}/api/admin/documents/{entry['ApiId']}/content"
        response = self.session.get(endpoint, headers=request_headers)
        if response.status_code != requests.codes.ok:
            logger.error(response.content.decode('utf-8'))
            raise RuntimeError(response.status_code, "xml_document failed")
        return response.content.decode('utf-8')
    return None
[docs]
def xml_documents(self) -> List:
    """
    List the XML documents stored in Preservica

    Each entry is a dictionary with the keys SchemaUri, Name, DocumentType
    and ApiId.

    :return: List of XML documents stored in Preservica
    :rtype: list
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.get(f'{self.protocol}://{self.server}/api/admin/documents',
                                headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "xml_documents failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)
    namespace = self.admin_ns

    # Every listed child element is copied into the entry under its own name.
    fields = ('SchemaUri', 'Name', 'DocumentType', 'ApiId')
    catalogue = []
    for node in root.findall(f'.//{{{namespace}}}Document'):
        entry = {field: node.find(f'.//{{{namespace}}}{field}').text for field in fields}
        catalogue.append(entry)
    return catalogue
[docs]
def xml_schemas(self) -> List:
    """
    List the metadata schema XSD documents stored in Preservica

    Each entry is a dictionary with the keys SchemaUri, Name, Description and
    ApiId; Description is an empty string when the element is absent.

    :return: List of XML schemas stored in Preservica
    :rtype: list
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.get(f'{self.protocol}://{self.server}/api/admin/schemas',
                                headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "xml_schemas failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)
    namespace = self.admin_ns

    catalogue = []
    for node in root.findall(f'.//{{{namespace}}}Schema'):
        uri_node = node.find(f'.//{{{namespace}}}SchemaUri')
        name_node = node.find(f'.//{{{namespace}}}Name')
        description_node = node.find(f'.//{{{namespace}}}Description')
        api_id_node = node.find(f'.//{{{namespace}}}ApiId')
        catalogue.append({
            'SchemaUri': uri_node.text,
            'Name': name_node.text,
            'Description': description_node.text if description_node is not None else "",
            'ApiId': api_id_node.text,
        })
    return catalogue
[docs]
def xml_transforms(self) -> List:
    """
    List the XML transforms stored in Preservica

    Each entry is a dictionary with the keys ToSchemaUri, FromSchemaUri, Name,
    Purpose and ApiId; the two schema URIs are empty strings when their
    elements are absent.

    :return: List of XML transforms stored in Preservica
    :rtype: list
    :raises RuntimeError: if the server does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.get(f'{self.protocol}://{self.server}/api/admin/transforms',
                                headers=request_headers)

    if response.status_code != requests.codes.ok:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "xml_transforms failed")

    body = response.content.decode('utf-8')
    logger.debug(body)
    root = xml.etree.ElementTree.fromstring(body)
    namespace = self.admin_ns

    def text_or_blank(element):
        # Missing elements are reported as an empty string.
        return element.text if element is not None else ""

    catalogue = []
    for node in root.findall(f'.//{{{namespace}}}Transform'):
        to_node = node.find(f'.//{{{namespace}}}ToSchemaUri')
        from_node = node.find(f'.//{{{namespace}}}FromSchemaUri')
        name_node = node.find(f'.//{{{namespace}}}Name')
        purpose_node = node.find(f'.//{{{namespace}}}Purpose')
        api_id_node = node.find(f'.//{{{namespace}}}ApiId')
        catalogue.append({
            'ToSchemaUri': text_or_blank(to_node),
            'FromSchemaUri': text_or_blank(from_node),
            'Name': name_node.text,
            'Purpose': purpose_node.text,
            'ApiId': api_id_node.text,
        })
    return catalogue
[docs]
def xml_transform(self, input_uri: str, output_uri: str) -> str | None:
    """
    Download an XML transform, looked up by its input and output URIs

    :param input_uri: The URI of the input XML document
    :type input_uri: str
    :param output_uri: The URI of the output XML document
    :type output_uri: str
    :return: The XML transform as a string, or None when no stored transform
        has this pair of URIs
    :rtype: str
    :raises RuntimeError: if the content request does not answer with an "ok" status
    """
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    for candidate in self.xml_transforms():
        if candidate['FromSchemaUri'] != input_uri.strip():
            continue
        if candidate['ToSchemaUri'] != output_uri.strip():
            continue
        endpoint = f"{self.protocol}://{self.server}/api/admin/transforms/{candidate['ApiId']}/content"
        response = self.session.get(endpoint, headers=request_headers)
        if response.status_code != requests.codes.ok:
            logger.error(response.content.decode('utf-8'))
            raise RuntimeError(response.status_code, "xml_transform failed")
        return response.content.decode('utf-8')
    return None
[docs]
def delete_xml_transform(self, input_uri: str, output_uri: str):
    """
    Remove an XML transform from Preservica

    The first stored transform whose FromSchemaUri and ToSchemaUri equal the
    stripped URIs is deleted; when no transform matches nothing is sent and
    None is returned.

    :param input_uri: The URI of the input XML document
    :type input_uri: str
    :param output_uri: The URI of the output XML document
    :type output_uri: str
    :return:
    :rtype: None
    :raises RuntimeError: if the delete request does not answer with a "no content" status
    """
    self._check_if_user_has_manager_role()
    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}

    match = next(
        (candidate for candidate in self.xml_transforms()
         if candidate['FromSchemaUri'] == input_uri.strip() and candidate['ToSchemaUri'] == output_uri.strip()),
        None)
    if match is None:
        return None

    response = self.session.delete(
        f"{self.protocol}://{self.server}/api/admin/transforms/{match['ApiId']}",
        headers=request_headers)
    if response.status_code != requests.codes.no_content:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "delete_xml_transform failed")
    return None
[docs]
def add_xml_transform(self, name: str, input_uri: str, output_uri: str, purpose: str, originalName: str,
                      xml_data: Any):
    """
    Upload a new XML transform to Preservica

    :param name: The name of the XML transform
    :type name: str
    :param input_uri: The URI of the input XML document
    :type input_uri: str
    :param output_uri: The URI of the output XML document
    :type output_uri: str
    :param purpose: The purpose of the transform, "transform" , "edit", "view"; sent in lower case
    :type purpose: str
    :param originalName: The original file name of the transform
    :type originalName: str
    :param xml_data: The transform xml as a string or file like object
    :type xml_data: Any
    :return:
    :rtype: None
    :raises RuntimeError: if the server does not answer with a "created" status
    """
    self._check_if_user_has_config_manager_role()
    query = {
        "name": name,
        "from": input_uri,
        "to": output_uri,
        "purpose": purpose.lower(),
        "originalName": originalName,
    }

    payload = xml_data
    if isinstance(payload, str):
        # Parse once so malformed XML is rejected locally; the tree itself is not used.
        xml.etree.ElementTree.fromstring(payload)
        payload = payload.encode("utf-8")
    # Any other value, such as a file like object, is handed to the session unchanged.

    request_headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
    response = self.session.post(f"{self.protocol}://{self.server}/api/admin/transforms",
                                 headers=request_headers, params=query, data=payload)

    if response.status_code != requests.codes.created:
        logger.error(response.content.decode('utf-8'))
        raise RuntimeError(response.status_code, "add_xml_transform failed")
    return None