Source code for pyPreservica.contentAPI
"""
pyPreservica ContentAPI module definition
A client library for the Preservica Repository web services Content API
https://us.preservica.com/api/content/documentation.html
author: James Carr
licence: Apache License 2.0
"""
import csv
from io import BytesIO
from typing import Generator, Callable, Optional, Union
from pyPreservica.common import *
logger = logging.getLogger(__name__)
class SortOrder(Enum):
asc = 1
desc = 2
class Operator(Enum):
IS = "IS"
NOT = "NOT"
class Field:
name: str
value: Optional[Union[str, list[str]]] = None
operator: Optional[Operator]
sort_order: Optional[SortOrder]
def __init__(self, name: str, value: Union[str, list[str]], operator: Optional[Operator]=Operator.IS, sort_order: Optional[SortOrder]=None):
self.name = name
self.value = value
self.operator = operator
self.sort_order = sort_order
[docs]
class ContentAPI(AuthenticatedAPI):
"""
The ContentAPI class provides the search interface to the Preservica repository.
"""
def __init__(self, username: str|None = None, password: str|None = None, tenant: str|None = None, server: str|None = None,
use_shared_secret: bool = False, two_fa_secret_key: str|None = None,
protocol: str = "https", request_hook: Callable|None = None, credentials_path: str = 'credentials.properties'):
super().__init__(username, password, tenant, server, use_shared_secret, two_fa_secret_key,
protocol, request_hook, credentials_path)
self.callback: Callable|None = None
class SearchResult:
def __init__(self, metadata, refs, hits, results_list, next_start):
self.metadata = metadata
self.refs = refs
self.hits = int(hits)
self.results_list = results_list
self.next_start = next_start
def search_callback(self, fn: Callable):
self.callback = fn
[docs]
def user_security_tags(self, with_permissions: bool = False):
"""
Return available security tags
:return: dict of security tags
:rtype: dict
"""
return self.security_tags_base(with_permissions=with_permissions)
[docs]
def full_text(self, reference: str) -> str|None:
"""
Return the full text index value for asset.
The reference must be for an Asset.
If the Asset has been OCR'd then this will return the OCR text
:param reference: The Asset reference
:return The value of the full text index or None if not found:
:rtype str:
"""
hits = list(self.simple_search_list(query=f"id:{reference}",
list_indexes=['xip.reference', 'xip.full_text', 'xip.document_type']))
if len(hits) == 1:
hit = hits[0]
if (hit['xip.reference'] == reference) and (hit['xip.document_type'] == 'IO'):
return str(hit['xip.full_text'])
return None
[docs]
def object_details(self, entity_type, reference: str, exclude_dates: bool = False) -> dict:
"""
:param exclude_dates: excludes cmis:createdBy, cmis:creationDate, cmis:lastModifiedBy, cmis:lastModificationDate
:type exclude_dates: bool
:param entity_type:
:param reference:
:return: Dictionary of object attributes
"""
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/json'}
if type(entity_type) == EntityType:
params = {'id': f'sdb:{entity_type.value}|{reference}'}
else:
params = {'id': f'sdb:{entity_type}|{reference}'}
if exclude_dates:
params['excludeproperties'] = 'history'
else:
params['excludeproperties'] = ''
request = self.session.get(f'{self.protocol}://{self.server}/api/content/object-details', params=params,
headers=headers)
if request.status_code == requests.codes.ok:
return request.json()["value"]
elif request.status_code == requests.codes.not_found:
logger.error(f"The requested reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"object_details failed with error code: {request.status_code}")
raise RuntimeError(request.status_code, f"object_details failed with error code: {request.status_code}")
def download_bytes(self, reference) -> BytesIO:
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream', 'X-STREAM-No-Retry': 'true'}
params = {'id': f'sdb:IO|{reference}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/download', params=params, headers=headers, stream=True) as req:
if req.status_code == requests.codes.ok:
file_bytes = BytesIO()
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file_bytes.write(chunk)
file_bytes.seek(0)
return file_bytes
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.download_bytes(reference)
elif req.status_code == requests.codes.not_found:
logger.error(f"The requested asset reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"download failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"download failed with error code: {req.status_code}")
def download(self, reference, filename) -> str:
headers = {HEADER_TOKEN: self.token, 'Content-Type': 'application/octet-stream', 'X-STREAM-No-Retry': 'true'}
params = {'id': f'sdb:IO|{reference}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/download', params=params, headers=headers,
stream=True) as req:
if req.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
return filename
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.download(reference, filename)
elif req.status_code == requests.codes.not_found:
logger.error(f"The requested asset reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"download failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"download failed with error code: {req.status_code}")
def thumbnail_bytes(self, entity_type, reference: str, size: Thumbnail = Thumbnail.LARGE) -> BytesIO:
headers = {HEADER_TOKEN: self.token, 'accept': 'image/png', 'X-STREAM-No-Retry': 'true'}
params = {'id': f'sdb:{entity_type}|{reference}', 'size': f'{size.value}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params, headers=headers, stream=True) as req:
if req.status_code == requests.codes.ok:
file_bytes = BytesIO()
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file_bytes.write(chunk)
file_bytes.seek(0)
return file_bytes
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.thumbnail_bytes(entity_type, reference, size)
elif req.status_code == requests.codes.not_found:
logger.error(req.content.decode("utf-8"))
logger.error(f"The requested reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"thumbnail failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"thumbnail failed with error code: {req.status_code}")
def thumbnail(self, entity_type, reference, filename, size=Thumbnail.LARGE) -> str:
headers = {HEADER_TOKEN: self.token, 'accept': 'image/png', 'X-STREAM-No-Retry': 'true'}
params = {'id': f'sdb:{entity_type}|{reference}', 'size': f'{size.value}'}
with self.session.get(f'{self.protocol}://{self.server}/api/content/thumbnail', params=params, headers=headers, stream=True) as req:
if req.status_code == requests.codes.ok:
with open(filename, 'wb') as file:
for chunk in req.iter_content(chunk_size=CHUNK_SIZE):
file.write(chunk)
file.flush()
return filename
elif req.status_code == requests.codes.unauthorized:
self.token = self.__token__()
return self.thumbnail(entity_type, reference, filename, size)
elif req.status_code == requests.codes.not_found:
logger.error(req.content.decode("utf-8"))
logger.error(f"The requested reference is not found in the repository: {reference}")
raise RuntimeError(reference, "The requested reference is not found in the repository")
else:
logger.error(f"thumbnail failed with error code: {req.status_code}")
raise RuntimeError(req.status_code, f"thumbnail failed with error code: {req.status_code}")
def indexed_fields(self) -> dict:
headers = {HEADER_TOKEN: self.token}
results = self.session.get(f'{self.protocol}://{self.server}/api/content/indexed-fields', headers=headers)
if results.status_code == requests.codes.ok:
fields = {}
for ob in results.json()["value"]:
field = f'{ob["shortName"]}.{ob["index"]}'
fields[field] = ob["uri"]
return fields
else:
logger.error(f"indexed_fields failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"indexed_fields failed with error code: {results.status_code}")
def simple_search_csv(self, query: str = "%", page_size: int = 50, csv_file="search.csv",
list_indexes: list|None = None):
if list_indexes is None or len(list_indexes) == 0:
metadata_fields = ["xip.reference", "xip.title", "xip.description", "xip.document_type",
"xip.parent_ref", "xip.security_descriptor"]
else:
metadata_fields = list(list_indexes)
if "xip.reference" not in metadata_fields:
metadata_fields.insert(0, "xip.reference")
with open(csv_file, newline='', mode="wt", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=metadata_fields)
writer.writeheader()
writer.writerows(self.simple_search_list(query, page_size, metadata_fields))
def simple_search_list(self, query: str = "%", page_size: int = 50, list_indexes: list|None = None) -> Generator:
search_result = self._simple_search(query, 0, page_size, list_indexes)
for e in search_result.results_list:
yield e
found = len(search_result.results_list)
while search_result.hits > found:
search_result = self._simple_search(query, found, page_size, list_indexes)
for e in search_result.results_list:
yield e
found = found + len(search_result.results_list)
def _simple_search(self, query: str = "%", start_index: int = 0, page_size: int = 10, list_indexes: list|None = None) -> SearchResult:
start_from = str(start_index)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
query_term = ('{ "q": "%s" }' % query)
if list_indexes is None or len(list_indexes) == 0:
metadata_fields = "xip.title,xip.description,xip.document_type,xip.parent_ref,xip.security_descriptor"
else:
metadata_fields = ','.join(list_indexes)
payload = {'start': start_from, 'max': str(page_size), 'metadata': metadata_fields, 'q': query_term}
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
results_list = []
if results.status_code == requests.codes.ok:
json_doc = results.json()
metadata = json_doc['value']['metadata']
refs = list(json_doc['value']['objectIds'])
refs = list(map(lambda x: content_api_identifier_to_type(x), refs))
hits = int(json_doc['value']['totalHits'])
for m_row, r_row in zip(metadata, refs):
results_map = {'xip.reference': r_row[1]}
for li in m_row:
results_map[li['name']] = li['value']
results_list.append(results_map)
next_start = start_index + page_size
if self.callback is not None:
value = str(f'{len(results_list) + start_index}:{hits}')
self.callback(value)
search_results = self.SearchResult(metadata, refs, hits, results_list, next_start)
return search_results
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"simple_search failed with error code: {results.status_code}")
def search_index_filter_csv(self, query: str = "%", csv_file="search.csv", page_size: int = 50,
filter_values: dict|None = None,
sort_values: dict|None = None):
if filter_values is None:
filter_values = {}
if "xip.reference" not in filter_values:
filter_values["xip.reference"] = ""
header_fields = list(filter_values.keys())
index = header_fields.index("xip.reference")
header_fields.insert(0, header_fields.pop(index))
with open(csv_file, newline='', mode="wt", encoding="utf-8") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=header_fields)
writer.writeheader()
writer.writerows(self.search_index_filter_list(query, page_size, filter_values, sort_values))
[docs]
def search_fields(self, query: str = "%", fields: list[Field]|None=None, page_size: int = 25) -> Generator:
"""
Run a search query with multiple fields
:param query: The main search query.
:param fields: List of search fields
:param page_size: The default search page size
:return: search result
"""
if self.major_version < 7 or (self.major_version == 7 and self.minor_version < 5):
raise RuntimeError("search_fields API call is not available when connected to a v7.5 System")
search_result = self._search_fields(query=query, fields=fields, start_index=0, page_size=page_size)
for e in search_result.results_list:
yield e
found = len(search_result.results_list)
while search_result.hits > found:
search_result = self._search_fields(query=query, fields=fields, start_index=found, page_size=page_size)
for e in search_result.results_list:
yield e
found = found + len(search_result.results_list)
def _search_fields(self, query: str = "%", fields: list[Field]|None=None, start_index: int = 0, page_size: int = 25) -> SearchResult:
start_from = str(start_index)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
if fields is None:
fields = []
field_list = []
sort_list = []
metadata_elements = []
for field in fields:
metadata_elements.append(field.name)
if field.value is None or field.value == "":
field_list.append('{' f' "name": "{field.name}", "values": [] ' + '}')
else:
if isinstance(field.value, str):
if field.operator == Operator.NOT:
field_list.append(
'{' f' "name": "{field.name}", "values": ["{field.value}"], "operator": "NOT" ' + '}')
else:
field_list.append('{' f' "name": "{field.name}", "values":[ "{field.value}" ]' '}')
if isinstance(field.value, list):
values = [f'"{w}"' for w in field.value]
v:str = f' {",".join(values)} '
if field.operator == Operator.NOT:
field_list.append(
'{' f' "name": "{field.name}", "values": [ {v} ], "operator": "NOT" ' + '}')
else:
field_list.append('{' f' "name": "{field.name}", "values":[ {v} ]' '}')
if field.sort_order is not None:
sort_list.append(f'{{"sortFields": ["{field.name}"], "sortOrder": "{field.sort_order.name}"}}')
filter_terms = ','.join(field_list)
if len(sort_list) == 0:
query_term = ('{ "q": "%s", "fields": [ %s ] }' % (query, filter_terms))
else:
sort_terms = ','.join(sort_list)
query_term = ('{ "q": "%s", "fields": [ %s ], "sort": [ %s ]}' % (query, filter_terms, sort_terms))
if len(metadata_elements) == 0:
metadata_elements.append("xip.title")
payload = {'start': start_from, 'max': str(page_size), 'metadata': list(metadata_elements), 'q': query_term}
logger.debug(payload)
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
results_list = []
if results.status_code == requests.codes.ok:
json_doc = results.json()
metadata = json_doc['value']['metadata']
refs = list(json_doc['value']['objectIds'])
refs = list(map(lambda x: content_api_identifier_to_type(x), refs))
hits = int(json_doc['value']['totalHits'])
for m_row, r_row in zip(metadata, refs):
results_map = {'xip.reference': r_row[1]}
for li in m_row:
results_map[li['name']] = li['value']
results_list.append(results_map)
next_start = start_index + page_size
if self.callback is not None:
value = str(f'{len(results_list) + start_index}:{hits}')
self.callback(value)
search_results = self.SearchResult(metadata, refs, hits, results_list, next_start)
return search_results
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"search_index_filter failed")
[docs]
def search_index_filter_list(self, query: str = "%", page_size: int = 25, filter_values: dict|None = None,
sort_values: dict|None = None) -> Generator:
"""
Run a search query with optional filters
:param query: The main search query.
:param page_size: The default search page size
:param filter_values: Dictionary of index names and values
:param sort_values: Dictionary of sort index names and values
:return: search result
"""
search_result = self._search_index_filter(query, 0, page_size, filter_values, sort_values)
for e in search_result.results_list:
yield e
found = len(search_result.results_list)
while search_result.hits > found:
search_result = self._search_index_filter(query, found, page_size, filter_values, sort_values)
for e in search_result.results_list:
yield e
found = found + len(search_result.results_list)
[docs]
def search_index_filter_hits(self, query: str = "%", filter_values: dict|None = None) -> int:
"""
Run a search query with filters and return the number of hits only
:param query: The main search query.
:param filter_values: Dictionary of index names and values
:return: Number of search results
"""
start_from = str(0)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
if filter_values is None:
filter_values = {'xip.reference': '', 'xip.title': ''}
field_list = []
for key, value in filter_values.items():
if value == "":
field_list.append('{' f' "name": "{key}", "values": [] ' + '}')
else:
if isinstance(value, str):
field_list.append('{' f' "name": "{key}", "values": ["{value}"] ' + '}')
if isinstance(value, list):
values = [f'"{w}"' for w in value]
v: str = f' {",".join(values)} '
field_list.append('{' f' "name": "{key}", "values":[ {v} ]' '}')
filter_terms = ','.join(field_list)
query_term = ('{ "q": "%s", "fields": [ %s ] }' % (query, filter_terms))
payload = {'start': start_from, 'max': str(10), 'metadata': list(filter_values.keys()), 'q': query_term}
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
if results.status_code == requests.codes.ok:
json_doc = results.json()
return int(json_doc['value']['totalHits'])
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"_search_index_filter_hits failed")
def _search_index_filter(self, query: str = "%", start_index: int = 0, page_size: int = 25,
filter_values: dict|None = None, sort_values: dict|None = None) -> SearchResult:
start_from = str(start_index)
headers = {'Content-Type': 'application/x-www-form-urlencoded', HEADER_TOKEN: self.token}
if filter_values is None:
filter_values = {}
field_list = []
for key, value in filter_values.items():
if value == "":
field_list.append('{' f' "name": "{key}", "values": [] ' + '}')
else:
if isinstance(value, str):
field_list.append('{' f' "name": "{key}", "values": ["{value}"] ' + '}')
if isinstance(value, list):
values = [f'"{w}"' for w in value]
v: str = f' {",".join(values)} '
field_list.append('{' f' "name": "{key}", "values":[ {v} ]' '}')
filter_terms = ','.join(field_list)
if sort_values is None:
query_term = ('{ "q": "%s", "fields": [ %s ] }' % (query, filter_terms))
else:
sort_list = []
for key, value in sort_values.items():
direction = "asc"
if str(value).lower().startswith("d"):
direction = "desc"
sort_list.append(f'{{"sortFields": ["{key}"], "sortOrder": "{direction}"}}')
sort_terms = ','.join(sort_list)
query_term = ('{ "q": "%s", "fields": [ %s ], "sort": [ %s ]}' % (query, filter_terms, sort_terms))
payload = {'start': start_from, 'max': str(page_size), 'metadata': list(filter_values.keys()), 'q': query_term}
logger.debug(payload)
results = self.session.post(f'{self.protocol}://{self.server}/api/content/search', data=payload,
headers=headers)
results_list = []
if results.status_code == requests.codes.ok:
json_doc = results.json()
metadata = json_doc['value']['metadata']
refs = list(json_doc['value']['objectIds'])
refs = list(map(lambda x: content_api_identifier_to_type(x), refs))
hits = int(json_doc['value']['totalHits'])
for m_row, r_row in zip(metadata, refs):
results_map = {'xip.reference': r_row[1]}
for li in m_row:
results_map[li['name']] = li['value']
results_list.append(results_map)
next_start = start_index + page_size
if self.callback is not None:
value = str(f'{len(results_list) + start_index}:{hits}')
self.callback(value)
search_results = self.SearchResult(metadata, refs, hits, results_list, next_start)
return search_results
else:
logger.error(f"search failed with error code: {results.status_code}")
raise RuntimeError(results.status_code, f"search_index_filter failed")
class ReportProgressCallBack:
def __init__(self):
self.current = 0
self.total = 0
self._lock = threading.Lock()
def __call__(self, value):
with self._lock:
values = value.split(":")
self.total = int(values[1])
self.current = int(values[0])
if self.total == 0:
percentage = 100.0
else:
percentage = (self.current / self.total) * 100
sys.stdout.write("\rProcessing Hits %s from %s (%.2f%%)" % (self.current, self.total, percentage))
sys.stdout.flush()