"""
pyPreservica AdminAPI module definition
A client library for the Preservica Repository web Administration and Management API
https://us.preservica.com/api/admin/documentation.html
author: James Carr
licence: Apache License 2.0
"""
import csv
import xml.etree.ElementTree
from typing import List, Any
from pyPreservica.common import *
logger = logging.getLogger(__name__)
[docs]
class AdminAPI(AuthenticatedAPI):
[docs]
def delete_system_role(self, role_name):
"""
Delete a system role
:param role_name: The role to delete
:type role_name: str
"""
if (self.major_version < 7) and (self.minor_version < 5):
raise RuntimeError(
"delete_system_role API call is only available with a Preservica v6.5.0 system or higher")
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.delete(f'{self.protocol}://{self.server}/api/admin/security/roles/{role_name}',
headers=headers)
if request.status_code == requests.codes.no_content:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_system_role(role_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "delete_system_role failed")
[docs]
def delete_security_tag(self, tag_name):
"""
Delete a security tag
:param tag_name: The security tag to delete
:type tag_name: str
"""
if (self.major_version < 7) and (self.minor_version < 4):
raise RuntimeError(
"delete_security_tag API call is only available with a Preservica v6.4.0 system or higher")
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.delete(f'{self.protocol}://{self.server}/api/admin/security/tags/{tag_name}',
headers=headers)
if request.status_code == requests.codes.no_content:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_security_tag(tag_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "delete_security_tag failed")
[docs]
def add_system_role(self, role_name) -> str:
"""
Create a new user roles
:param role_name: The new role
:type role_name: str
:return: The new role
:rtype: str
"""
if (self.major_version < 7) and (self.minor_version < 5):
raise RuntimeError("add_system_role API call is only available with a Preservica v6.5.0 system or higher")
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_tag = xml.etree.ElementTree.Element('Role', {"xmlns": self.admin_ns})
xml_tag.text = str(role_name).strip()
xml_request = xml.etree.ElementTree.tostring(xml_tag, encoding='utf-8')
request = self.session.post(f'{self.protocol}://{self.server}/api/admin/security/roles', data=xml_request,
headers=headers)
if request.status_code == requests.codes.created:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
return entity_response.text
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_system_role(role_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_system_role failed")
[docs]
def add_security_tag(self, tag_name) -> str:
"""
Create a new security tag
:param tag_name: The new security tag
:type tag_name: str
:return: The new security tag
:rtype: str
"""
if (self.major_version < 7) and (self.minor_version < 4):
raise RuntimeError("add_security_tag API call is only available with a Preservica v6.4.0 system or higher")
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_tag = xml.etree.ElementTree.Element('Tag', {"xmlns": self.admin_ns})
xml_tag.text = str(tag_name).strip()
xml_request = xml.etree.ElementTree.tostring(xml_tag, encoding='utf-8')
request = self.session.post(f'{self.protocol}://{self.server}/api/admin/security/tags', data=xml_request,
headers=headers)
if request.status_code == requests.codes.created:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
return entity_response.text
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_security_tag(tag_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_security_tag failed")
[docs]
def system_roles(self) -> list:
"""
List all the user access roles in the system
:return: list of roles
:rtype: list
"""
self._check_if_user_has_manager_role()
if (self.major_version < 7) and (self.minor_version < 5):
raise RuntimeError(
"system_roles API call is only available with a Preservica v6.5.0 system or higher")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f'{self.protocol}://{self.server}/api/admin/security/roles', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
roles = entity_response.findall(f'.//{{{self.admin_ns}}}Role')
security_roles = []
for role in roles:
security_roles.append(role.text)
return security_roles
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.roles()
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "roles failed")
[docs]
def delete_user(self, username: str):
"""
Delete a user
:param username: email address of the preservica user
:type username: str
"""
self._check_if_user_has_manager_role()
self.disable_user(username)
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.delete(f'{self.protocol}://{self.server}/api/admin/users/{username}', headers=headers)
if request.status_code == requests.codes.no_content:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_user(username)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "delete_user failed")
[docs]
def add_user(self, username: str, full_name: str, roles: list, externally_authenticated: bool = False):
"""
Add a new user
:param externally_authenticated:
:param username: email address of the preservica user
:type username: str
:param full_name: Users real name
:type full_name: str
:param roles: List of roles assigned to the user
:type roles: list
:return: dictionary of user attributes
:rtype: dict
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
xml_object = xml.etree.ElementTree.Element('User ', {"xmlns": self.admin_ns})
xml.etree.ElementTree.SubElement(xml_object, "FullName").text = full_name
xml.etree.ElementTree.SubElement(xml_object, "Email").text = username
if externally_authenticated:
xml.etree.ElementTree.SubElement(xml_object, "externallyAuthenticated").text = "true"
xml.etree.ElementTree.SubElement(xml_object, "userName").text = username
xml_roles = xml.etree.ElementTree.SubElement(xml_object, "Roles")
for role in roles:
xml.etree.ElementTree.SubElement(xml_roles, "Role").text = role
xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8')
logger.debug(xml_request)
request = self.session.post(f'{self.protocol}://{self.server}/api/admin/users', data=xml_request,
headers=headers)
if request.status_code == requests.codes.created:
return self.user_details(username)
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_user(username, full_name, roles)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_user failed")
[docs]
def change_user_display_name(self, username: str, new_display_name: str) -> dict:
"""
Change the user display name
:param username: email address of the preservica user
:type username: str
:param new_display_name: Users real name
:type new_display_name: str
:return: dictionary of user attributes
:rtype: dict
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f"{self.protocol}://{self.server}/api/admin/users/{username}", headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
fullname = entity_response.find(f'.//{{{self.admin_ns}}}FullName')
fullname.text = new_display_name
xml_request = xml.etree.ElementTree.tostring(entity_response, encoding='utf-8')
logger.debug(xml_request)
update_request = self.session.put(f'{self.protocol}://{self.server}/api/admin/users/{username}',
data=xml_request,
headers=headers)
if update_request.status_code == requests.codes.ok:
return self.user_details(username)
elif update_request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.change_user_display_name(username, new_display_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "change_user_display_name failed")
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.change_user_display_name(username, new_display_name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "change_user_display_name failed")
[docs]
def user_details(self, username: str) -> dict:
"""
Get the details of a user by their username
:param username: email address of the preservica user
:type username: str
:return: dictionary of user attributes
:rtype: dict
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f"{self.protocol}://{self.server}/api/admin/users/{username}", headers=headers)
return_dict = {}
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
username = entity_response.find(f'.//{{{self.admin_ns}}}UserName')
return_dict['UserName'] = username.text
fullname = entity_response.find(f'.//{{{self.admin_ns}}}FullName')
return_dict['FullName'] = fullname.text
email = entity_response.find(f'.//{{{self.admin_ns}}}Email')
return_dict['Email'] = email.text
tenant = entity_response.find(f'.//{{{self.admin_ns}}}Tenant')
return_dict['Tenant'] = tenant.text
enable = entity_response.find(f'.//{{{self.admin_ns}}}Enabled')
return_dict['Enabled'] = bool(enable.text == "true")
roles = entity_response.findall(f'.//{{{self.admin_ns}}}Role')
return_roles = []
for role in roles:
return_roles.append(role.text)
return_dict['Roles'] = return_roles
return return_dict
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.user_details(username)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "user_details failed")
def _account_status_(self, username: str, status: str, name: str):
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'text/plain;charset=UTF-8'}
data = {"userEnabledStatus": status}
request = self.session.put(f"{self.protocol}://{self.server}/api/admin/users/{username}/enabled",
headers=headers,
data=data)
if request.status_code == requests.codes.ok:
return request.content.decode("utf-8")
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._account_status_(username, status, name)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, f"{name} failed")
[docs]
def disable_user(self, username):
"""
Disable a Preservica User to prevent them logging in
:param username: email address of the preservica user
:type username: str
"""
self._check_if_user_has_manager_role()
return self._account_status_(username, "false", "disable_user")
[docs]
def enable_user(self, username):
"""
Enable a Preservica User
:param username: email address of the preservica user
:type username: str
"""
self._check_if_user_has_manager_role()
return self._account_status_(username, "true", "enable_user")
[docs]
def user_report(self, report_name="users.csv"):
"""
Create a report on all tenancy users
:return:
"""
self._check_if_user_has_manager_role()
fieldnames = ['UserName', 'FullName', 'Email', 'Tenant', 'Enabled', 'Roles']
with open(report_name, newline='', mode="wt", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
for username in self.all_users():
user_details = self.user_details(username)
writer.writerow(user_details)
[docs]
def all_users(self) -> list:
"""
Return a list of all users in the system
:return list of usernames:
:rtype: list
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f"{self.protocol}://{self.server}/api/admin/users", headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
users = entity_response.findall(f'.//{{{self.admin_ns}}}User')
system_users = []
for user in users:
system_users.append(user.text)
return system_users
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.all_users()
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "all_users failed")
[docs]
def add_xml_schema(self, name: str, description: str, originalName: str, xml_data: Any):
"""
Add a new XSD document to Preservica
:param name: Name for the XSD schema
:type name: str
:param description: Description for the XSD schema
:type description: str
:param originalName: The original file name for the schema on disk
:type originalName: str
:param xml_data: The xml schema as a UTF-8 string or a file like object
:type xml_data: Any
:return: None
:rtype: None
"""
self._check_if_user_has_manager_role()
params = {"name": name, "description": description, "originalName": originalName}
if isinstance(xml_data, str):
xml.etree.ElementTree.fromstring(xml_data)
xml_data = xml_data.encode("utf-8")
elif hasattr(xml_data, "read"):
pass
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.post(f"{self.protocol}://{self.server}/api/admin/schemas", headers=headers,
params=params,
data=xml_data)
if request.status_code == requests.codes.created:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_xml_schema(name, description, originalName, xml_data)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_xml_schema failed")
[docs]
def add_xml_document(self, name: str, xml_data: Any, document_type: str = "MetadataTemplate"):
"""
Add a new XML document to Preservica
The default type of XML document is a descriptive metadata template
Options are:
MetadataDropdownLists -> Authority Lists
CustomIndexDefinition -> Custom Search Indexes
MetadataTemplate -> Metadata Template
UploadWizardConfigurationFile -> Upload Wizard Config
ConfigurationFile -> Heritrix Config File
:param name: The name of the xml document
:type name: str
:param xml_data: The xml schema as a UTF-8 string or as a file like object
:type xml_data:
:param document_type: The type of the XML document, defaults to descriptive metadata templates
:type document_type: str
:return: None
:rtype: None
"""
self._check_if_user_has_manager_role()
params = {"name": name, "type": document_type}
if isinstance(xml_data, str):
xml.etree.ElementTree.fromstring(xml_data)
xml_data = xml_data.encode("utf-8")
elif hasattr(xml_data, "read"):
pass
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.post(f"{self.protocol}://{self.server}/api/admin/documents", headers=headers,
params=params,
data=xml_data)
if request.status_code == requests.codes.created:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.add_xml_document(name, xml_data, document_type)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "add_xml_document failed")
[docs]
def delete_xml_document(self, uri: str):
"""
Delete a XML document from Preservica
:param uri: The URI of the xml document to delete
:type uri: str
:return: None
:rtype: None
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
for document in self.xml_documents():
if document['SchemaUri'] == uri.strip():
request = self.session.delete(
f"{self.protocol}://{self.server}/api/admin/documents/{document['ApiId']}",
headers=headers)
if request.status_code == requests.codes.no_content:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_xml_document(uri)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "delete_xml_document failed")
[docs]
def delete_xml_schema(self, uri: str):
"""
Delete an XML schema from Preservica
:param uri: The URI of the xml schema to delete
:type uri: str
:return: None
:rtype: None
"""
self._check_if_user_has_manager_role()
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
for schema in self.xml_schemas():
if schema['SchemaUri'] == uri.strip():
request = self.session.delete(f"{self.protocol}://{self.server}/api/admin/schemas/{schema['ApiId']}",
headers=headers)
if request.status_code == requests.codes.no_content:
return
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.delete_xml_schema(uri)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "delete_xml_schema failed")
[docs]
def xml_schema(self, uri: str) -> str:
"""
fetch the metadata schema XSD document as a string by its URI
:param uri: The URI of the xml schema
:type uri: str
:return: The XML schema as a string
:rtype: str
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
for schema in self.xml_schemas():
if schema['SchemaUri'] == uri.strip():
request = self.session.get(
f"{self.protocol}://{self.server}/api/admin/schemas/{schema['ApiId']}/content",
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
return xml_response
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.xml_schema(uri)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "xml_schema failed")
[docs]
def xml_document(self, uri: str) -> str:
"""
fetch the metadata XML document as a string by its URI
:param uri: The URI of the xml document
:type uri: str
:return: The XML document as a string
:rtype: str
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
for document in self.xml_documents():
if document['SchemaUri'] == uri.strip():
request = self.session.get(
f"{self.protocol}://{self.server}/api/admin/documents/{document['ApiId']}/content",
headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
return xml_response
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.xml_document(uri)
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "xml_document failed")
[docs]
def xml_documents(self) -> List:
"""
fetch the list of XML documents stored in Preservica
:return: List of XML documents stored in Preservica
:rtype: list
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f'{self.protocol}://{self.server}/api/admin/documents', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
documents = entity_response.findall(f'.//{{{self.admin_ns}}}Document')
results = list()
for document in documents:
document_dict = {}
api_id = document.find(f'.//{{{self.admin_ns}}}ApiId')
name = document.find(f'.//{{{self.admin_ns}}}Name')
document_type = document.find(f'.//{{{self.admin_ns}}}DocumentType')
schema_uri = document.find(f'.//{{{self.admin_ns}}}SchemaUri')
document_dict['SchemaUri'] = schema_uri.text
document_dict['Name'] = name.text
document_dict['DocumentType'] = document_type.text
document_dict['ApiId'] = api_id.text
results.append(document_dict)
return results
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.xml_documents()
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "xml_documents failed")
[docs]
def xml_schemas(self) -> List:
"""
fetch the list of metadata schema XSD documents stored in Preservica
:return: List of XML schema's stored in Preservica
:rtype: list
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f'{self.protocol}://{self.server}/api/admin/schemas', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
schemas = entity_response.findall(f'.//{{{self.admin_ns}}}Schema')
results = []
for schema in schemas:
schema_dict = {}
schema_uri = schema.find(f'.//{{{self.admin_ns}}}SchemaUri')
name = schema.find(f'.//{{{self.admin_ns}}}Name')
description = schema.find(f'.//{{{self.admin_ns}}}Description')
aip_id = schema.find(f'.//{{{self.admin_ns}}}ApiId')
schema_dict['SchemaUri'] = schema_uri.text
schema_dict['Name'] = name.text
if description is not None:
schema_dict['Description'] = description.text
else:
schema_dict['Description'] = ""
schema_dict['ApiId'] = aip_id.text
results.append(schema_dict)
return results
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.xml_schemas()
else:
logger.error(request.content.decode('utf-8'))
raise RuntimeError(request.status_code, "xml_schemas failed")