Module rugged.tuf.repo
Expand source code
from collections import OrderedDict
from datetime import datetime, timedelta
from glob import glob
from os import chdir, listdir, makedirs, path, remove, rmdir
from shutil import move
from securesystemslib.exceptions import StorageError
from securesystemslib.interface import (
import_ed25519_publickey_from_file,
import_ed25519_privatekey_from_file,
)
from securesystemslib.signer import SSlibSigner
from typing import Any, Dict
from tuf.api.metadata import (
SPECIFICATION_VERSION,
Key,
Metadata,
MetaFile,
Root,
Role,
Snapshot,
TargetFile,
Targets,
Timestamp,
)
from tuf.api.serialization.json import JSONSerializer
from rugged.exceptions.metadata_error import RuggedMetadataError
from rugged.exceptions.repository_error import RuggedRepositoryError
from rugged.exceptions.storage_error import RuggedStorageError
from rugged.lib.config import get_config
from rugged.lib.constants import (
RUGGED_SIGNING_KEY_DIR,
RUGGED_VERIFICATION_KEY_DIR,
)
from rugged.lib.logger import get_logger, log_exception
# Module-wide configuration object, fetched once when the module is imported.
config = get_config()
# Module-wide logger used by every function and method in this module.
log = get_logger()
# The components of SPECIFICATION_VERSION joined with "." into one string;
# passed as 'spec_version' to every metadata object created in this module.
SPEC_VERSION = ".".join(SPECIFICATION_VERSION)
def _in(days: float) -> datetime:
"""Adds 'days' to now and returns datetime object w/o microseconds."""
return datetime.utcnow().replace(microsecond=0) + timedelta(days=days)
class RuggedRepository():
"""
An opinionated TUF repository using the low-level TUF Metadata API.
@TODO: Implement key rotation. (#35)
@TODO: Make signature thresholds configurable per key. (#97)
@TODO: Make expiration configurable per role. (#98)
@TODO: Implement support for hashed bins. (#99)
@TODO: Implement support for consistent snapshots. (#100)
@TODO: Implement support for delegated targets. (#36)
"""
def __init__(self):
    """
    Set up the repository: directories first, then keys, then roles.

    Raises:
        RuggedRepositoryError: if any initialization step fails. The
            original exception is logged and attached as the cause.
    """
    try:
        self._init_dirs()
        self._init_keys()
        self._init_roles()
        log.debug("Instantiated repository.")
    except Exception as e:
        log_exception(e)
        log.error("Failed to instantiate repository.")
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise RuggedRepositoryError() from e
def load(self):
    """
    Load the metadata of every known role from storage.

    Raises:
        RuggedMetadataError: if loading any single role fails. The
            role-specific error is attached as the cause.
    """
    for name in self.roles.keys():
        try:
            self.load_metadata(name)
        except RuggedMetadataError as e:
            error = "Failed to load all metadata."
            log.error(error)
            # Keep the role-specific failure attached to the generic one.
            raise RuggedMetadataError(error) from e
    log.debug("Loaded all metadata.")
def load_metadata(self, name: str):
    """
    Load a role's metadata from storage.

    The metadata object stored under 'name' in 'self.roles' is replaced by
    the one read from the role's metadata file.

    Raises:
        RuggedMetadataError: if reading the file raises a TypeError. The
            TypeError is logged and attached as the cause.
    """
    metadata_file = self._get_metadata_path(name)
    try:
        self.roles[name] = self.roles[name].from_file(metadata_file)
        message = f"Loaded metadata for '{ name }' role from "\
                  f"'{ metadata_file }'."
        log.debug(message)
    except TypeError as e:
        log_exception(e)
        error = f"Failed to load metadata for '{ name }' role from "\
                f"'{ metadata_file }'."
        raise RuggedMetadataError(error) from e
def _get_metadata_path(self, name):
    """
    Build the full path of the metadata file for the given role.

    The configured 'repo_metadata_path' is joined with the role's
    metadata filename.
    """
    return path.join(
        config['repo_metadata_path'].get(),
        self._get_metadata_filename(name),
    )
def _get_metadata_filename(self, name):
""" Determine the filename for a given role's metadata. """
filename = f"{self.roles[name].signed.type}.json"
if name == "timestamp":
# Timestamp metadata doesn't use consistent snapshots.
return filename
if config['consistent_snapshot'].get():
filename = f"{self.roles[name].signed.version}.{ filename }"
return filename
def write(self):
    """
    Write every role's metadata to storage.

    Roles are written in the order they appear in 'self.roles'; writing
    stops at the first role that fails. Returns True when every role was
    written, False otherwise.
    """
    return all(self.write_metadata(name) for name in self.roles.keys())
