"""
pyPreservica ContentAPI module definition
A client library for the Preservica Repository web services Content API
https://us.preservica.com/api/content/documentation.html
author: James Carr
licence: Apache License 2.0
"""
import csv
from typing import Generator
from pyPreservica.common import *
logger = logging.getLogger(__name__)
[docs]
class ContentAPI(AuthenticatedAPI):
def __init__(self, username=None, password=None, tenant=None, server=None, use_shared_secret=False,
two_fa_secret_key: str = None, protocol: str = "https"):
super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key, protocol)
self.callback = None
class SearchResult:
def __init__(self, metadata, refs, hits, results_list, next_start):
self.metadata = metadata
self.refs = refs
self.hits = int(hits)
self.results_list = results_list
self.next_start = next_start
def search_callback(self, fn):
self.callback = fn
def user_security_tags(self, with_permissions: bool = False):
"""
Return available security tags
:return: dict of security tags
:rtype: dict
"""
return self.security_tags_base(with_permissions=with_permissions)
[docs]
def object_details(self, entity_type, reference: str) -> dict:
"""
:param entity_type:
:param reference:
:return: Dictionary of object attributes
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/json'}
if type(entity_type) == EntityType:
params = {'id': f'sdb:{entity_type.value}|{reference}'}
else:
params = {'id': f'sdb:{entity_type}|{reference}'}
request = self.session.get(f'{self.protocol}://{self.server}/api/content/object-details', params=params,
headers=headers)
if request.status_code == requests.codes.ok:
return request.json()["value"]
elif request.status_code == requests.codes.not_found:
logger.error(f"The requested reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
elif request.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.object_details(entity_type, reference)
else:
logger.error(f"object_details failed with error code: {request.status_code}")
raise RuntimeError(request.status_code, f"object_details failed with error code: {request.status_code}")
def download(self, reference, filename):
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream'}
params = {'id': f'sdb:IO|{reference}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/download', params=params, headers=headers,
stream=True) as req:
if req.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
file.close()
return filename
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.download(reference, filename)
elif req.status_code == requests.codes.not_found:
logger.error(f"The requested asset reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"download failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"download failed with error code: {req.status_code}")
def thumbnail(self, entity_type, reference, filename, size=Thumbnail.LARGE):
headers = {HEADER_TOKEN: self.token, 'accept': 'image/png'}
params = {'id': f'sdb:{entity_type}|{reference}', 'size': f'{size.value}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params, headers=headers,
stream=True) as req:
if req.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
return filename
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.thumbnail(entity_type, reference, filename, size)
elif req.status_code == requests.codes.not_found:
logger.error(req.content.decode("utf-8"))
logger.error(f"The requested reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"thumbnail failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"thumbnail failed with error code: {req.status_code}")
[docs]
def indexed_fields(self):
headers = {HEADER_TOKEN: self.token}
results = self.session.get(f'{self.protocol}://{self.server}/api/content/indexed-fields', headers=headers)
if results.status_code == requests.codes.ok:
fields = {}
for ob in results.json()["value"]:
field = f'{ob["shortName"]}.{ob["index"]}'
fields[field] = ob["uri"]
return fields
elif results.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.indexed_fields()
else:
logger.error(f"indexed_fields failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"indexed_fields failed with error code: {results.status_code}")
[docs]
def simple_search_csv(self, query: str = "%", page_size: int = 50, csv_file="search.csv", list_indexes: list = None):
if list_indexes is None or len(list_indexes) == 0:
metadata_fields = ["xip.reference", "xip.title", "xip.description", "xip.document_type",
"xip.parent_ref", "xip.security_descriptor"]
else:
metadata_fields = list(list_indexes)
if "xip.reference" not in metadata_fields:
metadata_fields.insert(0, "xip.reference")
with open(csv_file, newline='', mode="wt", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=metadata_fields)
writer.writeheader()
writer.writerows(self.simple_search_list(query, page_size, metadata_fields))
[docs]
def simple_search_list(self, query: str = "%", page_size: int = 50, list_indexes: list = None):
search_result = self._simple_search(query, 0, page_size, list_indexes)
for e in search_result.results_list:
yield e
found = len(search_result.results_list)
while search_result.hits > found:
search_result = self._simple_search(query, found, page_size, list_indexes)
for e in search_result.results_list:
yield e
found = found + len(search_result.results_list)
def _simple_search(self, query: str = "%", start_index: int = 0, page_size: int = 10, list_indexes: list = None):
start_from = str(start_index)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
query_term = ('{ "q": "%s" }' % query)
if list_indexes is None or len(list_indexes) == 0:
metadata_fields = "xip.title,xip.description,xip.document_type,xip.parent_ref,xip.security_descriptor"
else:
metadata_fields = ','.join(list_indexes)
payload = {'start': start_from, 'max': str(page_size), 'metadata': metadata_fields, 'q': query_term}
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
results_list = []
if results.status_code == requests.codes.ok:
json_doc = results.json()
metadata = json_doc['value']['metadata']
refs = list(json_doc['value']['objectIds'])
refs = list(map(lambda x: content_api_identifier_to_type(x), refs))
hits = int(json_doc['value']['totalHits'])
for m_row, r_row in zip(metadata, refs):
results_map = {'xip.reference': r_row[1]}
for li in m_row:
results_map[li['name']] = li['value']
results_list.append(results_map)
next_start = start_index + page_size
if self.callback is not None:
value = str(f'{len(results_list) + start_index}:{hits}')
self.callback(value)
search_results = self.SearchResult(metadata, refs, hits, results_list, next_start)
return search_results
elif results.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._simple_search(query, start_index, page_size, list_indexes)
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"simple_search failed with error code: {results.status_code}")
def search_index_filter_csv(self, query: str = "%", csv_file="search.csv", page_size: int = 50, filter_values: dict = None,
sort_values: dict = None):
if filter_values is None:
filter_values = {}
if "xip.reference" not in filter_values:
filter_values["xip.reference"] = ""
header_fields = list(filter_values.keys())
index = header_fields.index("xip.reference")
header_fields.insert(0, header_fields.pop(index))
with open(csv_file, newline='', mode="wt", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=header_fields)
writer.writeheader()
writer.writerows(self.search_index_filter_list(query, page_size, filter_values, sort_values))
def search_index_filter_list(self, query: str = "%", page_size: int = 25, filter_values: dict = None,
sort_values: dict = None) -> Generator:
"""
Run a search query with optional filters
:param query: The main search query.
:param page_size: The default search page size
:param filter_values: Dictionary of index names and values
:param sort_values: Dictionary of sort index names and values
:return: search result
"""
search_result = self._search_index_filter(query, 0, page_size, filter_values, sort_values)
for e in search_result.results_list:
yield e
found = len(search_result.results_list)
while search_result.hits > found:
search_result = self._search_index_filter(query, found, page_size, filter_values, sort_values)
for e in search_result.results_list:
yield e
found = found + len(search_result.results_list)
def search_index_filter_hits(self, query: str = "%", filter_values: dict = None) -> int:
"""
Run a search query with filters and return the number of hits only
:param query: The main search query.
:param filter_values: Dictionary of index names and values
:return: Number of search results
"""
start_from = str(0)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
field_list = []
for key, value in filter_values.items():
if value == "":
field_list.append('{' f' "name": "{key}", "values": [] ' + '}')
else:
field_list.append('{' f' "name": "{key}", "values": ["{value}"] ' + '}')
filter_terms = ','.join(field_list)
query_term = ('{ "q": "%s", "fields": [ %s ] }' % (query, filter_terms))
payload = {'start': start_from, 'max': str(10), 'metadata': list(filter_values.keys()), 'q': query_term}
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
if results.status_code == requests.codes.ok:
json_doc = results.json()
return int(json_doc['value']['totalHits'])
elif results.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.search_index_filter_hits(query, filter_values)
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"_search_index_filter_hits failed")
def _search_index_filter(self, query: str = "%", start_index: int = 0, page_size: int = 25,
filter_values: dict = None, sort_values: dict = None):
start_from = str(start_index)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
if filter_values is None:
filter_values = {}
field_list = []
for key, value in filter_values.items():
if value == "":
field_list.append('{' f' "name": "{key}", "values": [] ' + '}')
else:
field_list.append('{' f' "name": "{key}", "values": ["{value}"] ' + '}')
filter_terms = ','.join(field_list)
if sort_values is None:
query_term = ('{ "q": "%s", "fields": [ %s ] }' % (query, filter_terms))
else:
sort_list = []
for key, value in sort_values.items():
direction = "asc"
if str(value).lower().startswith("d"):
direction = "desc"
sort_list.append(f'{{"sortFields": ["{key}"], "sortOrder": "{direction}"}}')
sort_terms = ','.join(sort_list)
query_term = ('{ "q": "%s", "fields": [ %s ], "sort": [ %s ]}' % (query, filter_terms, sort_terms))
payload = {'start': start_from, 'max': str(page_size), 'metadata': list(filter_values.keys()), 'q': query_term}
logger.debug(payload)
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
results_list = []
if results.status_code == requests.codes.ok:
json_doc = results.json()
metadata = json_doc['value']['metadata']
refs = list(json_doc['value']['objectIds'])
refs = list(map(lambda x: content_api_identifier_to_type(x), refs))
hits = int(json_doc['value']['totalHits'])
for m_row, r_row in zip(metadata, refs):
results_map = {'xip.reference': r_row[1]}
for li in m_row:
results_map[li['name']] = li['value']
results_list.append(results_map)
next_start = start_index + page_size
if self.callback is not None:
value = str(f'{len(results_list) + start_index}:{hits}')
self.callback(value)
search_results = self.SearchResult(metadata, refs, hits, results_list, next_start)
return search_results
elif results.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self._search_index_filter(query, start_index, page_size, filter_values)
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"search_index_filter failed")
class ReportProgressCallBack:
def __init__(self):
self.current = 0
self.total = 0
self._lock = threading.Lock()
def __call__(self, value):
with self._lock:
values = value.split(":")
self.total = int(values[1])
self.current = int(values[0])
if self.total == 0:
percentage = 100.0
else:
percentage = (self.current / self.total) * 100
sys.stdout.write("\rProcessing Hits %s from %s (%.2f%%)" % (self.current, self.total, percentage))
sys.stdout.flush()