"""
Base class for authenticated API calls used by Entity, Content and Upload
Manages the authentication token lifetime and namespace versions.
author: James Carr
licence: Apache License 2.0
"""
import configparser
import functools
import hashlib
import json
import logging
import os
import platform
import re
import sys
import threading
import time
import unicodedata
import xml.etree.ElementTree
from enum import Enum
from pathlib import Path
import pyotp
from urllib3.util import Retry
import requests
from requests.adapters import HTTPAdapter
import pyPreservica
logger = logging.getLogger(__name__)
NS_XIP_ROOT = "http://preservica.com/XIP/"
NS_ENTITY_ROOT = "http://preservica.com/EntityAPI/"
NS_RM_ROOT = "http://preservica.com/RetentionManagement/"
NS_SEC_ROOT = "http://preservica.com/SecurityAPI"
NS_WORKFLOW = "http://workflow.preservica.com"
NS_ADMIN = "http://preservica.com/AdminAPI"
NS_XIP_V6 = "http://preservica.com/XIP/v6.0"
NS_ENTITY = "http://preservica.com/EntityAPI/v6.0"
HEADER_TOKEN = "Preservica-Access-Token"
IO_PATH = "information-objects"
SO_PATH = "structural-objects"
CO_PATH = "content-objects"
HASH_BLOCK_SIZE = 65536
TIME_OUT = 62
CHUNK_SIZE = 1024 * 4
class FileHash:
"""
A wrapper around the hashlib hash algorithms that allows an entire file to
be hashed in a chunked manner.
"""
def __init__(self, algorithm):
self.algorithm = algorithm
def get_algorithm(self):
return self.algorithm
def __call__(self, file):
hash_algorithm = self.algorithm()
with open(file, 'rb') as f:
buf = f.read(HASH_BLOCK_SIZE)
while len(buf) > 0:
hash_algorithm.update(buf)
buf = f.read(HASH_BLOCK_SIZE)
return hash_algorithm.hexdigest()
def identifiersToDict(identifiers: set) -> dict:
"""
Convert a set of tuples to a dict
:param identifiers:
:return:
"""
result = {}
for identifier_tuple in identifiers:
result[identifier_tuple[0]] = identifier_tuple[1]
return result
def strtobool(val) -> bool:
"""
Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
'val' is anything else.
"""
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return True
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
return False
else:
raise ValueError("invalid truth value %r" % (val,))
def _make_stored_zipfile(base_name, base_dir, owner, group, verbose=0, dry_run=0, logger=None):
"""
Create a non compressed zip file from all the files under 'base_dir'.
The output zip file will be named 'base_name' + ".zip". Returns the
name of the output zip file.
"""
import zipfile # late import for breaking circular dependency
zip_filename = base_name + ".zip"
archive_dir = os.path.dirname(base_name)
if archive_dir and not os.path.exists(archive_dir):
if logger is not None:
logger.info("creating %s", archive_dir)
if not dry_run:
os.makedirs(archive_dir)
if logger is not None:
logger.info("creating '%s' and adding '%s' to it",
zip_filename, base_dir)
if not dry_run:
with zipfile.ZipFile(zip_filename, "w", compression=zipfile.ZIP_STORED) as zf:
path = os.path.normpath(base_dir)
if path != os.curdir:
zf.write(path, path)
if logger is not None:
logger.info("adding '%s'", path)
for dirpath, dirnames, filenames in os.walk(base_dir):
for name in sorted(dirnames):
path = os.path.normpath(os.path.join(dirpath, name))
zf.write(path, path)
if logger is not None:
logger.info("adding '%s'", path)
for name in filenames:
path = os.path.normpath(os.path.join(dirpath, name))
if os.path.isfile(path):
zf.write(path, path)
if logger is not None:
logger.info("adding '%s'", path)
return zip_filename
class PagedSet:
"""
Class to represent a page of results
The results object contains the list of objects of interest
"""
def __init__(self, results, has_more: bool, total: int, next_page: str):
self.results = results
self.has_more = bool(has_more)
self.total = int(total)
self.next_page = next_page
def __str__(self):
return self.results.__str__()
def get_results(self):
return self.results
def get_total(self):
return self.total
def has_more_pages(self):
return self.has_more
class Sha1FixityCallBack:
def __call__(self, filename, full_path):
sha = FileHash(hashlib.sha1)
return "SHA1", sha(full_path)
class Sha256FixityCallBack:
def __call__(self, filename, full_path):
sha = FileHash(hashlib.sha256)
return "SHA256", sha(full_path)
class Sha512FixityCallBack:
def __call__(self, filename, full_path):
sha = FileHash(hashlib.sha512)
return "SHA512", sha(full_path)
class ReportProgressConsoleCallback:
def __init__(self, prefix='Progress:', suffix='', length=100, fill='█', printEnd="\r"):
self.prefix = prefix
self.suffix = suffix
self.length = length
self.fill = fill
self.printEnd = printEnd
self._lock = threading.Lock()
self.print_progress_bar(0)
def __call__(self, value):
with self._lock:
values = value.split(":")
self.total = int(values[1])
self.current = int(values[0])
if self.total == 0:
percentage = 100.0
else:
percentage = (self.current / self.total) * 100
self.print_progress_bar(percentage)
if int(percentage) == int(100):
self.print_progress_bar(100.0)
sys.stdout.write(self.printEnd)
sys.stdout.flush()
def print_progress_bar(self, percentage):
filled_length = int(self.length * (percentage / 100.0))
bar_sym = self.fill * filled_length + '-' * (self.length - filled_length)
sys.stdout.write(
'\r%s |%s| (%.2f%%) %s ' % (self.prefix, bar_sym, percentage, self.suffix))
sys.stdout.flush()
class UploadProgressConsoleCallback:
def __init__(self, filename: str, prefix='Progress:', suffix='', length=100, fill='█', printEnd="\r"):
self.prefix = prefix
self.suffix = suffix
self.length = length
self.fill = fill
self.printEnd = printEnd
self._filename = filename
self._size = float(Path(filename).stat().st_size)
self._seen_so_far = 0
self.start = time.time()
self._lock = threading.Lock()
self.print_progress_bar(0, 0)
def __call__(self, bytes_amount):
with self._lock:
seconds = time.time() - self.start
if seconds == 0:
seconds = 1.0
self._seen_so_far += bytes_amount
percentage = (self._seen_so_far / self._size) * float(100.0)
rate = (self._seen_so_far / (1024 * 1024)) / seconds
self.print_progress_bar(percentage, rate)
if int(self._seen_so_far) == int(self._size):
self.print_progress_bar(100.0, rate)
sys.stdout.write(self.printEnd)
sys.stdout.flush()
def print_progress_bar(self, percentage, rate):
filled_length = int(self.length * (percentage / 100.0))
bar_sym = self.fill * filled_length + '-' * (self.length - filled_length)
sys.stdout.write(
'\r%s |%s| (%.2f%%) (%.2f %s) %s ' % (self.prefix, bar_sym, percentage, rate, "Mb/s", self.suffix))
sys.stdout.flush()
class UploadProgressCallback:
"""
Default implementation of a callback class to show upload progress of a file
"""
def __init__(self, filename: str):
self._filename = filename
self._size = float(os.path.getsize(filename))
self._seen_so_far = 0
self._lock = threading.Lock()
def __call__(self, bytes_amount):
with self._lock:
self._seen_so_far += bytes_amount
percentage = (self._seen_so_far / self._size) * 100
sys.stdout.write("\r%s %s / %s (%.2f%%)" % (self._filename, self._seen_so_far, self._size, percentage))
sys.stdout.flush()
[docs]
class RelationshipDirection(Enum):
FROM = "From"
TO = "To"
[docs]
class EntityType(Enum):
"""
Enumeration of the Entity Types
"""
ASSET = "IO"
FOLDER = "SO"
CONTENT_OBJECT = "CO"
class HTTPException(Exception):
"""
Custom Exception non 404 errors
"""
def __init__(self, reference, http_status_code, url, method_name, message):
self.reference = reference
self.url = url
self.method_name = method_name
self.http_status_code = http_status_code
self.msg = message
Exception.__init__(self, self.reference, self.http_status_code, self.url, self.msg)
def __str__(self):
return f"Calling method {self.method_name}() {self.url} returned HTTP {self.http_status_code}. {self.msg}"
class ReferenceNotFoundException(Exception):
"""
Custom Exception for failed lookups by reference 404 Errors
"""
def __init__(self, reference, http_status_code, url, method_name):
self.reference = reference
self.url = url
self.method_name = method_name
self.http_status_code = http_status_code
self.msg = f"The requested reference {self.reference} is not found in the repository"
Exception.__init__(self, self.reference, self.http_status_code, self.url, self.msg)
def __str__(self):
return f"Calling method {self.method_name}() {self.url} returned HTTP {self.http_status_code}. {self.msg}"
class Relationship:
DCMI_hasFormat = "http://purl.org/dc/terms/hasFormat"
DCMI_isFormatOf = "http://purl.org/dc/terms/isFormatOf"
DCMI_hasPart = "http://purl.org/dc/terms/hasPart"
DCMI_isPartOf = "http://purl.org/dc/terms/isPartOf"
DCMI_hasVersion = "http://purl.org/dc/terms/hasVersion"
DCMI_isVersionOf = "http://purl.org/dc/terms/isVersionOf"
DCMI_isReferencedBy = "http://purl.org/dc/terms/isReferencedBy"
DCMI_references = "http://purl.org/dc/terms/references"
DCMI_isReplacedBy = "http://purl.org/dc/terms/isReplacedBy"
DCMI_replaces = "http://purl.org/dc/terms/replaces"
DCMI_isRequiredBy = "http://purl.org/dc/terms/isRequiredBy"
DCMI_requires = "http://purl.org/dc/terms/requires"
DCMI_conformsTo = "http://purl.org/dc/terms/conformsTo"
def __init__(self, relationship_id: str, relationship_type: str, direction: RelationshipDirection, other_ref: str,
title: str, entity_type: EntityType, this_ref: str, api_id: str):
self.api_id = api_id
self.this_ref = this_ref
self.entity_type = entity_type
self.title = title
self.other_ref = other_ref
self.direction = direction
self.relationship_type = relationship_type
self.relationship_id = relationship_id
def __str__(self):
if self.direction == RelationshipDirection.FROM:
return f"{self.this_ref} {self.relationship_type} {self.other_ref}"
else:
return f"{self.other_ref} {self.relationship_type} {self.this_ref}"
[docs]
class IntegrityCheck:
"""
Class to hold information about completed integrity checks
"""
def __init__(self, check_type, success, date, adapter, fixed, reason):
self.check_type = check_type
self.success = bool(success)
self.date = date
self.adapter = adapter
self.fixed = bool(fixed)
self.reason = reason
def __str__(self):
return f"Type:\t\t\t{self.check_type}\n" \
f"Success:\t\t\t{self.success}\n" \
f"Date:\t{self.date}\n" \
f"Storage Adapter:\t{self.adapter}\n"
def __repr__(self):
return self.__str__()
def get_adapter(self):
return self.adapter
def get_success(self):
return self.success
[docs]
class Bitstream:
"""
Class to represent the Bitstream Object or digital file in the Preservica data model
"""
def __init__(self, filename: str, length: int, fixity: dict, content_url: str):
self.filename = filename
self.length = int(length)
self.fixity = fixity
self.content_url = content_url
def __str__(self):
return f"""
Filename: {self.filename}
File Length: {self.length}
Fixity: {self.fixity}
"""
def __repr__(self):
return self.__str__()
[docs]
class Generation:
"""
Class to represent the Generation Object in the Preservica data model
"""
def __init__(self, original: bool, active: bool, format_group: str, effective_date: str, bitstreams: list):
self.original = bool(original)
self.active = bool(active)
self.content_object = None
self.format_group = format_group
self.effective_date = effective_date
self.bitstreams = bitstreams
self.properties = list()
self.formats = list()
def __str__(self):
return f"""
Active: {self.active}
Original: {self.original}
Format Group: {self.format_group}
Effective Date: {self.effective_date}
Formats: {self.formats}
Properties: {self.properties}
"""
def __repr__(self):
return self.__str__()
[docs]
class Entity:
"""
Base Class of Assets, Folders and Content Objects
"""
def __init__(self, reference: str, title: str, description: str, security_tag: str, parent: str, metadata: dict):
self.reference = reference
self.title = title
self.description = description
self.security_tag = security_tag
self.parent = parent
self.metadata = metadata
self.entity_type = None
self.path = None
self.tag = None
self.custom_type = None
def __str__(self):
return f"""
Entity: {self.entity_type}
Entity Ref: {self.reference}
Title: {self.title}
Description: {self.description}
Security Tag: {self.security_tag}
Parent: {self.parent}
Custom Type: {self.custom_type}
"""
def __repr__(self):
return self.__str__()
def has_metadata(self):
return bool(self.metadata)
def metadata_namespaces(self):
return list(self.metadata.values())
[docs]
class Folder(Entity):
"""
Class to represent the Structural Object or Folder in the Preservica data model
"""
def __init__(self, reference: str, title: str, description: str = None, security_tag: str = None,
parent: str = None, metadata: dict = None):
super().__init__(reference, title, description, security_tag, parent, metadata)
self.entity_type = EntityType.FOLDER
self.path = SO_PATH
self.tag = "StructuralObject"
[docs]
class Asset(Entity):
"""
Class to represent the Information Object or Asset in the Preservica data model
"""
def __init__(self, reference: str, title: str, description: str = None, security_tag: str = None,
parent: str = None, metadata: dict = None):
super().__init__(reference, title, description, security_tag, parent, metadata)
self.entity_type = EntityType.ASSET
self.path = IO_PATH
self.tag = "InformationObject"
[docs]
class ContentObject(Entity):
"""
Class to represent the Content Object in the Preservica data model
"""
def __init__(self, reference: str, title: str, description: str = None, security_tag: str = None,
parent: str = None, metadata: dict = None):
super().__init__(reference, title, description, security_tag, parent, metadata)
self.entity_type = EntityType.CONTENT_OBJECT
self.representation_type = None
self.asset = None
self.path = CO_PATH
self.tag = "ContentObject"
[docs]
class Representation:
"""
Class to represent the Representation Object in the Preservica data model
"""
def __init__(self, asset: Asset, rep_type: str, name: str, url: str):
self.asset = asset
self.rep_type = rep_type
self.name = name
self.url = url
def __str__(self):
return f"Type:\t\t\t{self.rep_type}\n" \
f"Name:\t\t\t{self.name}\n" \
f"URL:\t{self.url}"
def __repr__(self):
return self.__str__()
def only_assets(entity: Entity):
return bool(entity.entity_type is EntityType.ASSET)
def only_folders(entity: Entity):
return bool(entity.entity_type is EntityType.FOLDER)
def content_api_identifier_to_type(ref: str):
ref = ref.replace('sdb:', '')
parts = ref.split("|")
return tuple((EntityType(parts[0]), parts[1]))
class Thumbnail(Enum):
SMALL = "small"
MEDIUM = "medium"
LARGE = "large"
def sanitize(filename) -> str:
"""
Return a fairly safe version of the filename.
We don't limit ourselves to ascii, because we want to keep municipality
names, etc, but we do want to get rid of anything potentially harmful,
and make sure we do not exceed Windows filename length limits.
Hence, a less safe blacklist, rather than a whitelist.
"""
blacklist = ["\\", "/", ":", "*", "?", "\"", "<", ">", "|", "\0"]
reserved = [
"CON", "PRN", "AUX", "NUL", "COM1", "COM2", "COM3", "COM4", "COM5",
"COM6", "COM7", "COM8", "COM9", "LPT1", "LPT2", "LPT3", "LPT4", "LPT5",
"LPT6", "LPT7", "LPT8", "LPT9",
] # Reserved words on Windows
filename = "".join(c for c in filename if c not in blacklist)
# Remove all characters below code point 32
filename = "".join(c for c in filename if 31 < ord(c))
filename = unicodedata.normalize("NFKD", filename)
filename = filename.rstrip(". ") # Windows does not allow these at end
filename = filename.strip()
if all([x == "." for x in filename]):
filename = "__" + filename
if filename in reserved:
filename = "__" + filename
if len(filename) == 0:
filename = "__"
if len(filename) > 255:
parts = re.split(r"[/\\]", filename)[-1].split(".")
if len(parts) > 1:
ext = "." + parts.pop()
filename = filename[:-len(ext)]
else:
ext = ""
if filename == "":
filename = "__"
if len(ext) > 254:
ext = ext[254:]
maxl = 255 - len(ext)
filename = filename[:maxl]
filename = filename + ext
# Re-check last character (if there was no extension)
filename = filename.rstrip(". ")
if len(filename) == 0:
filename = "__"
return filename
class AuthenticatedAPI:
"""
Base class for authenticated calls which need an access token
Authenticated calls include a "Preservica-Access-Token" header in the request
"""
def _check_if_user_has_manager_role(self):
"""
Check if the current user has a least a manager role
:return: None
Throws RuntimeError if the user does not have required roles
"""
if ('ROLE_SDB_MANAGER_USER' not in self.roles) and ('ROLE_SDB_ADMIN_USER' not in self.roles):
logger.error(f"The AdminAPI requires the user to have ROLE_SDB_MANAGER_USER")
raise RuntimeError(f"The API requires the user to have at least the ROLE_SDB_MANAGER_USER")
def _find_user_roles_(self) -> list:
"""
Get a list of roles for the user
:return list of roles:
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f"{self.protocol}://{self.server}/api/user/details", headers=headers)
if request.status_code == requests.codes.ok:
roles = json.loads(str(request.content.decode('utf-8')))['roles']
return roles
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._find_user_roles_()
def security_tags_base(self, with_permissions: bool = False) -> dict:
"""
Return security tags available for the current user
:return: dict of security tags
:rtype: dict
"""
if (self.major_version < 7) and (self.minor_version < 4) and (self.patch_version < 1):
raise RuntimeError("security_tags API call is only available with a Preservica v6.3.1 system or higher")
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/xml;charset=UTF-8'}
request = self.session.get(f'{self.protocol}://{self.server}/api/security/tags', headers=headers)
if request.status_code == requests.codes.ok:
xml_response = str(request.content.decode('utf-8'))
logger.debug(xml_response)
entity_response = xml.etree.ElementTree.fromstring(xml_response)
security_tags = {}
tags = entity_response.findall(f'.//{{{self.sec_ns}}}Tag')
for tag in tags:
if with_permissions:
permissions = []
for p in tag.findall(f'.//{{{self.sec_ns}}}Permission'):
permissions.append(p.text)
security_tags[tag.attrib['name']] = permissions
else:
security_tags[tag.attrib['name']] = tag.attrib['name']
return security_tags
if request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.security_tags_base()
else:
logger.error(f'security_tags failed {request.status_code}')
raise RuntimeError(request.status_code, "security_tags failed")
def entity_from_string(self, xml_data: str) -> dict:
"""
Create a basic entity from XML data
:param xml_data:
:return: dict
"""
entity_response = xml.etree.ElementTree.fromstring(xml_data)
reference = entity_response.find(f'.//{{{self.xip_ns}}}Ref')
title = entity_response.find(f'.//{{{self.xip_ns}}}Title')
security_tag = entity_response.find(f'.//{{{self.xip_ns}}}SecurityTag')
description = entity_response.find(f'.//{{{self.xip_ns}}}Description')
parent = entity_response.find(f'.//{{{self.xip_ns}}}Parent')
custom_type = entity_response.find(f'.//{{{self.xip_ns}}}CustomType')
if hasattr(parent, 'text'):
parent = parent.text
else:
parent = None
fragments = entity_response.findall(f'.//{{{self.entity_ns}}}Metadata/{{{self.entity_ns}}}Fragment')
metadata = {}
for fragment in fragments:
metadata[fragment.text] = fragment.attrib['schema']
entity_dict = {'reference': reference.text, 'title': title.text if hasattr(title, 'text') else None,
'description': description.text if hasattr(description, 'text') else None,
'security_tag': security_tag.text, 'parent': parent, 'metadata': metadata}
if hasattr(custom_type, 'text'):
entity_dict['CustomType'] = custom_type.text
return entity_dict
def __version_namespace__(self):
"""
Generate version specific namespaces from the server version
"""
if self.major_version == 7:
self.xip_ns = f"{NS_XIP_ROOT}v{self.major_version}.{self.minor_version}"
self.entity_ns = f"{NS_ENTITY_ROOT}v{self.major_version}.{self.minor_version}"
self.rm_ns = f"{NS_RM_ROOT}v{6}.{2}"
self.sec_ns = f"{NS_SEC_ROOT}/v{self.major_version}.{self.minor_version}"
self.admin_ns = f"{NS_ADMIN}/v{self.major_version}.{self.minor_version}"
if self.major_version == 6:
if self.minor_version < 2:
self.xip_ns = NS_XIP_V6
self.entity_ns = NS_ENTITY
else:
self.xip_ns = f"{NS_XIP_ROOT}v{self.major_version}.{self.minor_version}"
self.entity_ns = f"{NS_ENTITY_ROOT}v{self.major_version}.{self.minor_version}"
self.rm_ns = f"{NS_RM_ROOT}v{self.major_version}.{2}"
self.sec_ns = f"{NS_SEC_ROOT}/v{self.major_version}.{self.minor_version}"
self.admin_ns = f"{NS_ADMIN}/v{self.major_version}.{self.minor_version}"
def __version_number__(self):
"""
Determine the version number of the server
"""
headers = {HEADER_TOKEN: self.token}
request = self.session.get(f'{self.protocol}://{self.server}/api/entity/versiondetails/version',
headers=headers)
if request.status_code == requests.codes.ok:
xml_ = str(request.content.decode('utf-8'))
version = xml_[xml_.find("<CurrentVersion>") + len("<CurrentVersion>"):xml_.find("</CurrentVersion>")]
version_numbers = version.split(".")
self.major_version = int(version_numbers[0])
self.minor_version = int(version_numbers[1])
self.patch_version = int(version_numbers[2])
return version
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.__version_number__()
else:
logger.error(f"version number failed with http response {request.status_code}")
logger.error(str(request.content))
RuntimeError(request.status_code, "version number failed")
def __str__(self):
return f"pyPreservica version: {pyPreservica.__version__} (Preservica 7.0 Compatible) " \
f"Connected to: {self.server} Preservica version: {self.version} as {self.username} " \
f"in tenancy {self.tenant}"
def __repr__(self):
return self.__str__()
def save_config(self):
config = configparser.RawConfigParser(interpolation=None)
config['credentials'] = {'username': self.username, 'password': self.password, 'tenant': self.tenant,
'server': self.server}
if self.two_fa_secret_key is not None:
config['credentials']['twoFactorToken'] = self.two_fa_secret_key
with open('credentials.properties', 'wt', encoding="utf-8") as configfile:
config.write(configfile)
def manager_token(self, username: str, password: str):
data = {'username': username, 'password': password, 'tenant': self.tenant}
response = self.session.post(f'{self.protocol}://{self.server}/api/accesstoken/login', data=data)
if response.status_code == requests.codes.ok:
return response.json()['token']
else:
msg = "Could not generate valid manager approval password"
logger.error(msg)
logger.error(response.status_code)
logger.error(str(response.content))
RuntimeError(response.status_code, "Could not generate valid manager approval token")
def __token__(self):
"""
Generate am API token to use to authenticate calls
:return: API Token
"""
logger.debug("Token Expired Requesting New Token")
if self.shared_secret is False:
if self.tenant is None:
data = {'username': self.username, 'password': self.password, 'includeUserDetails': 'true'}
else:
data = {'username': self.username, 'password': self.password, 'tenant': self.tenant}
response = self.session.post(f'{self.protocol}://{self.server}/api/accesstoken/login', data=data)
if response.status_code == requests.codes.ok:
if self.tenant is None:
self.tenant = response.json()['tenant']
return response.json()['token']
else:
if 'message' in response.json():
if response.json()['message'] == "needs.2fa":
logger.debug("2FA Found")
if self.tenant is None:
self.tenant = response.json()['tenant']
if self.two_fa_secret_key:
totp = pyotp.TOTP(self.two_fa_secret_key)
data = {'username': self.username,
'continuationToken': response.json()['continuationToken'],
'tenant': self.tenant, 'twoFactorToken': totp.now()}
response_2fa = self.session.post(
f'{self.protocol}://{self.server}/api/accesstoken/complete-2fa',
data=data)
if response_2fa.status_code == requests.codes.ok:
return response_2fa.json()['token']
else:
msg = "Failed to create a 2FA authentication token. Check your credentials are correct"
logger.error(msg)
logger.error(str(response.content))
raise RuntimeError(response.status_code, msg)
else:
msg = "2FA twoFactorToken required to authenticate against this account using 2FA"
logger.error(msg)
logger.error(str(response.content))
raise RuntimeError(response.status_code, msg)
if response.json()['message'] == "needs.2fa.setup":
msg = "2FA is activated but not yet set up"
logger.error(msg)
logger.error(str(response.content))
raise RuntimeError(response.status_code, msg)
msg = "Failed to create a password based authentication token. Check your credentials are correct"
logger.error(msg)
logger.error(str(response.content))
raise RuntimeError(response.status_code, msg)
if self.shared_secret is True:
endpoint = "api/accesstoken/acquire-external"
timestamp = int(time.time())
to_hash = f"preservica-external-auth{timestamp}{self.username}{self.password}"
sha1 = hashlib.sha1()
sha1.update(to_hash.encode(encoding='utf-8'))
data = {"username": self.username, "tenant": self.tenant, "timestamp": timestamp, "hash": sha1.hexdigest()}
response = self.session.post(f'{self.protocol}://{self.server}/{endpoint}', data=data)
if response.status_code == requests.codes.ok:
return response.json()['token']
else:
msg = "Failed to create a shared secret authentication token. Check your credentials are correct"
logger.error(msg)
raise RuntimeError(response.status_code, msg)
def __init__(self, username: str = None, password: str = None, tenant: str = None, server: str = None,
use_shared_secret: bool = False, two_fa_secret_key: str = None, protocol: str = "https"):
config = configparser.ConfigParser(interpolation=configparser.Interpolation())
config.read('credentials.properties', encoding='utf-8')
self.session = requests.Session()
retries = Retry(
total=3,
backoff_factor=0.1,
status_forcelist=[502, 503, 504],
allowed_methods=Retry.DEFAULT_ALLOWED_METHODS
)
self.shared_secret = bool(use_shared_secret)
self.protocol = protocol
self.two_fa_secret_key = two_fa_secret_key
self.session.mount(f'{self.protocol}://', HTTPAdapter(max_retries=retries))
self.session.request = functools.partial(self.session.request, timeout=TIME_OUT)
if not two_fa_secret_key:
two_fa_secret_key = os.environ.get('PRESERVICA_2FA_TOKEN')
if two_fa_secret_key is None:
try:
two_fa_secret_key = config['credentials']['twoFactorToken']
except KeyError:
pass
self.two_fa_secret_key = two_fa_secret_key
if not username:
username = os.environ.get('PRESERVICA_USERNAME')
if username is None:
try:
username = config['credentials']['username']
except KeyError:
pass
if not username:
msg = "No valid username found in method arguments, environment variables or credentials.properties file"
logger.error(msg)
raise RuntimeError(msg)
else:
self.username = username
if not password:
password = os.environ.get('PRESERVICA_PASSWORD')
if password is None:
try:
password = config['credentials']['password']
except KeyError:
pass
if not password:
msg = "No valid password found in method arguments, environment variables or credentials.properties file"
logger.error(msg)
raise RuntimeError(msg)
else:
self.password = password
if not tenant:
tenant = os.environ.get('PRESERVICA_TENANT')
if tenant is None:
try:
tenant = config['credentials']['tenant']
except KeyError:
pass
if not tenant:
msg = "No valid tenant found in method arguments, environment variables or credentials.properties file"
logger.debug(msg)
self.tenant = tenant
if not server:
server = os.environ.get('PRESERVICA_SERVER')
if server is None:
try:
server = config['credentials']['server']
except KeyError:
pass
if not server:
msg = "No valid server found in method arguments, environment variables or credentials.properties file"
logger.error(msg)
raise RuntimeError(msg)
else:
self.server = server
self.token = self.__token__()
self.version = self.__version_number__()
self.__version_namespace__()
self.roles = self._find_user_roles_()
self.session.headers.update({'User-Agent': f'pyPreservica SDK/({pyPreservica.__version__}) '
f' ({platform.platform()}/{os.name}/{sys.platform})'})
logger.debug(self.xip_ns)
logger.debug(self.entity_ns)