def write_metadata(self, role_name: str):
    """
    Sign a role's metadata and write it to storage as non-compact JSON.

    Returns True on success. Any exception raised while signing or writing
    is logged and reported by returning False instead of propagating.
    """
    serializer = JSONSerializer(compact=False)
    metadata_path = self._get_metadata_path(role_name)
    try:
        self._sign_metadata(role_name)
        self.roles[role_name].to_file(metadata_path, serializer=serializer)
    except Exception as e:
        log_exception(e)
        log.error(
            f"Failed to write '{ role_name }' metadata to file "
            f"'{ metadata_path }'."
        )
        return False
    else:
        log.debug(
            f"Wrote '{ role_name }' metadata to file '{ metadata_path }'."
        )
        return True
def _sign_metadata(self, role_name: str):
    """
    Sign a role's metadata with every key configured for that role.

    Signing is best-effort: a key that fails to sign is logged as an error
    and skipped, and the remaining keys are still tried. No exception is
    raised for a failed key.

    Raises:
        KeyError: if the role is not configured, or one of its key names
            is not present in 'self.keys'.
    """
    role_info = config['roles'].get()[role_name]
    for key_name in role_info['keys']:
        key = self.keys[key_name]
        try:
            signer = SSlibSigner(key)
            self.roles[role_name].sign(signer, append=True)
        except Exception as e:
            log_exception(e)
            log.error(
                f"Failed to sign '{ role_name }' metadata with "
                f"'{ key_name }' key."
            )
        else:
            # Report success only when signing really succeeded; this
            # message must not follow the error above.
            log.debug(
                f"Signed '{ role_name }' metadata with '{ key_name }' key."
            )
def add_targets(self):
    """
    Move every inbound target into the repository and record it in the
    targets metadata.

    A target that cannot be added is logged as a warning and skipped. The
    targets metadata version is incremented once if at least one target
    was added. Returns the list of added target names.
    """
    added_targets = []
    for target_name in self.get_inbound_targets():
        try:
            local_path = self._move_inbound_target_to_targets_dir(target_name)
            file_info = TargetFile.from_file(target_name, local_path)
            self.roles["targets"].signed.targets[target_name] = file_info
            log.info(f"Added target '{ target_name }' to the repository.")
            added_targets.append(target_name)
        except Exception as e:
            log_exception(e)
            log.warning(
                f"Failed to add target '{ target_name }' to the repository."
            )
    if not added_targets:
        # Nothing changed, so the metadata version stays as it is.
        return added_targets
    self.roles["targets"].signed.version += 1
    return added_targets
def get_inbound_targets(self):
    """
    Scan the inbound directory for files to add to the repository.

    Returns a list of file paths relative to the configured
    'inbound_targets_path'; directories are skipped. The scan runs with
    the inbound directory as working directory and restores the previous
    working directory afterwards, so the process-wide working directory
    is not left changed.

    Raises:
        OSError: if the inbound directory cannot be entered.
    """
    # Imported locally because 'getcwd' is not part of the file-level
    # 'from os import ...' line.
    from os import getcwd
    inbound_targets_dir = config['inbound_targets_path'].get()
    message = "Scanning for inbound targets in: "\
              f"{ inbound_targets_dir }"
    log.debug(message)
    try:
        previous_dir = getcwd()
    except OSError:
        # The current directory no longer exists; nothing to go back to.
        previous_dir = None
    chdir(inbound_targets_dir)
    try:
        inbound_targets = []
        for inbound_target in glob('**', recursive=True):
            if path.isdir(inbound_target):
                # We only want files, not intermediate directories.
                continue
            log.debug(f"Found target: { inbound_target }")
            inbound_targets.append(inbound_target)
    finally:
        if previous_dir is not None:
            chdir(previous_dir)
    return inbound_targets
def _move_inbound_target_to_targets_dir(self, inbound_target):
    """
    Move an inbound target to the repo targets directory.

    Args:
        inbound_target: path of the target relative to the configured
            'inbound_targets_path'.

    Returns:
        The path of the moved file below the configured
        'repo_targets_path'.

    Raises:
        Exception: whatever creating the destination directory or moving
            the file raised, after it has been logged as a warning.
    """
    inbound_targets_path = config['inbound_targets_path'].get()
    inbound_target_path = path.join(inbound_targets_path, inbound_target)
    moved_target_path = path.join(
        config['repo_targets_path'].get(),
        inbound_target,
    )
    try:
        makedirs(path.dirname(moved_target_path), exist_ok=True)
        move(inbound_target_path, moved_target_path)
    except Exception as e:
        log_exception(e)
        warning = f"Failed to move target '{ inbound_target }' to the "\
                  "targets directory."
        log.warning(warning)
        # Re-raise: returning the destination path here would report a
        # move that never happened, and a stale file already sitting at
        # that path could then be recorded as the new target.
        raise
    message = f"Moved '{ inbound_target_path }' to "\
              f"'{ moved_target_path }'"
    log.debug(message)
    self._delete_empty_target_dirs(inbound_targets_path, inbound_target)
    message = f"Moved inbound target '{ inbound_target }' to targets "\
              "directory."
    log.info(message)
    return moved_target_path
