"""
pyPreservica UploadAPI module definition
A client library for the Preservica Repository Upload API
author: James Carr
licence: Apache License 2.0
"""
import csv
import shutil
import tempfile
import uuid
import xml
from datetime import datetime, timedelta
from time import sleep
from xml.dom import minidom
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, SubElement
import boto3
import s3transfer.tasks
import s3transfer.upload
from boto3.s3.transfer import TransferConfig, S3Transfer
from botocore.config import Config
from botocore.credentials import RefreshableCredentials
from botocore.exceptions import ClientError
from s3transfer import S3UploadFailedError
from tqdm import tqdm
from pyPreservica.common import *
from pyPreservica.common import _make_stored_zipfile
# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)
# Byte-size constants used when configuring S3 transfers.
MB = 1024 * 1024
GB = 1024 ** 3
# Switch to multipart uploads once a file exceeds 1/16 GB (64 MB).
transfer_config = TransferConfig(multipart_threshold=int((1 * GB) / 16))
# Folder names used inside the generated SIP (package) directory structure.
CONTENT_FOLDER = "content"
PRESERVATION_CONTENT_FOLDER = "p1"
ACCESS_CONTENT_FOLDER = "a1"
def upload_file(self, filename, bucket, key, callback=None, extra_args=None):
    """Upload a file to an S3 object.

    Replacement implementation for S3Transfer.upload_file; variants have
    also been injected into the S3 client, Bucket and Object, so callers
    don't have to use S3Transfer.upload_file() directly.

    .. seealso::
        :py:meth:`S3.Client.upload_file`
        :py:meth:`S3.Client.upload_fileobj`
    """
    if not isinstance(filename, str):
        raise ValueError('Filename must be a string')
    # Hand the transfer to the manager along with any progress subscribers
    # derived from the callback.
    upload_future = self._manager.upload(
        filename, bucket, key, extra_args, self._get_subscribers(callback))
    try:
        return upload_future.result()
    except ClientError as e:
        # Backwards-compatibility layer: historically S3UploadFailedError was
        # only raised for failed part uploads; now any related client error
        # is wrapped the same way.
        destination = '/'.join([bucket, key])
        raise S3UploadFailedError(
            "Failed to upload %s to %s: %s" % (filename, destination, e))
class PutObjectTask(s3transfer.tasks.Task):
    # Based on s3transfer/upload.py, altered so that the service response
    # from client.put_object is propagated back to the caller instead of
    # being discarded.
    def _main(self, client, fileobj, bucket, key, extra_args):
        with fileobj as stream:
            return client.put_object(Bucket=bucket, Key=key, Body=stream, **extra_args)
class CompleteMultipartUploadTask(s3transfer.tasks.Task):
    # Based on s3transfer/tasks.py, altered so that the completion response
    # is returned to the caller instead of being discarded.
    def _main(self, client, bucket, key, upload_id, parts, extra_args):
        response = client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
            **extra_args,
        )
        return response
# Monkey-patch s3transfer so the task subclasses above (which return the
# service responses) are used in place of the library's own versions.
s3transfer.upload.PutObjectTask = PutObjectTask
s3transfer.upload.CompleteMultipartUploadTask = CompleteMultipartUploadTask
def prettify(elem):
    """Serialise *elem* and return it as an indented, human-readable XML string."""
    raw_bytes = xml.etree.ElementTree.tostring(elem, 'utf-8')
    dom = minidom.parseString(raw_bytes)
    return dom.toprettyxml(indent=" ")
def __create_io__(xip=None, file_name=None, parent_folder=None, **kwargs):
    """Append an InformationObject to *xip*, creating the XIP root if needed.

    Title, Description, SecurityTag and CustomType come from kwargs with
    sensible defaults; the asset reference is either produced by the
    'IO_Identifier_callback' kwarg or a fresh uuid4.
    Returns a (xip, reference) tuple.
    """
    if xip is None:
        xip = Element('XIP')
    assert xip is not None
    xip.set('xmlns', 'http://preservica.com/XIP/v6.0')
    info_object = SubElement(xip, 'InformationObject')
    ref = SubElement(info_object, 'Ref')
    # A caller-supplied callback may mint the asset identifier.
    if 'IO_Identifier_callback' in kwargs:
        ref.text = kwargs.get('IO_Identifier_callback')()
    else:
        ref.text = str(uuid.uuid4())
    SubElement(info_object, 'Title').text = kwargs.get('Title', file_name)
    SubElement(info_object, 'Description').text = kwargs.get('Description', file_name)
    SubElement(info_object, 'SecurityTag').text = kwargs.get('SecurityTag', "open")
    SubElement(info_object, 'CustomType').text = kwargs.get('CustomType', "")
    parent = SubElement(info_object, 'Parent')
    # Accept either a Folder-like object or a plain reference string.
    if hasattr(parent_folder, "reference"):
        parent.text = parent_folder.reference
    elif isinstance(parent_folder, str):
        parent.text = parent_folder
    return xip, ref.text
def __make_representation__(xip, rep_name, rep_type, io_ref):
    """Add a Representation element holding a single new ContentObject.

    Returns the uuid reference of the new ContentObject.
    """
    rep = SubElement(xip, 'Representation')
    SubElement(rep, 'InformationObject').text = io_ref
    SubElement(rep, 'Name').text = rep_name
    SubElement(rep, 'Type').text = rep_type
    container = SubElement(rep, 'ContentObjects')
    new_ref = str(uuid.uuid4())
    SubElement(container, 'ContentObject').text = new_ref
    return new_ref
def __make_content_objects__(xip, content_title, co_ref, io_ref, tag, content_description, content_type):
    """Add a ContentObject element describing one file within the asset."""
    content_object = SubElement(xip, 'ContentObject')
    # Child elements in schema order: Ref, Title, Description, SecurityTag,
    # CustomType, Parent.
    for element_name, value in (("Ref", co_ref), ("Title", content_title),
                                ("Description", content_description), ("SecurityTag", tag),
                                ("CustomType", content_type), ("Parent", io_ref)):
        SubElement(content_object, element_name).text = value
def __make_generation__(xip, filename, co_ref, generation_label, location=None):
    """Add an original, active Generation element for a ContentObject.

    :param xip: XIP root element the Generation is appended to
    :param filename: name of the file backing this generation
    :param co_ref: uuid reference of the owning ContentObject
    :param generation_label: optional label; defaults to the filename stem
    :param location: representation sub-folder the bitstream lives in
    """
    generation = SubElement(xip, 'Generation', {"original": "true", "active": "true"})
    content_object = SubElement(generation, "ContentObject")
    content_object.text = co_ref
    label = SubElement(generation, "Label")
    if generation_label:
        label.text = generation_label
    else:
        # Default the label to the file name without its extension.
        label.text = os.path.splitext(filename)[0]
    effective_date = SubElement(generation, "EffectiveDate")
    effective_date.text = datetime.now().isoformat()
    bitstreams = SubElement(generation, "Bitstreams")
    bitstream = SubElement(bitstreams, "Bitstream")
    # Bug fix: point the generation at its actual bitstream path rather than
    # the literal "(unknown)" placeholder that was hard-coded here.
    bitstream.text = f"{location}/{filename}"
    SubElement(generation, "Formats")
    SubElement(generation, "Properties")
def __make_bitstream__(xip, file_name, full_path, callback, location=None):
    """Add a Bitstream element with file size, physical location and fixity.

    *callback* is invoked as callback(file_name, full_path) and must return
    either an (algorithm, value) tuple or a dict of algorithm -> value.

    :raises RuntimeError: if the callback returns neither a tuple nor a dict
    """
    bitstream = SubElement(xip, 'Bitstream')
    filename_element = SubElement(bitstream, "Filename")
    filename_element.text = file_name
    filesize = SubElement(bitstream, "FileSize")
    file_stats = os.stat(full_path)
    filesize.text = str(file_stats.st_size)
    physical_location = SubElement(bitstream, "PhysicalLocation")
    physical_location.text = location
    fixities = SubElement(bitstream, "Fixities")
    fixity_result = callback(file_name, full_path)
    # isinstance() rather than type() comparison, so subclasses (e.g. a
    # namedtuple result) are accepted too.
    if isinstance(fixity_result, tuple):
        fixity = SubElement(fixities, "Fixity")
        fixity_algorithm_ref = SubElement(fixity, "FixityAlgorithmRef")
        fixity_value = SubElement(fixity, "FixityValue")
        fixity_algorithm_ref.text = fixity_result[0]
        fixity_value.text = fixity_result[1]
    elif isinstance(fixity_result, dict):
        # Multiple fixity algorithms may be supplied at once.
        for algorithm, value in fixity_result.items():
            fixity = SubElement(fixities, "Fixity")
            fixity_algorithm_ref = SubElement(fixity, "FixityAlgorithmRef")
            fixity_value = SubElement(fixity, "FixityValue")
            fixity_algorithm_ref.text = algorithm
            fixity_value.text = value
    else:
        logger.error("Could Not Find Fixity Value")
        raise RuntimeError("Could Not Find Fixity Value")