def _delete_empty_target_dirs(self, root_dir, target):
""" Delete any intermediate (empty) directories for a target path. """
if root_dir not in target:
target = path.join(root_dir, target)
target_dir = path.dirname(target)
if target_dir == root_dir:
return # This target is the root directory, so stop.
if listdir(target_dir):
return # We're only cleaning up empty directories.
try:
rmdir(target_dir)
log.debug(f"Cleaned up empty directory '{ target_dir }'.")
# Recurse until we hit the root directory.
self._delete_empty_target_dirs(root_dir, target_dir)
except OSError as e:
log_exception(e)
warning = f"Failed to clean up empty directory '{ target_dir }'."
log.warning(warning)
def remove_targets(self, targets):
    """
    Remove the given targets from the targets metadata and delete their
    files.

    A target that cannot be removed is logged as a warning and skipped.
    The targets metadata version is incremented once if at least one
    target was removed. Returns the list of removed target names.
    """
    removed_targets = []
    for target_name in targets:
        try:
            del self.roles["targets"].signed.targets[target_name]
            log.info(f"Removed target '{ target_name }' from the repository.")
            self._delete_removed_target(target_name)
            removed_targets.append(target_name)
        except Exception as e:
            log_exception(e)
            log.warning(
                f"Failed to remove target '{ target_name }' from the "
                "repository."
            )
    if not removed_targets:
        # Nothing changed, so the metadata version stays as it is.
        return removed_targets
    self.roles["targets"].signed.version += 1
    return removed_targets
def _delete_removed_target(self, removed_target):
    """
    Delete the file of a target that was removed from the repository.

    A failure to delete the file is logged as a warning and not raised.
    Afterwards, empty parent directories of the target inside the
    configured 'repo_targets_path' are cleaned up.
    """
    targets_root = config['repo_targets_path'].get()
    file_path = path.join(targets_root, removed_target)
    try:
        remove(file_path)
        log.info(f"Deleted target file '{ file_path }'.")
    except Exception as e:
        log_exception(e)
        log.warning(f"Failed to delete target file '{ file_path }'.")
    self._delete_empty_target_dirs(targets_root, removed_target)
def update_snapshot(self):
    """
    Bring the snapshot metadata in line with the targets metadata.

    The 'targets.json' entry of the snapshot is set to the current targets
    version, then the snapshot's own version is incremented.
    """
    targets_version = self.roles["targets"].signed.version
    snapshot = self.roles["snapshot"].signed
    snapshot.meta["targets.json"].version = targets_version
    snapshot.version += 1
    log.info("Updated snapshot metadata.")
def update_timestamp(self):
    """
    Bring the timestamp metadata in line with the snapshot metadata.

    The snapshot entry of the timestamp is set to the current snapshot
    version, then the timestamp's own version is incremented.
    """
    snapshot_version = self.roles["snapshot"].signed.version
    timestamp = self.roles["timestamp"].signed
    timestamp.snapshot_meta.version = snapshot_version
    timestamp.version += 1
    log.info("Updated timestamp metadata.")
def status(self):
    """
    Summarize the state of the repository.

    Returns a dict with two entries:
      'targets': the number of targets and the sum of their lengths.
      'roles': per role, the signature count, threshold, version, TUF
        spec version, expiry and the keys that the root metadata assigns
        to the role (key types, scheme and key file path).
    """
    targets = self.roles['targets'].signed.targets
    repo_status = {
        'targets': {
            'count': len(targets),
            'size': sum(info.length for info in targets.values()),
        },
        'roles': {},
    }
    for role_name, metadata in self.roles.items():
        role_status = {
            'signatures': len(metadata.signatures),
            'threshold': self.roles['root'].signed.roles[role_name].threshold,
            'version': metadata.signed.version,
            'tuf_spec': metadata.signed.spec_version,
            'expires': metadata.signed.expires,
            'keys': {},
        }
        repo_status['roles'][role_name] = role_status
        role_key_ids = self.roles['root'].signed.roles[role_name].keyids
        for key_name, key_info in self.keys.items():
            if key_info['keyid'] not in role_key_ids:
                continue
            key_types = key_info['keyval'].keys()
            # A key that carries a private part is reported with its
            # signing key path, any other with its verification key path.
            if 'private' in key_types:
                key_path = self._get_signing_key_path(key_name, role_name)
            else:
                key_path = self._get_verification_key_path(key_name)
            role_status['keys'][key_name] = {
                'types': ", ".join(key_types),
                'scheme': key_info['scheme'],
                'key_path': key_path,
            }
    return repo_status
def _init_dirs(self):
    """
    Ensure all repository directories exist.

    Creates the configured metadata and targets directories with mode
    0o755 if they are missing; existing directories are left alone.

    Raises:
        RuggedStorageError: if a directory cannot be created because of
            insufficient permissions. The PermissionError is logged and
            attached as the cause.
    """
    dirs = {
        config['repo_metadata_path'].get(): 0o755,
        config['repo_targets_path'].get(): 0o755,
    }
    # 'dir_path' rather than 'dir', to avoid shadowing the builtin.
    for dir_path, mode in dirs.items():
        try:
            makedirs(dir_path, mode=mode, exist_ok=True)
        except PermissionError as e:
            log_exception(e)
            raise RuggedStorageError from e
def _init_keys(self):
    """
    Build 'self.keys', mapping each configured key name to its loaded key.

    Every key listed under any role in the 'roles' configuration is
    loaded; a key name listed under several roles keeps the value loaded
    for the role processed last.
    """
    self.keys: Dict[str, Dict[str, Any]] = {}
    for role_name, role_config in config['roles'].get().items():
        for key_name in role_config['keys']:
            loaded_key = self._load_keys(key_name, role_name)
            self.keys[key_name] = loaded_key
def _load_keys(self, key_name, role_name):
""" Load public keys and private keys (if available) from storage. """
signing_key = self._load_signing_key(key_name, role_name)
if signing_key:
return signing_key
return self._load_verification_key(key_name)
def _load_signing_key(self, key_name, role_name):
    """
    Load the signing key with the given name for the given role.

    Returns the imported key, or False when the key file cannot be read
    (a StorageError), which is only logged at debug level.
    """
    key_path = self._get_signing_key_path(key_name, role_name)
    try:
        signing_key = import_ed25519_privatekey_from_file(key_path)
        log.debug(f"Loaded '{ key_name }' signing key at { key_path }.")
        return signing_key
    except StorageError:
        # This is to be expected. So it's not really an error.
        log.debug(f"Cannot load { key_name } signing key at { key_path }.")
        return False
def _get_signing_key_path(self, key_name, role_name):
    """
    Return the path of a signing key.

    The path is RUGGED_SIGNING_KEY_DIR, the role name and the key name,
    joined with '/'.
    """
    role_dir = f"{ RUGGED_SIGNING_KEY_DIR }/{ role_name }"
    return f"{ role_dir }/{ key_name }"
def _load_verification_key(self, key_name):
    """
    Load the verification key with the given name.

    Returns the imported key, or False when the key file cannot be read
    (a StorageError), which is only logged at debug level.
    """
    key_path = self._get_verification_key_path(key_name)
    try:
        verification_key = import_ed25519_publickey_from_file(key_path)
        log.debug(f"Loaded '{ key_name }' verification key at { key_path }.")
        return verification_key
    except StorageError:
        # This is to be expected. So it's not really an error.
        log.debug(
            f"Cannot load { key_name } verification key at { key_path }."
        )
        return False
def _get_verification_key_path(self, key_name):
    """
    Return the path of a verification key.

    The path is the key name with a '.pub' suffix, placed directly below
    RUGGED_VERIFICATION_KEY_DIR.
    """
    key_file = f"{ key_name }.pub"
    return f"{ RUGGED_VERIFICATION_KEY_DIR }/{ key_file }"
def _init_roles(self):
""" Initialize a dictionary of roles. """
self.roles: Dict[str, Metadata] = {}
self._init_top_level_roles()
def _init_top_level_roles(self):
""" Create all top-level metadata objects. """
self._init_targets_role()
self._init_snapshot_role()
self._init_timestamp_role()
self._init_root_role()
def _init_targets_role(self):
    """
    Create the targets metadata object.

    It starts at version 1 with no targets and no signatures, and expires
    7 days from now.
    """
    signed = Targets(
        version=1,
        spec_version=SPEC_VERSION,
        expires=_in(7),
        targets={},
    )
    self.roles["targets"] = Metadata[Targets](
        signed=signed,
        signatures=OrderedDict(),
    )
def _init_snapshot_role(self):
    """
    Create the snapshot metadata object.

    It starts at version 1, references 'targets.json' at version 1, has no
    signatures and expires 7 days from now.
    """
    signed = Snapshot(
        version=1,
        spec_version=SPEC_VERSION,
        expires=_in(7),
        meta={"targets.json": MetaFile(version=1)},
    )
    self.roles["snapshot"] = Metadata[Snapshot](signed, OrderedDict())
def _init_timestamp_role(self):
    """
    Create the timestamp metadata object.

    It starts at version 1, references the snapshot at version 1, has no
    signatures and expires 1 day from now.
    """
    signed = Timestamp(
        version=1,
        spec_version=SPEC_VERSION,
        expires=_in(1),
        snapshot_meta=MetaFile(version=1),
    )
    self.roles["timestamp"] = Metadata[Timestamp](signed, OrderedDict())
def _init_root_role(self):
    """
    Create the root metadata object.

    It starts at version 1, lists every known key and role, has
    'consistent_snapshot' set to False, carries no signatures and expires
    365 days from now.
    """
    signed = Root(
        version=1,
        spec_version=SPEC_VERSION,
        expires=_in(365),
        keys=self._get_repo_keys_for_root_role(),
        roles=self._get_repo_roles_for_root_role(),
        consistent_snapshot=False,
    )
    self.roles["root"] = Metadata[Root](
        signed=signed,
        signatures=OrderedDict(),
    )