def __make_representation_multiple_co__(xip, rep_name, rep_type, rep_files, io_ref):
    """Add a Representation with one ContentObject per file.

    Returns a dict mapping each new ContentObject reference to its file path.
    """
    rep = SubElement(xip, 'Representation')
    SubElement(rep, 'InformationObject').text = io_ref
    SubElement(rep, 'Name').text = rep_name
    SubElement(rep, 'Type').text = rep_type
    container = SubElement(rep, 'ContentObjects')
    refs = {}
    for rep_file in rep_files:
        new_ref = str(uuid.uuid4())
        SubElement(container, 'ContentObject').text = new_ref
        refs[new_ref] = rep_file
    return refs
def cvs_to_cmis_xslt(csv_file, xml_namespace, root_element, title="Metadata Title", export_folder=None, additional_namespaces=None):
    """
    Create a custom CMIS transform to display metadata within UA.

    :param str csv_file: Path to the csv file
    :param str xml_namespace: XML namespace of the metadata documents
    :param str root_element: Root element name of the metadata documents
    :param str title: Display title for the metadata group
    :param str export_folder: Folder the generated xslt file is written to
    :param dict additional_namespaces: Map of extra prefix -> uri namespaces
    :return: Path to the generated xslt file
    """
    # Collect column headers from the first row only; spaces and hyphens are
    # stripped so each header forms a valid XML element name.
    headers = set()
    with open(csv_file, encoding='utf-8-sig', newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            for header in row:
                xml_tag = header.strip().replace(" ", "").replace("-", "")
                headers.add(xml_tag)
            break
    namespaces = {"version": "2.0", "xmlns:xsl": "http://www.w3.org/1999/XSL/Transform",
                  "xmlns:fn": "http://www.w3.org/2005/xpath-functions", "xmlns:xs": "http://www.w3.org/2001/XMLSchema",
                  "xmlns:csv": xml_namespace, "xmlns": "http://www.tessella.com/sdb/cmis/metadata",
                  "exclude-result-prefixes": "csv"}
    if additional_namespaces is not None:
        for prefix, uri in additional_namespaces.items():
            namespaces["xmlns:" + prefix] = uri
    xml_stylesheet = xml.etree.ElementTree.Element("xsl:stylesheet", namespaces)
    xml.etree.ElementTree.SubElement(xml_stylesheet, "xsl:output", {"method": "xml", "indent": "yes"})
    xml_template = xml.etree.ElementTree.SubElement(xml_stylesheet, "xsl:template", {"match": "csv:" + root_element})
    xml_group = xml.etree.ElementTree.SubElement(xml_template, "group")
    xml_title = xml.etree.ElementTree.SubElement(xml_group, "title")
    xml_title.text = title
    xml.etree.ElementTree.SubElement(xml_group, "xsl:apply-templates")
    # Build the template match expression: headers that already carry a
    # prefix are used as-is, the rest are qualified with the csv prefix.
    elements = "|".join(header if ":" in header else "csv:" + header for header in headers)
    xml_matches = xml.etree.ElementTree.SubElement(xml_stylesheet, "xsl:template", {"match": elements})
    xml_item = xml.etree.ElementTree.SubElement(xml_matches, "item")
    xml_name = xml.etree.ElementTree.SubElement(xml_item, "name")
    xml.etree.ElementTree.SubElement(xml_name, "xsl:value-of", {
        "select": "fn:replace(translate(local-name(), '_', ' '), '([a-z])([A-Z])', '$1 $2')"})
    xml_value = xml.etree.ElementTree.SubElement(xml_item, "value")
    xml.etree.ElementTree.SubElement(xml_value, "xsl:value-of", {"select": "."})
    xml_type = xml.etree.ElementTree.SubElement(xml_item, "type")
    xml.etree.ElementTree.SubElement(xml_type, "xsl:value-of", {
        "select": "fn:replace(translate(local-name(), '_', ' '), '([a-z])([A-Z])', '$1 $2')"})
    xml_request = xml.etree.ElementTree.tostring(xml_stylesheet, encoding='utf-8', xml_declaration=True)
    cmis_xslt = root_element + "-cmis.xslt"
    if export_folder is not None:
        cmis_xslt = os.path.join(export_folder, cmis_xslt)
    # Context manager guarantees the handle is closed even if the write fails.
    with open(cmis_xslt, "wt", encoding="utf-8") as file:
        file.write(xml_request.decode("utf-8"))
    return cmis_xslt
def cvs_to_xsd(csv_file, xml_namespace, root_element, export_folder=None, additional_namespaces=None):
    """
    Create a XSD definition based on the csv file

    :param str csv_file: Path to the csv file
    :param str xml_namespace: Target namespace of the generated schema
    :param str root_element: Root element name of the metadata documents
    :param str export_folder: Folder the generated xsd file is written to
    :param dict additional_namespaces: Map of extra prefix -> uri namespaces
    :return: Path to the generated xsd file
    """
    # Collect column headers from the first row only; spaces and hyphens are
    # stripped so each header forms a valid XML element name.
    headers = set()
    with open(csv_file, encoding='utf-8-sig', newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            for header in row:
                xml_tag = header.strip().replace(" ", "").replace("-", "")
                headers.add(xml_tag)
            break
    namespaces = {"xmlns:xs": "http://www.w3.org/2001/XMLSchema", "attributeFormDefault": "unqualified",
                  "elementFormDefault": "qualified", "targetNamespace": xml_namespace}
    if additional_namespaces is not None:
        for prefix, uri in additional_namespaces.items():
            namespaces["xmlns:" + prefix.strip()] = uri.strip()
    xml_schema = xml.etree.ElementTree.Element("xs:schema", namespaces)
    if additional_namespaces is not None:
        # One xs:import per additional namespace.
        for prefix, namespace in additional_namespaces.items():
            xml.etree.ElementTree.SubElement(xml_schema, "xs:import", {"namespace": namespace})
    xml_element = xml.etree.ElementTree.SubElement(xml_schema, "xs:element", {"name": root_element})
    xml_complex_type = xml.etree.ElementTree.SubElement(xml_element, "xs:complexType")
    xml_sequence = xml.etree.ElementTree.SubElement(xml_complex_type, "xs:sequence")
    for header in headers:
        if ":" in header:
            prefix, sep, tag = header.partition(":")
            try:
                # Prefixed headers reference an element from the mapped
                # namespace; fall back to a plain string element if the
                # prefix was not declared.
                namespace = additional_namespaces[prefix]
                xml.etree.ElementTree.SubElement(xml_sequence, "xs:element", {"ref": header, "xmlns:" + prefix: namespace})
            except KeyError:
                xml.etree.ElementTree.SubElement(xml_sequence, "xs:element", {"type": "xs:string", "name": header})
        else:
            xml.etree.ElementTree.SubElement(xml_sequence, "xs:element", {"type": "xs:string", "name": header})
    xml_request = xml.etree.ElementTree.tostring(xml_schema, encoding='utf-8', xml_declaration=True)
    xsd_file = root_element + ".xsd"
    if export_folder is not None:
        xsd_file = os.path.join(export_folder, xsd_file)
    # Context manager guarantees the handle is closed even if the write fails.
    with open(xsd_file, "wt", encoding="utf-8") as file:
        file.write(xml_request.decode("utf-8"))
    return xsd_file
def csv_to_search_xml(csv_file, xml_namespace, root_element, title="Metadata Title", export_folder=None, additional_namespaces=None):
    """
    Create a custom Preservica search index based on the columns in a csv file

    :param str csv_file: Path to the csv file
    :param str xml_namespace: XML namespace of the metadata documents
    :param str root_element: Root element name of the metadata documents
    :param str title: Display name for the index schema
    :param str export_folder: Folder the generated index file is written to
    :param dict additional_namespaces: Map of extra prefix -> uri namespaces
    :return: Path to the generated index definition file
    """
    # Collect column headers from the first row only; spaces and hyphens are
    # stripped so each header forms a valid XML element name.
    headers = set()
    with open(csv_file, encoding='utf-8-sig', newline='') as csvfile:
        reader = csv.reader(csvfile)
        for row in reader:
            for header in row:
                xml_tag = header.strip().replace(" ", "").replace("-", "")
                headers.add(xml_tag)
            break
    xml_index = xml.etree.ElementTree.Element("index", {"xmlns": "http://www.preservica.com/customindex/v1"})
    short_name = "csv"
    xml_schema_name = xml.etree.ElementTree.SubElement(xml_index, "schemaName")
    xml_schema_name.text = title
    xml_schema_uri = xml.etree.ElementTree.SubElement(xml_index, "schemaUri")
    xml_schema_uri.text = xml_namespace
    xml_short_name = xml.etree.ElementTree.SubElement(xml_index, "shortName")
    xml_short_name.text = short_name
    # One index term per column; already-prefixed headers keep their prefix.
    for header in headers:
        if ":" in header:
            xpath_expression = f"//{short_name}:{root_element}/{header}"
        else:
            xpath_expression = f"//{short_name}:{root_element}/{short_name}:{header}"
        attr = {"indexName": header, "displayName": header, "xpath": xpath_expression, "indexType": "STRING_DEFAULT"}
        xml.etree.ElementTree.SubElement(xml_index, "term", attr)
    if additional_namespaces is not None:
        for prefix, uri in additional_namespaces.items():
            xml.etree.ElementTree.SubElement(xml_index, "namespaceMapping", {"key": prefix, "value": uri})
    xml_request = xml.etree.ElementTree.tostring(xml_index, encoding='utf-8', xml_declaration=True)
    search_xml = root_element + "-index.xml"
    if export_folder is not None:
        search_xml = os.path.join(export_folder, search_xml)
    # Context manager guarantees the handle is closed even if the write fails.
    with open(search_xml, "wt", encoding="utf-8") as file:
        file.write(xml_request.decode("utf-8"))
    return search_xml
def cvs_to_xml(csv_file, xml_namespace, root_element, file_name_column="filename", export_folder=None, additional_namespaces=None):
    """
    Export the rows of a CSV file as XML metadata documents which can be added to Preservica assets

    Generator: yields the path of each XML document as it is written. If the
    header row does not contain *file_name_column*, nothing is generated.

    :param str csv_file: Path to the csv file
    :param str xml_namespace: The XML namespace for the created XML documents
    :param str root_element: The root element for the XML documents
    :param str file_name_column: The CSV column which should be used to name the xml files
    :param str export_folder: The path to the export folder
    :param dict additional_namespaces: A map of prefix, uris to use as additional namespaces
    """
    headers = list()
    link_column_id = 0
    with open(csv_file, encoding='utf-8-sig', newline='') as csvfile:
        reader = csv.reader(csvfile)
        # First row: collect sanitised headers and locate the file-name column.
        for row in reader:
            col_id = 0
            for header in row:
                col_id += 1
                if header == file_name_column:
                    link_column_id = col_id
                xml_tag = header.strip().replace(" ", "").replace("-", "")
                headers.append(xml_tag)
            break
        # Only generate documents when the naming column was found.
        if link_column_id > 0:
            namespaces = {"xmlns": xml_namespace}
            if additional_namespaces is not None:
                for prefix, uri in additional_namespaces.items():
                    namespaces["xmlns:" + prefix] = uri
            for row in reader:
                col_id = 0
                xml_object = xml.etree.ElementTree.Element(root_element, namespaces)
                for value, header in zip(row, headers):
                    col_id += 1
                    xml.etree.ElementTree.SubElement(xml_object, header).text = value
                    if col_id == link_column_id:
                        # This cell names the output document.
                        file_name = value
                xml_request = xml.etree.ElementTree.tostring(xml_object, encoding='utf-8', xml_declaration=True)
                name = sanitize(file_name + ".xml")
                if export_folder is not None:
                    name = os.path.join(export_folder, name)
                # Context manager guarantees each document handle is closed.
                with open(name, "wt", encoding="utf-8") as file:
                    file.write(xml_request.decode("utf-8"))
                yield name
def generic_asset_package(preservation_files_dict=None, access_files_dict=None, export_folder=None, parent_folder=None, compress=True,
                          **kwargs):
    """
    Create a Preservica package containing a single Asset with one or more
    named Preservation and/or Access representations, each holding multiple
    Content Objects.

    :param dict preservation_files_dict: Map of representation name -> list of preservation file paths
    :param dict access_files_dict: Map of representation name -> list of access file paths
    :param str export_folder: Folder the package is written to (defaults to the system temp folder)
    :param parent_folder: Folder object (or folder reference string) the asset is ingested into
    :param bool compress: Compress the resulting ZIP package
    :return: Path to the created package ZIP file, or None if no files were supplied
    :raises RuntimeError: if the export folder does not exist or no parent folder is given

    Optional kwargs: 'Title', 'Description', 'SecurityTag', 'CustomType',
    'IO_Identifier_callback', 'Preservation_Content_Title',
    'Preservation_Content_Description', 'Access_Content_Title',
    'Access_Content_Description' (each either a string or a dict keyed by
    file path), 'Preservation_Generation_Label', 'Access_Generation_Label',
    'Preservation_files_fixity_callback', 'Access_files_fixity_callback',
    'Identifiers' and 'Asset_Metadata'.
    """
    # some basic validation
    if export_folder is None:
        export_folder = tempfile.gettempdir()
    if not os.path.isdir(export_folder):
        logger.error("Export Folder Does Not Exist")
        raise RuntimeError(export_folder, "Export Folder Does Not Exist")
    if parent_folder is None:
        logger.error("You must specify a parent folder for the package asset")
        raise RuntimeError("You must specify a parent folder for the package asset")
    io_ref = None
    xip = None
    default_asset_title = None
    preservation_representation_refs_dict = {}
    access_representation_refs_dict = {}
    security_tag = kwargs.get('SecurityTag', "open")
    content_type = kwargs.get('CustomType', "")
    if not compress:
        # "szip" archives are ZIP containers with no compression applied.
        shutil.register_archive_format("szip", _make_stored_zipfile, None, "UnCompressed ZIP file")
    has_preservation_files = bool(preservation_files_dict)
    has_access_files = bool(access_files_dict)
    # Derive a default asset title from the first file and create the asset.
    if has_preservation_files:
        if default_asset_title is None:
            first_key = next(iter(preservation_files_dict))
            default_asset_title = os.path.splitext(os.path.basename(preservation_files_dict[first_key][0]))[0]
        xip, io_ref = __create_io__(file_name=default_asset_title, parent_folder=parent_folder, **kwargs)
    if has_access_files:
        if default_asset_title is None:
            first_key = next(iter(access_files_dict))
            default_asset_title = os.path.splitext(os.path.basename(access_files_dict[first_key][0]))[0]
        if io_ref is None:
            xip, io_ref = __create_io__(file_name=default_asset_title, parent_folder=parent_folder, **kwargs)
    # One Representation element per named representation.
    if has_preservation_files:
        for representation_name, files in preservation_files_dict.items():
            preservation_representation_refs_dict[representation_name] = __make_representation_multiple_co__(
                xip, rep_name=representation_name, rep_type="Preservation", rep_files=files, io_ref=io_ref)
    if has_access_files:
        for representation_name, files in access_files_dict.items():
            access_representation_refs_dict[representation_name] = __make_representation_multiple_co__(
                xip, rep_name=representation_name, rep_type="Access", rep_files=files, io_ref=io_ref)
    # Content Object elements for every file in every representation.
    if has_preservation_files:
        for preservation_refs_dict in preservation_representation_refs_dict.values():
            for content_ref, filename in preservation_refs_dict.items():
                default_content_objects_title = os.path.splitext(os.path.basename(filename))[0]
                content_title = kwargs.get('Preservation_Content_Title', default_content_objects_title)
                content_description = kwargs.get('Preservation_Content_Description', default_content_objects_title)
                # Title/Description may be a per-file dict keyed by file path.
                # Bug fix: look up by the filename variable rather than the
                # literal string "filename", which made the dict form useless.
                if isinstance(content_title, dict):
                    content_title = content_title.get(filename, default_content_objects_title)
                if isinstance(content_description, dict):
                    content_description = content_description.get(filename, default_content_objects_title)
                __make_content_objects__(xip, content_title, content_ref, io_ref, security_tag,
                                         content_description, content_type)
    if has_access_files:
        for access_refs_dict in access_representation_refs_dict.values():
            for content_ref, filename in access_refs_dict.items():
                default_content_objects_title = os.path.splitext(os.path.basename(filename))[0]
                content_title = kwargs.get('Access_Content_Title', default_content_objects_title)
                content_description = kwargs.get('Access_Content_Description', default_content_objects_title)
                if isinstance(content_title, dict):
                    content_title = content_title.get(filename, default_content_objects_title)
                # Bug fix: the description lookup previously read from the
                # title dict (access_content_title) by mistake.
                if isinstance(content_description, dict):
                    content_description = content_description.get(filename, default_content_objects_title)
                __make_content_objects__(xip, content_title, content_ref, io_ref, security_tag,
                                         content_description, content_type)
    # A Generation element for each content object.
    if has_preservation_files:
        generation_label = kwargs.get('Preservation_Generation_Label', "")
        for representation_name, preservation_refs_dict in preservation_representation_refs_dict.items():
            location = sanitize(representation_name)
            for content_ref, filename in preservation_refs_dict.items():
                __make_generation__(xip, os.path.basename(filename), content_ref, generation_label, location)
    if has_access_files:
        generation_label = kwargs.get('Access_Generation_Label', "")
        for representation_name, access_refs_dict in access_representation_refs_dict.items():
            location = sanitize(representation_name)
            for content_ref, filename in access_refs_dict.items():
                __make_generation__(xip, os.path.basename(filename), content_ref, generation_label, location)
    # Bitstream elements with fixity computed by the supplied callback
    # (SHA1 by default).
    if has_preservation_files:
        if 'Preservation_files_fixity_callback' in kwargs:
            callback = kwargs.get('Preservation_files_fixity_callback')
        else:
            callback = Sha1FixityCallBack()
        for representation_name, preservation_refs_dict in preservation_representation_refs_dict.items():
            location = sanitize(representation_name)
            for content_ref, filename in preservation_refs_dict.items():
                __make_bitstream__(xip, os.path.basename(filename), filename, callback, location)
    if has_access_files:
        if 'Access_files_fixity_callback' in kwargs:
            callback = kwargs.get('Access_files_fixity_callback')
        else:
            callback = Sha1FixityCallBack()
        for representation_name, access_refs_dict in access_representation_refs_dict.items():
            location = sanitize(representation_name)
            for content_ref, filename in access_refs_dict.items():
                __make_bitstream__(xip, os.path.basename(filename), filename, callback, location)
    # Third party identifiers attached to the asset.
    if 'Identifiers' in kwargs:
        identifier_map = kwargs.get('Identifiers')
        for identifier_key, identifier_value in identifier_map.items():
            if identifier_key and identifier_value:
                identifier = SubElement(xip, 'Identifier')
                SubElement(identifier, "Type").text = identifier_key
                SubElement(identifier, "Value").text = identifier_value
                SubElement(identifier, "Entity").text = io_ref
    # Descriptive metadata: values may be a path to an XML file or a literal
    # XML string.
    if 'Asset_Metadata' in kwargs:
        metadata_map = kwargs.get('Asset_Metadata')
        for metadata_ns, metadata_document in metadata_map.items():
            if metadata_ns and metadata_document:
                if os.path.exists(metadata_document) and os.path.isfile(metadata_document):
                    descriptive_metadata = xml.etree.ElementTree.parse(source=metadata_document)
                    metadata = SubElement(xip, 'Metadata', {'schemaUri': metadata_ns})
                    SubElement(metadata, 'Ref').text = str(uuid.uuid4())
                    SubElement(metadata, 'Entity').text = io_ref
                    SubElement(metadata, 'Content').append(descriptive_metadata.getroot())
                elif isinstance(metadata_document, str):
                    try:
                        descriptive_metadata = xml.etree.ElementTree.fromstring(metadata_document)
                        metadata = SubElement(xip, 'Metadata', {'schemaUri': metadata_ns})
                        SubElement(metadata, 'Ref').text = str(uuid.uuid4())
                        SubElement(metadata, 'Entity').text = io_ref
                        SubElement(metadata, 'Content').append(descriptive_metadata)
                    except (xml.etree.ElementTree.ParseError, RuntimeError):
                        # Bug fix: fromstring raises ParseError, not
                        # RuntimeError, so invalid XML escaped this handler.
                        logging.info(f"Could not parse asset metadata in namespace {metadata_ns}")
    # Write the metadata.xml and copy the content files into the package
    # layout, then zip everything up.
    if xip is not None:
        top_level_folder = os.path.join(export_folder, io_ref)
        os.mkdir(top_level_folder)
        inner_folder = os.path.join(top_level_folder, io_ref)
        os.mkdir(inner_folder)
        content_folder = os.path.join(inner_folder, CONTENT_FOLDER)
        os.mkdir(content_folder)
        metadata_path = os.path.join(inner_folder, "metadata.xml")
        # Context manager ensures the metadata handle is always closed.
        with open(metadata_path, "wt", encoding='utf-8') as metadata:
            metadata.write(prettify(xip))
        for refs in (preservation_representation_refs_dict, access_representation_refs_dict):
            for representation_name, refs_dict in refs.items():
                location = sanitize(representation_name)
                Path(os.path.join(content_folder, location)).mkdir(parents=True, exist_ok=True)
                for filename in refs_dict.values():
                    dst_file = os.path.join(content_folder, location, os.path.basename(filename))
                    shutil.copyfile(filename, dst_file)
        if compress:
            shutil.make_archive(top_level_folder, 'zip', top_level_folder)
        else:
            shutil.make_archive(top_level_folder, 'szip', top_level_folder)
        shutil.rmtree(top_level_folder)
        return top_level_folder + ".zip"
def multi_asset_package(asset_file_list=None, export_folder=None, parent_folder=None, compress=True, **kwargs):
    """
    Create a package containing multiple assets, all the assets are ingested into the same parent folder provided
    by the parent_folder argument.

    :param asset_file_list: List of files. One asset per file
    :param export_folder: Location where the package is written to
    :param parent_folder: The folder the assets will be ingested into
    :param compress: Bool, compress the package
    :param kwargs: Optional 'SecurityTag', 'CustomType', 'Identifiers'
        (a dict keyed by file path) and 'Preservation_files_fixity_callback'
    :return: Path to the created package ZIP file
    :raises RuntimeError: if the export folder does not exist, no parent folder
        is given, or a fixity callback returns neither a tuple nor a dict
    """
    # some basic validation
    if export_folder is None:
        export_folder = tempfile.gettempdir()
    if not os.path.isdir(export_folder):
        logger.error("Export Folder Does Not Exist")
        raise RuntimeError(export_folder, "Export Folder Does Not Exist")
    if parent_folder is None:
        logger.error("You must specify a parent folder for the package asset")
        raise RuntimeError("You must specify a parent folder for the package asset")
    security_tag = kwargs.get('SecurityTag', "open")
    content_type = kwargs.get('CustomType', "")
    if not compress:
        # "szip" archives are ZIP containers with no compression applied.
        shutil.register_archive_format("szip", _make_stored_zipfile, None, "UnCompressed ZIP file")
    if 'Preservation_files_fixity_callback' in kwargs:
        fixity_callback = kwargs.get('Preservation_files_fixity_callback')
    else:
        fixity_callback = Sha1FixityCallBack()
    # Package directory layout: <export>/<package_id>/<package_id>/content
    package_id = str(uuid.uuid4())
    top_level_folder = os.path.join(export_folder, package_id)
    os.mkdir(top_level_folder)
    inner_folder = os.path.join(top_level_folder, package_id)
    os.mkdir(inner_folder)
    os.mkdir(os.path.join(inner_folder, CONTENT_FOLDER))
    asset_map = dict()
    xip = Element('XIP')
    for file in asset_file_list:
        # One Information Object (asset) per input file.
        default_asset_title = os.path.splitext(os.path.basename(file))[0]
        xip, io_ref = __create_io__(xip, file_name=default_asset_title, parent_folder=parent_folder, **kwargs)
        asset_map[file] = io_ref
        # Single "Preservation" representation with one content object.
        representation = SubElement(xip, 'Representation')
        io_link = SubElement(representation, 'InformationObject')
        io_link.text = io_ref
        access_name = SubElement(representation, 'Name')
        access_name.text = "Preservation"
        access_type = SubElement(representation, 'Type')
        access_type.text = "Preservation"
        content_objects = SubElement(representation, 'ContentObjects')
        content_object = SubElement(content_objects, 'ContentObject')
        content_object_ref = str(uuid.uuid4())
        content_object.text = content_object_ref
        default_content_objects_title = os.path.splitext(os.path.basename(file))[0]
        content_object = SubElement(xip, 'ContentObject')
        ref_element = SubElement(content_object, "Ref")
        ref_element.text = content_object_ref
        title = SubElement(content_object, "Title")
        title.text = default_content_objects_title
        description = SubElement(content_object, "Description")
        description.text = default_content_objects_title
        security_tag_element = SubElement(content_object, "SecurityTag")
        security_tag_element.text = security_tag
        custom_type = SubElement(content_object, "CustomType")
        custom_type.text = content_type
        parent = SubElement(content_object, "Parent")
        parent.text = io_ref
        # Original (active) generation pointing at the file's bitstream.
        generation = SubElement(xip, 'Generation', {"original": "true", "active": "true"})
        content_object = SubElement(generation, "ContentObject")
        content_object.text = content_object_ref
        label = SubElement(generation, "Label")
        label.text = os.path.splitext(os.path.basename(file))[0]
        effective_date = SubElement(generation, "EffectiveDate")
        effective_date.text = datetime.now().isoformat()
        bitstreams = SubElement(generation, "Bitstreams")
        bitstream = SubElement(bitstreams, "Bitstream")
        bitstream.text = os.path.basename(file)
        SubElement(generation, "Formats")
        SubElement(generation, "Properties")
        # Bitstream metadata: file size and fixity values.
        bitstream = SubElement(xip, 'Bitstream')
        filename_element = SubElement(bitstream, "Filename")
        filename_element.text = os.path.basename(file)
        filesize = SubElement(bitstream, "FileSize")
        file_stats = os.stat(file)
        filesize.text = str(file_stats.st_size)
        # NOTE(review): PhysicalLocation is created but left without text
        # here, unlike the single-asset builders — confirm this is intended.
        SubElement(bitstream, "PhysicalLocation")
        fixities = SubElement(bitstream, "Fixities")
        fixity_result = fixity_callback(filename_element.text, file)
        # isinstance() rather than type() comparison, so subclasses (e.g. a
        # namedtuple result) are accepted too.
        if isinstance(fixity_result, tuple):
            fixity = SubElement(fixities, "Fixity")
            fixity_algorithm_ref = SubElement(fixity, "FixityAlgorithmRef")
            fixity_value = SubElement(fixity, "FixityValue")
            fixity_algorithm_ref.text = fixity_result[0]
            fixity_value.text = fixity_result[1]
        elif isinstance(fixity_result, dict):
            # Multiple fixity algorithms may be supplied at once.
            for algorithm, value in fixity_result.items():
                fixity = SubElement(fixities, "Fixity")
                fixity_algorithm_ref = SubElement(fixity, "FixityAlgorithmRef")
                fixity_value = SubElement(fixity, "FixityValue")
                fixity_algorithm_ref.text = algorithm
                fixity_value.text = value
        else:
            logger.error("Could Not Find Fixity Value")
            raise RuntimeError("Could Not Find Fixity Value")
        # Per-asset third party identifiers, keyed by the source file path.
        if 'Identifiers' in kwargs:
            identifier_map = kwargs.get('Identifiers')
            if str(file) in identifier_map:
                for identifier_key, identifier_value in identifier_map[str(file)].items():
                    if identifier_key and identifier_value:
                        identifier = SubElement(xip, 'Identifier')
                        id_type = SubElement(identifier, "Type")
                        id_type.text = identifier_key
                        id_value = SubElement(identifier, "Value")
                        id_value.text = identifier_value
                        id_io = SubElement(identifier, "Entity")
                        id_io.text = io_ref
        # Copy the source file into the package content folder.
        dst_file = os.path.join(inner_folder, CONTENT_FOLDER, os.path.basename(file))
        shutil.copyfile(file, dst_file)
    if xip is not None:
        # Context manager ensures the metadata handle is always closed.
        metadata_path = os.path.join(inner_folder, "metadata.xml")
        with open(metadata_path, "wt", encoding='utf-8') as metadata:
            metadata.write(prettify(xip))
        if compress:
            shutil.make_archive(top_level_folder, 'zip', top_level_folder)
        else:
            shutil.make_archive(top_level_folder, 'szip', top_level_folder)
        shutil.rmtree(top_level_folder)
        return top_level_folder + ".zip"
def _append_asset_metadata(xip, schema_ns, xml_root, entity_ref):
    """Attach one descriptive metadata document (a parsed XML element) to the XIP tree."""
    metadata = SubElement(xip, 'Metadata', {'schemaUri': schema_ns})
    metadata_ref = SubElement(metadata, 'Ref')
    metadata_ref.text = str(uuid.uuid4())
    entity = SubElement(metadata, 'Entity')
    entity.text = entity_ref
    content = SubElement(metadata, 'Content')
    content.append(xml_root)


def complex_asset_package(preservation_files_list=None, access_files_list=None, export_folder=None, parent_folder=None, compress=True,
                          **kwargs):
    """
    Create a Preservica package containing a single Asset built from multiple
    preservation files and optional access files.

    The Asset contains one Content Object per file within each representation.
    If only the preservation files are provided the asset has one representation.

    :param list preservation_files_list: Paths to the preservation files
    :param list access_files_list: Paths to the access files
    :param str export_folder: The package location folder
    :param Folder parent_folder: The folder to ingest the asset into
    :param bool compress: Compress the ZIP file
    :param str Title: Asset Title
    :param str Description: Asset Description
    :param str SecurityTag: Asset SecurityTag
    :param str CustomType: Asset CustomType
    :param str Preservation_Content_Title: Title of the Preservation Representation Content Object
    :param str Preservation_Content_Description: Description of the Preservation Representation Content Object
    :param str Access_Content_Title: Title of the Access Representation Content Object
    :param str Access_Content_Description: Description of the Access Representation Content Object
    :param dict Asset_Metadata: Dictionary of Asset metadata documents
    :param dict Identifiers: Dictionary of Asset 3rd party identifiers
    :return: Path to the created ZIP package, or None when no files were given
    :raises RuntimeError: if the export folder does not exist or no parent folder was given

    optional kwargs map

    'Title'                                 Asset Title
    'Description'                           Asset Description
    'SecurityTag'                           Asset Security Tag
    'CustomType'                            Asset Type
    'Preservation_Content_Title'            Content Object Title of the Preservation Object
    'Preservation_Content_Description'      Content Object Description of the Preservation Object
    'Access_Content_Title'                  Content Object Title of the Access Object
    'Access_Content_Description'            Content Object Description of the Access Object
    'Preservation_Generation_Label'         Generation Label for the Preservation Object
    'Access_Generation_Label'               Generation Label for the Access Object
    'Asset_Metadata'                        Map of metadata schema/documents to add to asset
    'Identifiers'                           Map of asset identifiers
    'Preservation_files_fixity_callback'    Callback to allow external generated fixity values
    'Access_files_fixity_callback'          Callback to allow external generated fixity values
    'IO_Identifier_callback'                Callback to allow external generated Asset identifier
    'Preservation_Representation_Name'      Name of the Preservation Representation
    'Access_Representation_Name'            Name of the Access Representation
    """
    # some basic validation
    if export_folder is None:
        export_folder = tempfile.gettempdir()
    if not os.path.isdir(export_folder):
        logger.error("Export Folder Does Not Exist")
        raise RuntimeError(export_folder, "Export Folder Does Not Exist")
    if parent_folder is None:
        logger.error("You must specify a parent folder for the package asset")
        raise RuntimeError("You must specify a parent folder for the package asset")

    io_ref = None
    xip = None
    default_asset_title = None
    preservation_refs_dict = {}
    access_refs_dict = {}
    security_tag = kwargs.get('SecurityTag', "open")
    content_type = kwargs.get('CustomType', "")

    if not compress:
        # register a "store only" archive format so the package can be built uncompressed
        shutil.register_archive_format("szip", _make_stored_zipfile, None, "UnCompressed ZIP file")

    has_preservation_files = bool(preservation_files_list)
    has_access_files = bool(access_files_list)

    # create the asset (information object); its default title comes from the first file
    if has_preservation_files:
        default_asset_title = os.path.splitext(os.path.basename(preservation_files_list[0]))[0]
        xip, io_ref = __create_io__(file_name=default_asset_title, parent_folder=parent_folder, **kwargs)
    if has_access_files:
        if default_asset_title is None:
            default_asset_title = os.path.splitext(os.path.basename(access_files_list[0]))[0]
        if io_ref is None:
            xip, io_ref = __create_io__(file_name=default_asset_title, parent_folder=parent_folder, **kwargs)

    # add one representation per family of files, with one content object per file
    if has_preservation_files:
        representation_name = kwargs.get('Preservation_Representation_Name', "Preservation")
        preservation_refs_dict = __make_representation_multiple_co__(xip, rep_name=representation_name, rep_type="Preservation",
                                                                     rep_files=preservation_files_list, io_ref=io_ref)
    if has_access_files:
        access_name = kwargs.get('Access_Representation_Name', "Access")
        access_refs_dict = __make_representation_multiple_co__(xip, rep_name=access_name, rep_type="Access",
                                                               rep_files=access_files_list, io_ref=io_ref)

    # content objects: titles/descriptions may be given per-file as dicts keyed on filename
    if has_preservation_files:
        for content_ref, filename in preservation_refs_dict.items():
            default_content_objects_title = os.path.splitext(os.path.basename(filename))[0]
            preservation_content_title = kwargs.get('Preservation_Content_Title', default_content_objects_title)
            preservation_content_description = kwargs.get('Preservation_Content_Description', default_content_objects_title)
            if isinstance(preservation_content_title, dict):
                preservation_content_title = preservation_content_title[filename]
            if isinstance(preservation_content_description, dict):
                preservation_content_description = preservation_content_description[filename]
            __make_content_objects__(xip, preservation_content_title, content_ref, io_ref, security_tag,
                                     preservation_content_description, content_type)
    if has_access_files:
        for content_ref, filename in access_refs_dict.items():
            default_content_objects_title = os.path.splitext(os.path.basename(filename))[0]
            access_content_title = kwargs.get('Access_Content_Title', default_content_objects_title)
            access_content_description = kwargs.get('Access_Content_Description', default_content_objects_title)
            if isinstance(access_content_title, dict):
                access_content_title = access_content_title[filename]
            if isinstance(access_content_description, dict):
                # BUG FIX: previously this branch re-assigned the title, discarding
                # the per-file description supplied by the caller.
                access_content_description = access_content_description[filename]
            __make_content_objects__(xip, access_content_title, content_ref, io_ref, security_tag,
                                     access_content_description, content_type)

    # generations: one per content object, pointing at the physical file
    if has_preservation_files:
        preservation_generation_label = kwargs.get('Preservation_Generation_Label', "")
        for content_ref, filename in preservation_refs_dict.items():
            __make_generation__(xip, os.path.basename(filename), content_ref, preservation_generation_label,
                                PRESERVATION_CONTENT_FOLDER)
    if has_access_files:
        access_generation_label = kwargs.get('Access_Generation_Label', "")
        for content_ref, filename in access_refs_dict.items():
            __make_generation__(xip, os.path.basename(filename), content_ref, access_generation_label,
                                ACCESS_CONTENT_FOLDER)

    # bitstreams with fixity values (caller-provided callbacks, or SHA1 by default)
    if has_preservation_files:
        if 'Preservation_files_fixity_callback' in kwargs:
            callback = kwargs.get('Preservation_files_fixity_callback')
        else:
            callback = Sha1FixityCallBack()
        for content_ref, filename in preservation_refs_dict.items():
            __make_bitstream__(xip, os.path.basename(filename), filename, callback, PRESERVATION_CONTENT_FOLDER)
    if has_access_files:
        if 'Access_files_fixity_callback' in kwargs:
            callback = kwargs.get('Access_files_fixity_callback')
        else:
            callback = Sha1FixityCallBack()
        for content_ref, filename in access_refs_dict.items():
            __make_bitstream__(xip, os.path.basename(filename), filename, callback, ACCESS_CONTENT_FOLDER)

    # 3rd party asset identifiers
    if 'Identifiers' in kwargs:
        identifier_map = kwargs.get('Identifiers')
        for identifier_key, identifier_value in identifier_map.items():
            if identifier_key and identifier_value:
                identifier = SubElement(xip, 'Identifier')
                id_type = SubElement(identifier, "Type")
                id_type.text = identifier_key
                id_value = SubElement(identifier, "Value")
                id_value.text = identifier_value
                id_io = SubElement(identifier, "Entity")
                id_io.text = io_ref

    # descriptive metadata: each value may be a path to an XML file, a raw XML
    # string, or a list of file paths
    if 'Asset_Metadata' in kwargs:
        metadata_map = kwargs.get('Asset_Metadata')
        for metadata_ns, metadata_path in metadata_map.items():
            if not metadata_ns:
                continue
            if metadata_path and isinstance(metadata_path, str):
                if os.path.exists(metadata_path) and os.path.isfile(metadata_path):
                    document = xml.etree.ElementTree.parse(source=metadata_path)
                    _append_asset_metadata(xip, metadata_ns, document.getroot(), io_ref)
                else:
                    try:
                        element = xml.etree.ElementTree.fromstring(metadata_path)
                        _append_asset_metadata(xip, metadata_ns, element, io_ref)
                    # BUG FIX: ElementTree raises ParseError (not RuntimeError) on
                    # malformed XML, so the original handler could never fire.
                    except xml.etree.ElementTree.ParseError:
                        logging.info(f"Could not parse asset metadata in namespace {metadata_ns}")
            if metadata_path and isinstance(metadata_path, list):
                for path in metadata_path:
                    if os.path.exists(path) and os.path.isfile(path):
                        document = xml.etree.ElementTree.parse(source=path)
                        _append_asset_metadata(xip, metadata_ns, document.getroot(), io_ref)

    # lay out the SIP on disk, write the XIP metadata document and zip it up
    if xip is not None:
        top_level_folder = os.path.join(export_folder, io_ref)
        os.mkdir(top_level_folder)
        inner_folder = os.path.join(top_level_folder, io_ref)
        os.mkdir(inner_folder)
        content_folder = os.path.join(inner_folder, CONTENT_FOLDER)
        os.mkdir(content_folder)
        preservation_content_folder = os.path.join(content_folder, PRESERVATION_CONTENT_FOLDER)
        os.mkdir(preservation_content_folder)
        access_content_folder = os.path.join(content_folder, ACCESS_CONTENT_FOLDER)
        os.mkdir(access_content_folder)
        metadata_path = os.path.join(inner_folder, "metadata.xml")
        # context manager guarantees the handle is closed even if prettify fails
        with open(metadata_path, "wt", encoding='utf-8') as metadata_file:
            metadata_file.write(prettify(xip))
        for content_ref, filename in preservation_refs_dict.items():
            shutil.copyfile(filename, os.path.join(preservation_content_folder, os.path.basename(filename)))
        for content_ref, filename in access_refs_dict.items():
            shutil.copyfile(filename, os.path.join(access_content_folder, os.path.basename(filename)))
        shutil.make_archive(top_level_folder, 'zip' if compress else 'szip', top_level_folder)
        shutil.rmtree(top_level_folder)
        return top_level_folder + ".zip"
[docs]
def simple_asset_package(preservation_file=None, access_file=None, export_folder=None, parent_folder=None, compress=True, **kwargs):
    """
    Build a Preservica package holding one Asset created from a single
    preservation file plus an optional access file.

    Each representation of the Asset carries exactly one Content Object; when
    no access file is supplied the Asset has a single representation.

    :param str preservation_file: Path to the preservation file
    :param str access_file: Path to the access file
    :param str export_folder: The package location folder
    :param Folder parent_folder: The folder to ingest the asset into
    :param bool compress: Compress the ZIP file
    :param str Title: Asset Title
    :param str Description: Asset Description
    :param str SecurityTag: Asset SecurityTag
    :param str CustomType: Asset CustomType
    :param str Preservation_Content_Title: Title of the Preservation Representation Content Object
    :param str Preservation_Content_Description: Description of the Preservation Representation Content Object
    :param str Access_Content_Title: Title of the Access Representation Content Object
    :param str Access_Content_Description: Description of the Access Representation Content Object
    :param dict Asset_Metadata: Dictionary of Asset metadata documents
    :param dict Identifiers: Dictionary of Asset 3rd party identifiers
    """
    # basic argument validation before delegating to the multi-file variant
    if export_folder is None:
        export_folder = tempfile.gettempdir()
    if not os.path.isdir(export_folder):
        logger.error("Export Folder Does Not Exist")
        raise RuntimeError(export_folder, "Export Folder Does Not Exist")
    if parent_folder is None:
        logger.error("You must specify a parent folder for the package asset")
        raise RuntimeError("You must specify a parent folder for the package asset")

    preservation_files = [] if preservation_file is None else [preservation_file]
    access_files = [] if access_file is None else [access_file]
    return complex_asset_package(preservation_files_list=preservation_files, access_files_list=access_files,
                                 export_folder=export_folder, parent_folder=parent_folder, compress=compress, **kwargs)
def upload_config():
    """Return the module-wide boto3 :class:`TransferConfig` shared by every upload."""
    return transfer_config
def _unpad(s):
return s[:-ord(s[len(s) - 1:])]
[docs]
class UploadAPI(AuthenticatedAPI):
[docs]
def ingest_web_video(self, url=None, parent_folder=None, **kwargs):
    """
    Ingest a web video such as YouTube etc based on the URL.

    The video is downloaded locally as ``<video id>.mp4``, wrapped in a
    single-asset package and uploaded to Preservica.

    :param str url: URL to the youtube video
    :param Folder parent_folder: The folder to ingest the video into
    :param str Title: Optional asset title (defaults to the video title)
    :param str Description: Optional asset description (defaults to the video description)
    :param str SecurityTag: Optional asset security tag (defaults to "open")
    :param dict Identifiers: Optional asset 3rd party identifiers
    :param dict Asset_Metadata: Optional asset additional descriptive metadata
    :param callback callback: Optional upload progress callback
    :raises RuntimeError: if the youtube_dl package is not installed
    """
    try:
        import youtube_dl
    except ImportError:
        logger.error("Package youtube_dl is required for this method. pip install --upgrade youtube-dl")
        raise RuntimeError("Package youtube_dl is required for this method. pip install --upgrade youtube-dl")

    def my_hook(d):
        # progress hook called by youtube_dl; log once the download completes
        if d['status'] == 'finished':
            logger.info('Download Complete. Uploading to Preservica ...')

    ydl_opts = {'outtmpl': '%(id)s.mp4', 'progress_hooks': [my_hook], }
    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        meta = ydl.extract_info(url, download=True)
    vid_id = meta.get('id')
    # explicit kwargs take precedence over the metadata extracted from the video
    title = kwargs.get('Title', meta.get('title'))
    description = kwargs.get('Description', meta.get('description'))
    security_tag = kwargs.get('SecurityTag', "open")
    identifier_map = kwargs.get('Identifiers', {})
    # always record the source video id as an identifier on the asset
    identifier_map["Video-ID"] = vid_id
    descriptive_metadata = kwargs.get('Asset_Metadata', {})
    callback = kwargs.get('callback', None)
    package = simple_asset_package(preservation_file=f"{vid_id}.mp4", parent_folder=parent_folder, Title=title,
                                   Description=description, Identifiers=identifier_map, Asset_Metadata=descriptive_metadata,
                                   Preservation_Content_Title=title, SecurityTag=security_tag)
    self.upload_zip_package(path_to_zip_package=package, folder=parent_folder, callback=callback)
[docs]
def upload_credentials(self, location_id: str):
    """
    Retrieve temporary upload credentials (Amazon STS, or Azure SAS) for the
    given upload location.

    :param str location_id: The apiId of the upload location
    :return: dict of credentials
    :raises HTTPException: on any unexpected server response
    """
    endpoint = f"/upload/{location_id}/upload-credentials"
    response = self.session.get(f'https://{self.server}/api/location{endpoint}',
                                headers={HEADER_TOKEN: self.token})
    if response.status_code == requests.codes.unauthorized:
        # token has expired: refresh it and retry the request
        self.token = self.__token__()
        return self.upload_credentials(location_id)
    if response.status_code == requests.codes.ok:
        return json.loads(response.content.decode('utf-8'))
    exception = HTTPException(location_id, response.status_code, response.url, "upload_credentials",
                              response.content.decode('utf-8'))
    logger.error(exception)
    raise exception
[docs]
def upload_locations(self):
    """
    List the upload locations configured on the Sources page as 'SIP Upload'.

    :return: list of upload-location dicts
    :raises HTTPException: on any unexpected server response
    """
    response = self.session.get(f'https://{self.server}/api/location/upload',
                                headers={HEADER_TOKEN: self.token})
    if response.status_code == requests.codes.unauthorized:
        # token has expired: refresh it and retry the request
        self.token = self.__token__()
        return self.upload_locations()
    if response.status_code == requests.codes.ok:
        return json.loads(response.content.decode('utf-8'))['locations']
    exception = HTTPException("", response.status_code, response.url, "upload_locations",
                              response.content.decode('utf-8'))
    logger.error(exception)
    raise exception
[docs]
def upload_buckets(self):
    """
    Return the available upload buckets.

    Thin alias for :meth:`upload_locations`.

    :return: list of upload-location dicts
    """
    return self.upload_locations()
def crawl_filesystem(self, filesystem_path, bucket_name, preservica_parent, callback: bool = False, security_tag: str = "open",
                     delete_after_upload: bool = True, max_MB_ingested: int = -1):
    """
    Walk a local folder tree and replicate its structure in Preservica,
    uploading any files which are not already present in the repository.

    Folders and assets are matched by a "code" identifier derived from their
    path relative to the crawl root, so the crawl can be re-run incrementally.

    :param str filesystem_path: Root folder to crawl
    :param str bucket_name: Upload bucket connected to an ingest workflow
    :param preservica_parent: Reference of the Preservica parent folder, or None for the root
    :param bool callback: Show an upload progress bar
    :param str security_tag: Security tag for newly created folders and assets
    :param bool delete_after_upload: Delete local packages after upload
    :param int max_MB_ingested: Stop after this many MB have been ingested (-1 for no limit)
    """

    def get_parent(client, identifier, parent_reference):
        # Resolve the Preservica folder for the parent path of this identifier;
        # fall back to the supplied root reference when no such folder exists yet.
        # (renamed from `id` to avoid shadowing the builtin)
        code_id = str(os.path.dirname(identifier))
        if not code_id:
            code_id = identifier
        entities = client.identifier("code", code_id)
        if len(entities) > 0:
            folder = entities.pop()
            folder = client.folder(folder.reference)
            return folder.reference
        return parent_reference

    def get_folder(client, name, tag, parent_reference, identifier):
        # Find a folder by its "code" identifier, creating (and tagging) it if missing.
        entities = client.identifier("code", identifier)
        if len(entities) == 0:
            logger.info(f"Creating new folder with name {name}")
            folder = client.create_folder(name, name, tag, parent_reference)
            client.add_identifier(folder, "code", identifier)
        else:
            logger.info(f"Found existing folder with name {name}")
            folder = entities.pop()
        return folder

    # local import avoids a circular dependency between the API modules
    from pyPreservica import EntityAPI
    entity_client = EntityAPI(username=self.username, password=self.password, server=self.server, tenant=self.tenant,
                              two_fa_secret_key=self.two_fa_secret_key, use_shared_secret=self.shared_secret,
                              protocol=self.protocol)
    if preservica_parent:
        parent = entity_client.folder(preservica_parent)
        logger.info(f"Folders will be created inside Preservica collection {parent.title}")
        parent_ref = parent.reference
    else:
        parent_ref = None
    bytes_ingested = 0
    folder_path = os.path.normpath(filesystem_path)
    for dirname, subdirs, files in os.walk(folder_path):
        base = os.path.basename(dirname)
        code = os.path.relpath(dirname, Path(folder_path).parent)
        parent_reference = get_parent(entity_client, code, parent_ref)
        preservica_folder = get_folder(entity_client, base, security_tag, parent_reference, code)
        identifiers = dict()
        # iterate over a copy so already-ingested files can be removed from `files` in place
        for file in list(files):
            full_path = os.path.join(dirname, file)
            if os.path.islink(full_path):
                logger.info(f"Skipping link {file}")
                files.remove(file)
                continue
            asset_code = os.path.join(code, file)
            if len(entity_client.identifier("code", asset_code)) == 0:
                bytes_ingested = bytes_ingested + os.stat(full_path).st_size
                logger.info(f"Adding new file: {file} to package ready for upload")
                identifiers[full_path] = {"code": asset_code}
            else:
                logger.info(f"Skipping file {file} already exists in repository")
                files.remove(file)
        if len(files) > 0:
            full_path_list = [os.path.join(dirname, file) for file in files]
            package = multi_asset_package(asset_file_list=full_path_list, parent_folder=preservica_folder,
                                          SecurityTag=security_tag, Identifiers=identifiers)
            progress_display = UploadProgressConsoleCallback(package) if callback else None
            self.upload_zip_package_to_S3(path_to_zip_package=package, bucket_name=bucket_name, callback=progress_display,
                                          delete_after_upload=delete_after_upload)
            logger.info("Uploaded {:.1f} MB".format(bytes_ingested / (1024 * 1024)))
            if max_MB_ingested > 0:
                if bytes_ingested > (1024 * 1024 * max_MB_ingested):
                    logger.info("Reached Max Upload Limit")
                    break
[docs]
def upload_zip_to_Source(self, path_to_zip_package, container_name, folder=None, delete_after_upload=False, show_progress=False):
    """
    Upload a zip file package to either an Azure container or an S3 bucket,
    depending on how the Preservica system is deployed.

    :param str path_to_zip_package: Path to the package
    :param str container_name: container connected to the ingest workflow
    :param Folder folder: The folder to ingest the package into
    :param bool delete_after_upload: Delete the local copy of the package after the upload has completed
    :param bool show_progress: Show upload progress bar
    """
    for location in self.upload_locations():
        if location['containerName'] != container_name:
            continue
        if location['type'] == 'AWS':
            progress = UploadProgressConsoleCallback(path_to_zip_package) if show_progress else None
            self.upload_zip_package_to_S3(path_to_zip_package=path_to_zip_package, bucket_name=container_name, folder=folder,
                                          callback=progress, delete_after_upload=delete_after_upload)
        else:
            self.upload_zip_package_to_Azure(path_to_zip_package=path_to_zip_package, container_name=container_name,
                                             folder=folder, delete_after_upload=delete_after_upload,
                                             show_progress=show_progress)
[docs]
def upload_zip_package_to_Azure(self, path_to_zip_package, container_name, folder=None, delete_after_upload=False, show_progress=False):
    """
    Upload a zip file package to an Azure container connected to a Preservica Cloud System.

    :param str path_to_zip_package: Path to the package
    :param str container_name: container connected to the ingest workflow
    :param Folder folder: The folder to ingest the package into
    :param bool delete_after_upload: Delete the local copy of the package after the upload has completed
    :param bool show_progress: Show an upload progress bar
    :return: blob properties of the uploaded package, or None if no matching container was found
    :raises RuntimeError: on Preservica systems older than 6.5
    """
    # Requires Preservica 6.5+. BUG FIX: the original check
    # `(major < 7) and (minor < 5)` wrongly accepted versions such as 5.9;
    # comparing (major, minor) as a tuple enforces the documented threshold.
    if (self.major_version, self.minor_version) < (6, 5):
        raise RuntimeError("This call [upload_zip_package_to_Azure] is only available against v6.5 systems and above")
    from azure.storage.blob import ContainerClient
    for location in self.upload_locations():
        if location['containerName'] != container_name:
            continue
        credentials = self.upload_credentials(location['apiId'])
        # NOTE(review): 'key' appears to hold the storage account name used to
        # build the blob endpoint URL — confirm against the credentials API.
        account_key = credentials['key']
        session_token = credentials['sessionToken']
        sas_url = f"https://{account_key}.blob.core.windows.net/{container_name}"
        container = ContainerClient.from_container_url(container_url=sas_url, credential=session_token)
        upload_key = str(uuid.uuid4())
        # blob metadata drives the Preservica ingest workflow
        metadata = {'key': upload_key, 'name': upload_key + ".zip", 'bucket': container_name, 'status': 'ready'}
        if hasattr(folder, "reference"):
            metadata['collectionreference'] = folder.reference
        elif isinstance(folder, str):
            metadata['collectionreference'] = folder
        len_bytes = Path(path_to_zip_package).stat().st_size
        if show_progress:
            # tqdm wraps the file object's read() calls to drive the progress bar
            with tqdm.wrapattr(open(path_to_zip_package, 'rb'), "read", total=len_bytes) as data:
                blob_client = container.upload_blob(name=upload_key, data=data, metadata=metadata, length=len_bytes)
        else:
            with open(path_to_zip_package, "rb") as data:
                blob_client = container.upload_blob(name=upload_key, data=data, metadata=metadata, length=len_bytes)
        properties = blob_client.get_blob_properties()
        if delete_after_upload:
            os.remove(path_to_zip_package)
        return properties
[docs]
def upload_zip_package_to_S3(self, path_to_zip_package, bucket_name, folder=None, callback=None, delete_after_upload=False):
    """
    Upload a zip file package to an S3 bucket connected to a Preservica Cloud System.

    :param str path_to_zip_package: Path to the package
    :param str bucket_name: Bucket connected to an ingest workflow
    :param Folder folder: The folder to ingest the package into
    :param Callable callback: Optional callback to allow the callee to monitor the upload progress
    :param bool delete_after_upload: Delete the local copy of the package after the upload has completed
    :raises RuntimeError: on Preservica systems older than 6.5
    """
    # Requires Preservica 6.5+. BUG FIX: the original check
    # `(major < 7) and (minor < 5)` wrongly accepted versions such as 5.9;
    # comparing (major, minor) as a tuple enforces the documented threshold.
    if (self.major_version, self.minor_version) < (6, 5):
        raise RuntimeError("This call [upload_zip_package_to_S3] is only available against v6.5 systems and above")
    logger.debug("Finding Upload Locations")
    self.token = self.__token__()
    for location in self.upload_locations():
        if location['containerName'] != bucket_name:
            continue
        logger.debug(f"Found Upload Location {location['containerName']}")
        logger.debug(f"Fetching Upload Credentials for {location['containerName']}")
        credentials = self.upload_credentials(location['apiId'])
        # credentials also carry an 'endpoint' entry which is not needed here:
        # the default AWS endpoint is used by the boto3 session
        session = boto3.Session(aws_access_key_id=credentials['key'],
                                aws_secret_access_key=credentials['secret'],
                                aws_session_token=credentials['sessionToken'])
        s3 = session.resource(service_name="s3")
        logger.debug(f"S3 Session: {s3}")
        upload_key = str(uuid.uuid4())
        s3_object = s3.Object(bucket_name, upload_key)
        # object metadata drives the Preservica ingest workflow
        metadata = {'key': upload_key, 'name': upload_key + ".zip", 'bucket': bucket_name, 'status': 'ready'}
        if hasattr(folder, "reference"):
            metadata['collectionreference'] = folder.reference
        elif isinstance(folder, str):
            metadata['collectionreference'] = folder
        metadata['size'] = str(Path(path_to_zip_package).stat().st_size)
        metadata['createdby'] = self.username
        s3_object.upload_file(path_to_zip_package, Callback=callback, ExtraArgs={'Metadata': metadata},
                              Config=transfer_config)
        if delete_after_upload:
            os.remove(path_to_zip_package)
[docs]
def upload_zip_package(self, path_to_zip_package, folder=None, callback=None, delete_after_upload=False):
"""
Uploads a zip file package directly to Preservica and starts an ingest workflow
:param str path_to_zip_package: Path to the package
:param Folder folder: The folder to ingest the package into
:param Callable callback: Optional callback to allow the callee to monitor the upload progress
:param bool delete_after_upload: Delete the local copy of the package after the upload has completed
:return: preservica-progress-token to allow the workflow progress to be monitored
:rtype: str
:raises RuntimeError:
"""
bucket = f'{self.tenant.lower()}.package.upload'
endpoint = f'{self.protocol}://{self.server}/api/s3/buckets'
self.token = self.__token__()
s3_client = boto3.client('s3', endpoint_url=endpoint, aws_access_key_id=self.token, aws_secret_access_key="NOT_USED",
config=Config(s3={'addressing_style': 'path'}))
metadata = {}
if folder is not None:
if hasattr(folder, "reference"):
metadata = {'Metadata': {'structuralobjectreference': folder.reference}}
elif isinstance(folder, str):
metadata = {'Metadata': {'structuralobjectreference': folder}}
if os.path.exists(path_to_zip_package) and os.path.isfile(path_to_zip_package):
try:
key_id = str(uuid.uuid4()) + ".zip"
transfer = S3Transfer(client=s3_client, config=transfer_config)
transfer.PutObjectTask = PutObjectTask
transfer.CompleteMultipartUploadTask = CompleteMultipartUploadTask
transfer.upload_file = upload_file
response = transfer.upload_file(self=transfer, filename=path_to_zip_package, bucket=bucket, key=key_id, extra_args=metadata,
callback=callback)
if delete_after_upload:
os.remove(path_to_zip_package)
return response['ResponseMetadata']['HTTPHeaders']['preservica-progress-token']
except ClientError as e:
logger.error(e)
raise e