def _get_repo_keys_for_root_role(self):
    """
    Return all known keys as TUF Key objects, keyed by key ID.

    Raises:
        RuggedMetadataError: if converting a key raises a TypeError. The
            TypeError is logged and attached as the cause.
    """
    repo_keys = {}
    for key in self.keys.values():
        try:
            repo_keys[key["keyid"]] = Key.from_securesystemslib_key(key)
        except TypeError as e:
            log_exception(e)
            error = "Failed to generate key metadata during TUF "\
                    "repository initialization."
            raise RuggedMetadataError(error) from e
    return repo_keys
def _get_repo_roles_for_root_role(self):
    """
    Return all configured roles as TUF Role objects, keyed by role name.

    Each role lists the key IDs of the keys configured for it and uses a
    signature threshold of 1.

    Raises:
        RuggedMetadataError: if creating a Role raises a ValueError. The
            ValueError is logged and attached as the cause.
    """
    repo_roles = {}
    for role, role_info in config['roles'].get().items():
        role_keys = [
            self.keys[key_name]["keyid"] for key_name in role_info['keys']
        ]
        try:
            repo_roles[role] = Role(role_keys, threshold=1)
        except ValueError as e:
            log_exception(e)
            error = "Failed to generate role metadata during TUF "\
                    "repository initialization."
            raise RuggedMetadataError(error) from e
    return repo_roles
Classes
class RuggedRepository
-
An opinionated TUF repository using the low-level TUF Metadata API.
@TODO: Implement key rotation. (#35)
@TODO: Make signature thresholds configurable per key. (#97)
@TODO: Make expiration configurable per role. (#98)
@TODO: Implement support for hashed bins. (#99)
@TODO: Implement support for consistent snapshots. (#100)
@TODO: Implement support for delegated targets. (#36)
Expand source code
class RuggedRepository(): """ An opinionated TUF repository using the low-level TUF Metadata API. @TODO: Implement key rotation. (#35) @TODO: Make signature thresholds configurable per key. (#97) @TODO: Make expiration configurable per role. (#98) @TODO: Implement support for hashed bins. (#99) @TODO: Implement support for consistent snapshots. (#100) @TODO: Implement support for delegated targets. (#36) """ def __init__(self): try: self._init_dirs() self._init_keys() self._init_roles() log.debug("Instantiated repository.") except Exception as e: log_exception(e) log.error("Failed to instantiate repository.") raise RuggedRepositoryError() def load(self): """ Load all metadata from storage. """ for name in self.roles.keys(): try: self.load_metadata(name) except RuggedMetadataError: error = "Failed to load all metadata." log.error(error) raise RuggedMetadataError(error) log.debug("Loaded all metadata.") def load_metadata(self, name: str): """ load a role's metadata from storage. """ metadata_file = self._get_metadata_path(name) try: self.roles[name] = self.roles[name].from_file(metadata_file) message = f"Loaded metadata for '{ name }' role from "\ f"'{ metadata_file }'." log.debug(message) except TypeError as e: log_exception(e) error = f"Failed to load metadata for '{ name }' role from "\ f"'{ metadata_file }'." raise RuggedMetadataError(error) def _get_metadata_path(self, name): """ Determine the path for a given role's metadata file. """ metadata_dir = config['repo_metadata_path'].get() filename = self._get_metadata_filename(name) return path.join(metadata_dir, filename) def _get_metadata_filename(self, name): """ Determine the filename for a given role's metadata. """ filename = f"{self.roles[name].signed.type}.json" if name == "timestamp": # Timestamp metadata doesn't use consistent snapshots. 
return filename if config['consistent_snapshot'].get(): filename = f"{self.roles[name].signed.version}.{ filename }" return filename def write(self): """ Write all metadata to storage. """ for name in self.roles.keys(): result = self.write_metadata(name) if not result: return False return True def write_metadata(self, role_name: str): """ Write a role's signed metadata to storage. """ PRETTY = JSONSerializer(compact=False) path = self._get_metadata_path(role_name) try: self._sign_metadata(role_name) self.roles[role_name].to_file(path, serializer=PRETTY) except Exception as e: log_exception(e) error = f"Failed to write '{ role_name }' metadata to file '{ path }'." log.error(error) return False log.debug(f"Wrote '{ role_name }' metadata to file '{ path }'.") return True def _sign_metadata(self, role_name: str): """ Sign a role's metadata. """ role_info = config['roles'].get()[role_name] for key_name in role_info['keys']: key = self.keys[key_name] try: signer = SSlibSigner(key) self.roles[role_name].sign(signer, append=True) except Exception as e: log_exception(e) log.error(f"Failed to sign '{ role_name }' metadata with '{ key_name }' key.") log.debug(f"Signed '{ role_name }' metadata with '{ key_name }' key.") def add_targets(self): """ Add any inbound targets to the targets metadata. """ inbound_targets = self.get_inbound_targets() added_targets = [] for inbound_target in inbound_targets: try: moved_target_path = self._move_inbound_target_to_targets_dir( inbound_target ) target_file_info = TargetFile.from_file( inbound_target, moved_target_path, ) self.roles["targets"].signed.targets[inbound_target] = target_file_info # noqa: E501 message = f"Added target '{ inbound_target }' "\ "to the repository." log.info(message) added_targets.append(inbound_target) except Exception as e: log_exception(e) warning = f"Failed to add target '{ inbound_target }' "\ "to the repository." 
log.warning(warning) if added_targets: self.roles["targets"].signed.version += 1 return added_targets def get_inbound_targets(self): """ Scan the inbound directory for files to add to the repository. """ inbound_targets_dir = config['inbound_targets_path'].get() message = "Scanning for inbound targets in: "\ f"{ inbound_targets_dir }" log.debug(message) chdir(inbound_targets_dir) inbound_targets = [] for inbound_target in glob('**', recursive=True): if path.isdir(inbound_target): # We only want files, not intermediate directories. continue log.debug(f"Found target: { inbound_target }") inbound_targets.append(inbound_target) return inbound_targets def _move_inbound_target_to_targets_dir(self, inbound_target): """ Move an inbound target to the repo targets directory. """ inbound_targets_path = config['inbound_targets_path'].get() inbound_target_path = path.join(inbound_targets_path, inbound_target) moved_target_path = path.join( config['repo_targets_path'].get(), inbound_target, ) try: makedirs(path.dirname(moved_target_path), exist_ok=True) move(inbound_target_path, moved_target_path) message = f"Moved '{ inbound_target_path }' to "\ f"'{ moved_target_path }'" log.debug(message) except Exception as e: log_exception(e) warning = f"Failed to move target '{ inbound_target }' to the "\ "targets directory." log.warning(warning) self._delete_empty_target_dirs(inbound_targets_path, inbound_target) message = f"Moved inbound target '{ inbound_target }' to targets "\ "directory." log.info(message) return moved_target_path def _delete_empty_target_dirs(self, root_dir, target): """ Delete any intermediate (empty) directories for a target path. """ if root_dir not in target: target = path.join(root_dir, target) target_dir = path.dirname(target) if target_dir == root_dir: return # This target is the root directory, so stop. if listdir(target_dir): return # We're only cleaning up empty directories. 
try: rmdir(target_dir) log.debug(f"Cleaned up empty directory '{ target_dir }'.") # Recurse until we hit the root directory. self._delete_empty_target_dirs(root_dir, target_dir) except OSError as e: log_exception(e) warning = f"Failed to clean up empty directory '{ target_dir }'." log.warning(warning) def remove_targets(self, targets): """ Remove given targets from the targets metadata. """ removed_targets = [] for target in targets: try: del self.roles["targets"].signed.targets[target] log.info(f"Removed target '{ target }' from the repository.") self._delete_removed_target(target) removed_targets.append(target) except Exception as e: log_exception(e) warning = f"Failed to remove target '{ target }' from the "\ "repository." log.warning(warning) if removed_targets: self.roles["targets"].signed.version += 1 return removed_targets def _delete_removed_target(self, removed_target): """ Delete the file for the target that we removed from the repo. """ repo_targets_path = config['repo_targets_path'].get() target_file = path.join(repo_targets_path, removed_target) try: remove(target_file) log.info(f"Deleted target file '{ target_file }'.") except Exception as e: log_exception(e) log.warning(f"Failed to delete target file '{ target_file }'.") self._delete_empty_target_dirs(repo_targets_path, removed_target) def update_snapshot(self): """ Update snapshot to account for changed targets metadata. """ self.roles["snapshot"].signed.meta["targets.json"].version = self.roles[ # noqa: E501 "targets" ].signed.version self.roles["snapshot"].signed.version += 1 log.info("Updated snapshot metadata.") def update_timestamp(self): """ Update timestamp to account for changed snapshot metadata. 
""" self.roles["timestamp"].signed.snapshot_meta.version = self.roles[ "snapshot" ].signed.version self.roles["timestamp"].signed.version += 1 log.info("Updated timestamp metadata.") def status(self): targets = self.roles['targets'].signed.targets repo_status = { 'targets': { 'count': len(targets), 'size': sum(target.length for target in targets.values()), }, 'roles': {}, } for role_name, role_info in self.roles.items(): repo_status['roles'][role_name] = { 'signatures': len(role_info.signatures), 'threshold': self.roles['root'].signed.roles[role_name].threshold, 'version': role_info.signed.version, 'tuf_spec': role_info.signed.spec_version, 'expires': role_info.signed.expires, } repo_status['roles'][role_name]['keys'] = {} role_key_ids = self.roles['root'].signed.roles[role_name].keyids for key_name, key_info in self.keys.items(): if key_info['keyid'] in role_key_ids: key_types = key_info['keyval'].keys() if 'private' in key_types: key_path = self._get_signing_key_path(key_name, role_name) else: key_path = self._get_verification_key_path(key_name) repo_status['roles'][role_name]['keys'][key_name] = { 'types': ", ".join(key_types), 'scheme': key_info['scheme'], 'key_path': key_path } return repo_status def _init_dirs(self): """ Ensure all repository directories exist. """ dirs = { config['repo_metadata_path'].get(): 0o755, config['repo_targets_path'].get(): 0o755, } for dir, mode in dirs.items(): try: makedirs(dir, mode=mode, exist_ok=True) except PermissionError as e: log_exception(e) raise RuggedStorageError def _init_keys(self): """ Initialize a dictionary of keys. """ self.keys: Dict[str, Dict[str, Any]] = {} for role_name, role_info in config['roles'].get().items(): for key_name in role_info['keys']: self.keys[key_name] = self._load_keys(key_name, role_name) def _load_keys(self, key_name, role_name): """ Load public keys and private keys (if available) from storage. 
""" signing_key = self._load_signing_key(key_name, role_name) if signing_key: return signing_key return self._load_verification_key(key_name) def _load_signing_key(self, key_name, role_name): """ Load a signing key for a given role. """ key_path = self._get_signing_key_path(key_name, role_name) try: key = import_ed25519_privatekey_from_file(key_path) log.debug(f"Loaded '{ key_name }' signing key at { key_path }.") except StorageError: # This is to be expected. So it's not really an error. log.debug(f"Cannot load { key_name } signing key at { key_path }.") return False return key def _get_signing_key_path(self, key_name, role_name): """ Return the path of a signing key. """ return f"{ RUGGED_SIGNING_KEY_DIR }/{ role_name }/{ key_name }" def _load_verification_key(self, key_name): """ Load a verification key for a given role. """ key_path = self._get_verification_key_path(key_name) try: key = import_ed25519_publickey_from_file(key_path) log.debug(f"Loaded '{ key_name }' verification key at { key_path }.") except StorageError: # This is to be expected. So it's not really an error. message = f"Cannot load { key_name } verification key at "\ f"{ key_path }." log.debug(message) return False return key def _get_verification_key_path(self, key_name): """ Return the path of a verification key. """ return f"{ RUGGED_VERIFICATION_KEY_DIR }/{ key_name }.pub" def _init_roles(self): """ Initialize a dictionary of roles. """ self.roles: Dict[str, Metadata] = {} self._init_top_level_roles() def _init_top_level_roles(self): """ Create all top-level metadata objects. """ self._init_targets_role() self._init_snapshot_role() self._init_timestamp_role() self._init_root_role() def _init_targets_role(self): """ Create targets metadata object. """ self.roles["targets"] = Metadata[Targets]( signed=Targets( version=1, spec_version=SPEC_VERSION, expires=_in(7), targets={}, ), signatures=OrderedDict(), ) def _init_snapshot_role(self): """ Create snapshot metadata object. 
""" self.roles["snapshot"] = Metadata[Snapshot]( Snapshot( version=1, spec_version=SPEC_VERSION, expires=_in(7), meta={"targets.json": MetaFile(version=1)}, ), OrderedDict(), ) def _init_timestamp_role(self): """ Create timestamp metadata object. """ self.roles["timestamp"] = Metadata[Timestamp]( Timestamp( version=1, spec_version=SPEC_VERSION, expires=_in(1), snapshot_meta=MetaFile(version=1), ), OrderedDict(), ) def _init_root_role(self): """ Create root metadata object. """ self.roles["root"] = Metadata[Root]( signed=Root( version=1, spec_version=SPEC_VERSION, expires=_in(365), keys=self._get_repo_keys_for_root_role(), roles=self._get_repo_roles_for_root_role(), consistent_snapshot=False, ), signatures=OrderedDict(), ) def _get_repo_keys_for_root_role(self): """ Return all known keys, keyed by ID. """ repo_keys = {} for key in self.keys.values(): try: repo_keys[key["keyid"]] = Key.from_securesystemslib_key(key) except TypeError as e: log_exception(e) error = "Failed to generate key metadata during TUF "\ "repository initialization." raise RuggedMetadataError(error) return repo_keys def _get_repo_roles_for_root_role(self): """ Return all known roles, keyed by ID. """ repo_roles = {} for role, role_info in config['roles'].get().items(): role_keys = [] for key_name in role_info['keys']: role_keys.append(self.keys[key_name]["keyid"]) try: repo_roles[role] = Role(role_keys, threshold=1) except ValueError as e: log_exception(e) error = "Failed to generate role metadata during TUF "\ "repository initialization." raise RuggedMetadataError(error) return repo_roles
Methods
def add_targets(self)
-
Add any inbound targets to the targets metadata.
Expand source code
def add_targets(self): """ Add any inbound targets to the targets metadata. """ inbound_targets = self.get_inbound_targets() added_targets = [] for inbound_target in inbound_targets: try: moved_target_path = self._move_inbound_target_to_targets_dir( inbound_target ) target_file_info = TargetFile.from_file( inbound_target, moved_target_path, ) self.roles["targets"].signed.targets[inbound_target] = target_file_info # noqa: E501 message = f"Added target '{ inbound_target }' "\ "to the repository." log.info(message) added_targets.append(inbound_target) except Exception as e: log_exception(e) warning = f"Failed to add target '{ inbound_target }' "\ "to the repository." log.warning(warning) if added_targets: self.roles["targets"].signed.version += 1 return added_targets
def get_inbound_targets(self)
-
Scan the inbound directory for files to add to the repository.
Expand source code
def get_inbound_targets(self): """ Scan the inbound directory for files to add to the repository. """ inbound_targets_dir = config['inbound_targets_path'].get() message = "Scanning for inbound targets in: "\ f"{ inbound_targets_dir }" log.debug(message) chdir(inbound_targets_dir) inbound_targets = [] for inbound_target in glob('**', recursive=True): if path.isdir(inbound_target): # We only want files, not intermediate directories. continue log.debug(f"Found target: { inbound_target }") inbound_targets.append(inbound_target) return inbound_targets
def load(self)
-
Load all metadata from storage.
Expand source code
def load(self): """ Load all metadata from storage. """ for name in self.roles.keys(): try: self.load_metadata(name) except RuggedMetadataError: error = "Failed to load all metadata." log.error(error) raise RuggedMetadataError(error) log.debug("Loaded all metadata.")
def load_metadata(self, name: str)
-
Load a role's metadata from storage.
Expand source code
def load_metadata(self, name: str): """ load a role's metadata from storage. """ metadata_file = self._get_metadata_path(name) try: self.roles[name] = self.roles[name].from_file(metadata_file) message = f"Loaded metadata for '{ name }' role from "\ f"'{ metadata_file }'." log.debug(message) except TypeError as e: log_exception(e) error = f"Failed to load metadata for '{ name }' role from "\ f"'{ metadata_file }'." raise RuggedMetadataError(error)
def remove_targets(self, targets)
-
Remove given targets from the targets metadata.
Expand source code
def remove_targets(self, targets): """ Remove given targets from the targets metadata. """ removed_targets = [] for target in targets: try: del self.roles["targets"].signed.targets[target] log.info(f"Removed target '{ target }' from the repository.") self._delete_removed_target(target) removed_targets.append(target) except Exception as e: log_exception(e) warning = f"Failed to remove target '{ target }' from the "\ "repository." log.warning(warning) if removed_targets: self.roles["targets"].signed.version += 1 return removed_targets
def status(self)
-
Expand source code
def status(self): targets = self.roles['targets'].signed.targets repo_status = { 'targets': { 'count': len(targets), 'size': sum(target.length for target in targets.values()), }, 'roles': {}, } for role_name, role_info in self.roles.items(): repo_status['roles'][role_name] = { 'signatures': len(role_info.signatures), 'threshold': self.roles['root'].signed.roles[role_name].threshold, 'version': role_info.signed.version, 'tuf_spec': role_info.signed.spec_version, 'expires': role_info.signed.expires, } repo_status['roles'][role_name]['keys'] = {} role_key_ids = self.roles['root'].signed.roles[role_name].keyids for key_name, key_info in self.keys.items(): if key_info['keyid'] in role_key_ids: key_types = key_info['keyval'].keys() if 'private' in key_types: key_path = self._get_signing_key_path(key_name, role_name) else: key_path = self._get_verification_key_path(key_name) repo_status['roles'][role_name]['keys'][key_name] = { 'types': ", ".join(key_types), 'scheme': key_info['scheme'], 'key_path': key_path } return repo_status
def update_snapshot(self)
-
Update snapshot to account for changed targets metadata.
Expand source code
def update_snapshot(self): """ Update snapshot to account for changed targets metadata. """ self.roles["snapshot"].signed.meta["targets.json"].version = self.roles[ # noqa: E501 "targets" ].signed.version self.roles["snapshot"].signed.version += 1 log.info("Updated snapshot metadata.")
def update_timestamp(self)
-
Update timestamp to account for changed snapshot metadata.
Expand source code
def update_timestamp(self): """ Update timestamp to account for changed snapshot metadata. """ self.roles["timestamp"].signed.snapshot_meta.version = self.roles[ "snapshot" ].signed.version self.roles["timestamp"].signed.version += 1 log.info("Updated timestamp metadata.")
def write(self)
-
Write all metadata to storage.
Expand source code
def write(self): """ Write all metadata to storage. """ for name in self.roles.keys(): result = self.write_metadata(name) if not result: return False return True
def write_metadata(self, role_name: str)
-
Write a role's signed metadata to storage.
Expand source code
def write_metadata(self, role_name: str): """ Write a role's signed metadata to storage. """ PRETTY = JSONSerializer(compact=False) path = self._get_metadata_path(role_name) try: self._sign_metadata(role_name) self.roles[role_name].to_file(path, serializer=PRETTY) except Exception as e: log_exception(e) error = f"Failed to write '{ role_name }' metadata to file '{ path }'." log.error(error) return False log.debug(f"Wrote '{ role_name }' metadata to file '{ path }'.") return True