Module rugged.tuf.repo

Expand source code
from collections import OrderedDict
from datetime import datetime, timedelta, timezone
from glob import glob
from os import chdir, listdir, makedirs, path, remove, rmdir
from shutil import move
from typing import Any, Dict

from securesystemslib.exceptions import StorageError
from securesystemslib.interface import (
    import_ed25519_publickey_from_file,
    import_ed25519_privatekey_from_file,
)
from securesystemslib.signer import SSlibSigner
from tuf.api.metadata import (
    SPECIFICATION_VERSION,
    Key,
    Metadata,
    MetaFile,
    Root,
    Role,
    Snapshot,
    TargetFile,
    Targets,
    Timestamp,
)
from tuf.api.serialization.json import JSONSerializer

from rugged.exceptions.metadata_error import RuggedMetadataError
from rugged.exceptions.repository_error import RuggedRepositoryError
from rugged.exceptions.storage_error import RuggedStorageError
from rugged.lib.config import get_config
from rugged.lib.constants import (
    RUGGED_SIGNING_KEY_DIR,
    RUGGED_VERIFICATION_KEY_DIR,
)
from rugged.lib.logger import get_logger, log_exception

# Module-level singletons shared by everything in this module.
config = get_config()
log = get_logger()
# TUF specification version rendered as a dotted string (e.g. "1.0.31");
# SPECIFICATION_VERSION is a sequence of version components.
SPEC_VERSION = ".".join(SPECIFICATION_VERSION)


def _in(days: float) -> datetime:
    """Adds 'days' to now and returns datetime object w/o microseconds."""
    return datetime.utcnow().replace(microsecond=0) + timedelta(days=days)


class RuggedRepository():
    """
    An opinionated TUF repository using the low-level TUF Metadata API.

    @TODO: Implement key rotation. (#35)
    @TODO: Make signature thresholds configurable per key. (#97)
    @TODO: Make expiration configurable per role. (#98)
    @TODO: Implement support for hashed bins. (#99)
    @TODO: Implement support for consistent snapshots. (#100)
    @TODO: Implement support for delegated targets. (#36)
    """

    def __init__(self):
        """ Initialize repository directories, keys and role metadata.

        Raises RuggedRepositoryError if any initialization step fails.
        """
        try:
            self._init_dirs()
            self._init_keys()
            self._init_roles()
            log.debug("Instantiated repository.")
        except Exception as e:
            log_exception(e)
            log.error("Failed to instantiate repository.")
            raise RuggedRepositoryError()

    def load(self):
        """ Load all metadata from storage.

        Raises RuggedMetadataError as soon as any single role fails to
        load.
        """
        for name in self.roles.keys():
            try:
                self.load_metadata(name)
            except RuggedMetadataError:
                error = "Failed to load all metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata.")

    def load_metadata(self, name: str):
        """ Load a role's metadata from storage.

        Replaces the in-memory Metadata object for 'name' with the
        deserialized on-disk version. Raises RuggedMetadataError on
        failure.
        """
        metadata_file = self._get_metadata_path(name)
        try:
            self.roles[name] = self.roles[name].from_file(metadata_file)
            message = f"Loaded metadata for '{ name }' role from "\
                      f"'{ metadata_file }'."
            log.debug(message)
        except TypeError as e:
            log_exception(e)
            error = f"Failed to load metadata for '{ name }' role from "\
                    f"'{ metadata_file }'."
            raise RuggedMetadataError(error)

    def _get_metadata_path(self, name):
        """ Determine the path for a given role's metadata file. """
        metadata_dir = config['repo_metadata_path'].get()
        filename = self._get_metadata_filename(name)
        return path.join(metadata_dir, filename)

    def _get_metadata_filename(self, name):
        """ Determine the filename for a given role's metadata.

        With consistent snapshots enabled, non-timestamp roles get a
        version-prefixed filename (e.g. "2.targets.json").
        """
        filename = f"{self.roles[name].signed.type}.json"
        if name == "timestamp":
            # Timestamp metadata doesn't use consistent snapshots.
            return filename
        if config['consistent_snapshot'].get():
            filename = f"{self.roles[name].signed.version}.{ filename }"
        return filename

    def write(self):
        """ Write all metadata to storage.

        Returns True if every role was written, False at the first
        failure.
        """
        for name in self.roles.keys():
            result = self.write_metadata(name)
            if not result:
                return False
        return True

    def write_metadata(self, role_name: str):
        """ Write a role's signed metadata to storage.

        Signs the metadata, then serializes it as pretty-printed JSON.
        Returns True on success, False on failure.
        """
        PRETTY = JSONSerializer(compact=False)
        # Named 'metadata_path' to avoid shadowing the os.path module.
        metadata_path = self._get_metadata_path(role_name)
        try:
            self._sign_metadata(role_name)
            self.roles[role_name].to_file(metadata_path, serializer=PRETTY)
        except Exception as e:
            log_exception(e)
            error = f"Failed to write '{ role_name }' metadata to file '{ metadata_path }'."
            log.error(error)
            return False
        log.debug(f"Wrote '{ role_name }' metadata to file '{ metadata_path }'.")
        return True

    def _sign_metadata(self, role_name: str):
        """ Sign a role's metadata with each of the role's keys.

        Signatures are appended, so existing signatures from other keys
        are preserved. Failures are logged but not raised.
        """
        role_info = config['roles'].get()[role_name]
        for key_name in role_info['keys']:
            key = self.keys[key_name]
            try:
                signer = SSlibSigner(key)
                self.roles[role_name].sign(signer, append=True)
            except Exception as e:
                log_exception(e)
                log.error(f"Failed to sign '{ role_name }' metadata with '{ key_name }' key.")
            else:
                # Only log success when signing actually succeeded;
                # previously this logged unconditionally.
                log.debug(f"Signed '{ role_name }' metadata with '{ key_name }' key.")

    def add_targets(self):
        """ Add any inbound targets to the targets metadata.

        Moves each inbound file into the repo targets directory and
        records it in targets metadata. Returns the list of target
        names that were successfully added; bumps the targets version
        once if anything was added.
        """
        inbound_targets = self.get_inbound_targets()
        added_targets = []
        for inbound_target in inbound_targets:
            try:
                moved_target_path = self._move_inbound_target_to_targets_dir(
                    inbound_target
                )
                target_file_info = TargetFile.from_file(
                    inbound_target,
                    moved_target_path,
                )
                self.roles["targets"].signed.targets[inbound_target] = target_file_info  # noqa: E501
                message = f"Added target '{ inbound_target }' "\
                          "to the repository."
                log.info(message)
                added_targets.append(inbound_target)
            except Exception as e:
                log_exception(e)
                warning = f"Failed to add target '{ inbound_target }' "\
                          "to the repository."
                log.warning(warning)
        if added_targets:
            self.roles["targets"].signed.version += 1
        return added_targets

    def get_inbound_targets(self):
        """ Scan the inbound directory for files to add to the repository.

        NOTE: chdir()s into the inbound directory so that glob results
        are repo-relative paths; the working directory is not restored.
        """
        inbound_targets_dir = config['inbound_targets_path'].get()
        message = "Scanning for inbound targets in: "\
                  f"{ inbound_targets_dir }"
        log.debug(message)
        chdir(inbound_targets_dir)
        inbound_targets = []
        for inbound_target in glob('**', recursive=True):
            if path.isdir(inbound_target):
                # We only want files, not intermediate directories.
                continue
            log.debug(f"Found target: { inbound_target }")
            inbound_targets.append(inbound_target)
        return inbound_targets

    def _move_inbound_target_to_targets_dir(self, inbound_target):
        """ Move an inbound target to the repo targets directory.

        Returns the destination path. Move failures are logged as
        warnings; the destination path is returned regardless.
        """
        inbound_targets_path = config['inbound_targets_path'].get()
        inbound_target_path = path.join(inbound_targets_path, inbound_target)
        moved_target_path = path.join(
            config['repo_targets_path'].get(),
            inbound_target,
        )
        try:
            makedirs(path.dirname(moved_target_path), exist_ok=True)
            move(inbound_target_path, moved_target_path)
            message = f"Moved '{ inbound_target_path }' to "\
                      f"'{ moved_target_path }'"
            log.debug(message)
        except Exception as e:
            log_exception(e)
            warning = f"Failed to move target '{ inbound_target }' to the "\
                      "targets directory."
            log.warning(warning)
        self._delete_empty_target_dirs(inbound_targets_path, inbound_target)
        message = f"Moved inbound target '{ inbound_target }' to targets "\
                  "directory."
        log.info(message)
        return moved_target_path

    def _delete_empty_target_dirs(self, root_dir, target):
        """ Delete any intermediate (empty) directories for a target path.

        Recurses upward from the target's directory, removing empty
        directories until reaching root_dir or a non-empty directory.
        """
        if root_dir not in target:
            target = path.join(root_dir, target)
        target_dir = path.dirname(target)
        if target_dir == root_dir:
            return   # This target is the root directory, so stop.
        if listdir(target_dir):
            return   # We're only cleaning up empty directories.
        try:
            rmdir(target_dir)
            log.debug(f"Cleaned up empty directory '{ target_dir }'.")
            # Recurse until we hit the root directory.
            self._delete_empty_target_dirs(root_dir, target_dir)
        except OSError as e:
            log_exception(e)
            warning = f"Failed to clean up empty directory '{ target_dir }'."
            log.warning(warning)

    def remove_targets(self, targets):
        """ Remove given targets from the targets metadata.

        Returns the list of targets that were successfully removed;
        bumps the targets version once if anything was removed.
        """
        removed_targets = []
        for target in targets:
            try:
                del self.roles["targets"].signed.targets[target]
                log.info(f"Removed target '{ target }' from the repository.")
                self._delete_removed_target(target)
                removed_targets.append(target)
            except Exception as e:
                log_exception(e)
                warning = f"Failed to remove target '{ target }' from the "\
                          "repository."
                log.warning(warning)
        if removed_targets:
            self.roles["targets"].signed.version += 1
        return removed_targets

    def _delete_removed_target(self, removed_target):
        """ Delete the file for the target that we removed from the repo. """
        repo_targets_path = config['repo_targets_path'].get()
        target_file = path.join(repo_targets_path, removed_target)
        try:
            remove(target_file)
            log.info(f"Deleted target file '{ target_file }'.")
        except Exception as e:
            log_exception(e)
            log.warning(f"Failed to delete target file '{ target_file }'.")
        self._delete_empty_target_dirs(repo_targets_path, removed_target)

    def update_snapshot(self):
        """ Update snapshot to account for changed targets metadata. """
        self.roles["snapshot"].signed.meta["targets.json"].version = self.roles[  # noqa: E501
            "targets"
        ].signed.version
        self.roles["snapshot"].signed.version += 1
        log.info("Updated snapshot metadata.")

    def update_timestamp(self):
        """ Update timestamp to account for changed snapshot metadata. """
        self.roles["timestamp"].signed.snapshot_meta.version = self.roles[
            "snapshot"
        ].signed.version
        self.roles["timestamp"].signed.version += 1
        log.info("Updated timestamp metadata.")

    def status(self):
        """ Return a summary dict of targets and per-role metadata state.

        Includes target count/total size, and for each role: signature
        count, threshold, version, spec version, expiry and key info.
        """
        targets = self.roles['targets'].signed.targets
        repo_status = {
            'targets': {
                'count': len(targets),
                'size': sum(target.length for target in targets.values()),
            },
            'roles': {},
        }
        for role_name, role_info in self.roles.items():
            repo_status['roles'][role_name] = {
                'signatures': len(role_info.signatures),
                'threshold': self.roles['root'].signed.roles[role_name].threshold,
                'version': role_info.signed.version,
                'tuf_spec': role_info.signed.spec_version,
                'expires': role_info.signed.expires,
            }
            repo_status['roles'][role_name]['keys'] = {}
            role_key_ids = self.roles['root'].signed.roles[role_name].keyids
            for key_name, key_info in self.keys.items():
                if key_info['keyid'] in role_key_ids:
                    key_types = key_info['keyval'].keys()
                    # A key with private material is a signing key; a
                    # public-only key is a verification key.
                    if 'private' in key_types:
                        key_path = self._get_signing_key_path(key_name, role_name)
                    else:
                        key_path = self._get_verification_key_path(key_name)
                    repo_status['roles'][role_name]['keys'][key_name] = {
                        'types': ", ".join(key_types),
                        'scheme': key_info['scheme'],
                        'key_path': key_path
                    }
        return repo_status

    def _init_dirs(self):
        """ Ensure all repository directories exist.

        Raises RuggedStorageError if a directory cannot be created.
        """
        dirs = {
            config['repo_metadata_path'].get(): 0o755,
            config['repo_targets_path'].get(): 0o755,
        }
        # 'dir_path' rather than 'dir' to avoid shadowing the builtin.
        for dir_path, mode in dirs.items():
            try:
                makedirs(dir_path, mode=mode, exist_ok=True)
            except PermissionError as e:
                log_exception(e)
                raise RuggedStorageError

    def _init_keys(self):
        """ Initialize a dictionary of keys, keyed by key name. """
        self.keys: Dict[str, Dict[str, Any]] = {}
        for role_name, role_info in config['roles'].get().items():
            for key_name in role_info['keys']:
                self.keys[key_name] = self._load_keys(key_name, role_name)

    def _load_keys(self, key_name, role_name):
        """ Load public keys and private keys (if available) from storage.

        Prefers the signing (private) key; falls back to the public
        verification key.
        """
        signing_key = self._load_signing_key(key_name, role_name)
        if signing_key:
            return signing_key
        return self._load_verification_key(key_name)

    def _load_signing_key(self, key_name, role_name):
        """ Load a signing key for a given role.

        Returns the key dict, or False when the key file is absent.
        """
        key_path = self._get_signing_key_path(key_name, role_name)
        try:
            key = import_ed25519_privatekey_from_file(key_path)
            log.debug(f"Loaded '{ key_name }' signing key at { key_path }.")
        except StorageError:
            # This is to be expected. So it's not really an error.
            log.debug(f"Cannot load { key_name } signing key at { key_path }.")
            return False
        return key

    def _get_signing_key_path(self, key_name, role_name):
        """ Return the path of a signing key. """
        return f"{ RUGGED_SIGNING_KEY_DIR }/{ role_name }/{ key_name }"

    def _load_verification_key(self, key_name):
        """ Load a verification key for a given role.

        Returns the key dict, or False when the key file is absent.
        """
        key_path = self._get_verification_key_path(key_name)
        try:
            key = import_ed25519_publickey_from_file(key_path)
            log.debug(f"Loaded '{ key_name }' verification key at { key_path }.")
        except StorageError:
            # This is to be expected. So it's not really an error.
            message = f"Cannot load { key_name } verification key at "\
                      f"{ key_path }."
            log.debug(message)
            return False
        return key

    def _get_verification_key_path(self, key_name):
        """ Return the path of a verification key. """
        return f"{ RUGGED_VERIFICATION_KEY_DIR }/{ key_name }.pub"

    def _init_roles(self):
        """ Initialize a dictionary of roles. """
        self.roles: Dict[str, Metadata] = {}
        self._init_top_level_roles()

    def _init_top_level_roles(self):
        """ Create all top-level metadata objects.

        Root is created last because it embeds the keys and role
        assignments of the other roles.
        """
        self._init_targets_role()
        self._init_snapshot_role()
        self._init_timestamp_role()
        self._init_root_role()

    def _init_targets_role(self):
        """ Create targets metadata object. """
        self.roles["targets"] = Metadata[Targets](
            signed=Targets(
                version=1,
                spec_version=SPEC_VERSION,
                expires=_in(7),
                targets={},
            ),
            signatures=OrderedDict(),
        )

    def _init_snapshot_role(self):
        """ Create snapshot metadata object. """
        self.roles["snapshot"] = Metadata[Snapshot](
            Snapshot(
                version=1,
                spec_version=SPEC_VERSION,
                expires=_in(7),
                meta={"targets.json": MetaFile(version=1)},
            ),
            OrderedDict(),
        )

    def _init_timestamp_role(self):
        """ Create timestamp metadata object. """
        self.roles["timestamp"] = Metadata[Timestamp](
            Timestamp(
                version=1,
                spec_version=SPEC_VERSION,
                expires=_in(1),
                snapshot_meta=MetaFile(version=1),
            ),
            OrderedDict(),
        )

    def _init_root_role(self):
        """ Create root metadata object. """
        self.roles["root"] = Metadata[Root](
            signed=Root(
                version=1,
                spec_version=SPEC_VERSION,
                expires=_in(365),
                keys=self._get_repo_keys_for_root_role(),
                roles=self._get_repo_roles_for_root_role(),
                consistent_snapshot=False,
            ),
            signatures=OrderedDict(),
        )

    def _get_repo_keys_for_root_role(self):
        """ Return all known keys, keyed by ID.

        Raises RuggedMetadataError if a key cannot be converted to TUF
        key metadata.
        """
        repo_keys = {}
        for key in self.keys.values():
            try:
                repo_keys[key["keyid"]] = Key.from_securesystemslib_key(key)
            except TypeError as e:
                log_exception(e)
                error = "Failed to generate key metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
        return repo_keys

    def _get_repo_roles_for_root_role(self):
        """ Return all known roles, keyed by ID.

        Raises RuggedMetadataError if role metadata cannot be built.
        """
        repo_roles = {}
        for role, role_info in config['roles'].get().items():
            # Collect all key IDs first, then build the Role once.
            # (Previously the Role was rebuilt inside the key loop on
            # every iteration; the final result is identical.)
            role_keys = [
                self.keys[key_name]["keyid"] for key_name in role_info['keys']
            ]
            try:
                repo_roles[role] = Role(role_keys, threshold=1)
            except ValueError as e:
                log_exception(e)
                error = "Failed to generate role metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
        return repo_roles

Classes

class RuggedRepository

An opinionated TUF repository using the low-level TUF Metadata API.

@TODO: Implement key rotation. (#35)
@TODO: Make signature thresholds configurable per key. (#97)
@TODO: Make expiration configurable per role. (#98)
@TODO: Implement support for hashed bins. (#99)
@TODO: Implement support for consistent snapshots. (#100)
@TODO: Implement support for delegated targets. (#36)

Expand source code
class RuggedRepository():
    """
    An opinionated TUF repository using the low-level TUF Metadata API.

    @TODO: Implement key rotation. (#35)
    @TODO: Make signature thresholds configurable per key. (#97)
    @TODO: Make expiration configurable per role. (#98)
    @TODO: Implement support for hashed bins. (#99)
    @TODO: Implement support for consistent snapshots. (#100)
    @TODO: Implement support for delegated targets. (#36)
    """

    def __init__(self):
        try:
            self._init_dirs()
            self._init_keys()
            self._init_roles()
            log.debug("Instantiated repository.")
        except Exception as e:
            log_exception(e)
            log.error("Failed to instantiate repository.")
            raise RuggedRepositoryError()

    def load(self):
        """ Load all metadata from storage. """
        for name in self.roles.keys():
            try:
                self.load_metadata(name)
            except RuggedMetadataError:
                error = "Failed to load all metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata.")

    def load_metadata(self, name: str):
        """ load a role's metadata from storage. """
        metadata_file = self._get_metadata_path(name)
        try:
            self.roles[name] = self.roles[name].from_file(metadata_file)
            message = f"Loaded metadata for '{ name }' role from "\
                      f"'{ metadata_file }'."
            log.debug(message)
        except TypeError as e:
            log_exception(e)
            error = f"Failed to load metadata for '{ name }' role from "\
                    f"'{ metadata_file }'."
            raise RuggedMetadataError(error)

    def _get_metadata_path(self, name):
        """ Determine the path for a given role's metadata file. """
        metadata_dir = config['repo_metadata_path'].get()
        filename = self._get_metadata_filename(name)
        return path.join(metadata_dir, filename)

    def _get_metadata_filename(self, name):
        """ Determine the filename for a given role's metadata. """
        filename = f"{self.roles[name].signed.type}.json"
        if name == "timestamp":
            # Timestamp metadata doesn't use consistent snapshots.
            return filename
        if config['consistent_snapshot'].get():
            filename = f"{self.roles[name].signed.version}.{ filename }"
        return filename

    def write(self):
        """ Write all metadata to storage. """
        for name in self.roles.keys():
            result = self.write_metadata(name)
            if not result:
                return False
        return True

    def write_metadata(self, role_name: str):
        """ Write a role's signed metadata to storage. """
        PRETTY = JSONSerializer(compact=False)
        path = self._get_metadata_path(role_name)
        try:
            self._sign_metadata(role_name)
            self.roles[role_name].to_file(path, serializer=PRETTY)
        except Exception as e:
            log_exception(e)
            error = f"Failed to write '{ role_name }' metadata to file '{ path }'."
            log.error(error)
            return False
        log.debug(f"Wrote '{ role_name }' metadata to file '{ path }'.")
        return True

    def _sign_metadata(self, role_name: str):
        """ Sign a role's metadata. """
        role_info = config['roles'].get()[role_name]
        for key_name in role_info['keys']:
            key = self.keys[key_name]
            try:
                signer = SSlibSigner(key)
                self.roles[role_name].sign(signer, append=True)
            except Exception as e:
                log_exception(e)
                log.error(f"Failed to sign '{ role_name }' metadata with '{ key_name }' key.")
            log.debug(f"Signed '{ role_name }' metadata with '{ key_name }' key.")

    def add_targets(self):
        """ Add any inbound targets to the targets metadata. """
        inbound_targets = self.get_inbound_targets()
        added_targets = []
        for inbound_target in inbound_targets:
            try:
                moved_target_path = self._move_inbound_target_to_targets_dir(
                    inbound_target
                )
                target_file_info = TargetFile.from_file(
                    inbound_target,
                    moved_target_path,
                )
                self.roles["targets"].signed.targets[inbound_target] = target_file_info  # noqa: E501
                message = f"Added target '{ inbound_target }' "\
                          "to the repository."
                log.info(message)
                added_targets.append(inbound_target)
            except Exception as e:
                log_exception(e)
                warning = f"Failed to add target '{ inbound_target }' "\
                          "to the repository."
                log.warning(warning)
        if added_targets:
            self.roles["targets"].signed.version += 1
        return added_targets

    def get_inbound_targets(self):
        """ Scan the inbound directory for files to add to the repository. """
        inbound_targets_dir = config['inbound_targets_path'].get()
        message = "Scanning for inbound targets in: "\
                  f"{ inbound_targets_dir }"
        log.debug(message)
        chdir(inbound_targets_dir)
        inbound_targets = []
        for inbound_target in glob('**', recursive=True):
            if path.isdir(inbound_target):
                # We only want files, not intermediate directories.
                continue
            log.debug(f"Found target: { inbound_target }")
            inbound_targets.append(inbound_target)
        return inbound_targets

    def _move_inbound_target_to_targets_dir(self, inbound_target):
        """ Move an inbound target to the repo targets directory. """
        inbound_targets_path = config['inbound_targets_path'].get()
        inbound_target_path = path.join(inbound_targets_path, inbound_target)
        moved_target_path = path.join(
            config['repo_targets_path'].get(),
            inbound_target,
        )
        try:
            makedirs(path.dirname(moved_target_path), exist_ok=True)
            move(inbound_target_path, moved_target_path)
            message = f"Moved '{ inbound_target_path }' to "\
                      f"'{ moved_target_path }'"
            log.debug(message)
        except Exception as e:
            log_exception(e)
            warning = f"Failed to move target '{ inbound_target }' to the "\
                      "targets directory."
            log.warning(warning)
        self._delete_empty_target_dirs(inbound_targets_path, inbound_target)
        message = f"Moved inbound target '{ inbound_target }' to targets "\
                  "directory."
        log.info(message)
        return moved_target_path

    def _delete_empty_target_dirs(self, root_dir, target):
        """ Delete any intermediate (empty) directories for a target path. """
        if root_dir not in target:
            target = path.join(root_dir, target)
        target_dir = path.dirname(target)
        if target_dir == root_dir:
            return   # This target is the root directory, so stop.
        if listdir(target_dir):
            return   # We're only cleaning up empty directories.
        try:
            rmdir(target_dir)
            log.debug(f"Cleaned up empty directory '{ target_dir }'.")
            # Recurse until we hit the root directory.
            self._delete_empty_target_dirs(root_dir, target_dir)
        except OSError as e:
            log_exception(e)
            warning = f"Failed to clean up empty directory '{ target_dir }'."
            log.warning(warning)

    def remove_targets(self, targets):
        """ Remove given targets from the targets metadata. """
        removed_targets = []
        for target in targets:
            try:
                del self.roles["targets"].signed.targets[target]
                log.info(f"Removed target '{ target }' from the repository.")
                self._delete_removed_target(target)
                removed_targets.append(target)
            except Exception as e:
                log_exception(e)
                warning = f"Failed to remove target '{ target }' from the "\
                          "repository."
                log.warning(warning)
        if removed_targets:
            self.roles["targets"].signed.version += 1
        return removed_targets

    def _delete_removed_target(self, removed_target):
        """ Delete the file for the target that we removed from the repo. """
        repo_targets_path = config['repo_targets_path'].get()
        target_file = path.join(repo_targets_path, removed_target)
        try:
            remove(target_file)
            log.info(f"Deleted target file '{ target_file }'.")
        except Exception as e:
            log_exception(e)
            log.warning(f"Failed to delete target file '{ target_file }'.")
        self._delete_empty_target_dirs(repo_targets_path, removed_target)

    def update_snapshot(self):
        """ Update snapshot to account for changed targets metadata. """
        self.roles["snapshot"].signed.meta["targets.json"].version = self.roles[  # noqa: E501
            "targets"
        ].signed.version
        self.roles["snapshot"].signed.version += 1
        log.info("Updated snapshot metadata.")

    def update_timestamp(self):
        """ Update timestamp to account for changed snapshot metadata. """
        self.roles["timestamp"].signed.snapshot_meta.version = self.roles[
            "snapshot"
        ].signed.version
        self.roles["timestamp"].signed.version += 1
        log.info("Updated timestamp metadata.")

    def status(self):
        targets = self.roles['targets'].signed.targets
        repo_status = {
            'targets': {
                'count': len(targets),
                'size': sum(target.length for target in targets.values()),
            },
            'roles': {},
        }
        for role_name, role_info in self.roles.items():
            repo_status['roles'][role_name] = {
                'signatures': len(role_info.signatures),
                'threshold': self.roles['root'].signed.roles[role_name].threshold,
                'version': role_info.signed.version,
                'tuf_spec': role_info.signed.spec_version,
                'expires': role_info.signed.expires,
            }
            repo_status['roles'][role_name]['keys'] = {}
            role_key_ids = self.roles['root'].signed.roles[role_name].keyids
            for key_name, key_info in self.keys.items():
                if key_info['keyid'] in role_key_ids:
                    key_types = key_info['keyval'].keys()
                    if 'private' in key_types:
                        key_path = self._get_signing_key_path(key_name, role_name)
                    else:
                        key_path = self._get_verification_key_path(key_name)
                    repo_status['roles'][role_name]['keys'][key_name] = {
                        'types': ", ".join(key_types),
                        'scheme': key_info['scheme'],
                        'key_path': key_path
                    }
        return repo_status

    def _init_dirs(self):
        """ Ensure all repository directories exist. """
        dirs = {
            config['repo_metadata_path'].get(): 0o755,
            config['repo_targets_path'].get(): 0o755,
        }
        for dir, mode in dirs.items():
            try:
                makedirs(dir, mode=mode, exist_ok=True)
            except PermissionError as e:
                log_exception(e)
                raise RuggedStorageError

    def _init_keys(self):
        """ Initialize a dictionary of keys. """
        self.keys: Dict[str, Dict[str, Any]] = {}
        for role_name, role_info in config['roles'].get().items():
            for key_name in role_info['keys']:
                self.keys[key_name] = self._load_keys(key_name, role_name)

    def _load_keys(self, key_name, role_name):
        """ Load public keys and private keys (if available) from storage. """
        signing_key = self._load_signing_key(key_name, role_name)
        if signing_key:
            return signing_key
        return self._load_verification_key(key_name)

    def _load_signing_key(self, key_name, role_name):
        """ Load a signing key for a given role. """
        key_path = self._get_signing_key_path(key_name, role_name)
        try:
            key = import_ed25519_privatekey_from_file(key_path)
            log.debug(f"Loaded '{ key_name }' signing key at { key_path }.")
        except StorageError:
            # This is to be expected. So it's not really an error.
            log.debug(f"Cannot load { key_name } signing key at { key_path }.")
            return False
        return key

    def _get_signing_key_path(self, key_name, role_name):
        """ Return the path of a signing key. """
        return f"{ RUGGED_SIGNING_KEY_DIR }/{ role_name }/{ key_name }"

    def _load_verification_key(self, key_name):
        """ Load a verification key for a given role. """
        key_path = self._get_verification_key_path(key_name)
        try:
            loaded_key = import_ed25519_publickey_from_file(key_path)
        except StorageError:
            # This is to be expected. So it's not really an error.
            log.debug(
                f"Cannot load { key_name } verification key at "
                f"{ key_path }."
            )
            return False
        log.debug(f"Loaded '{ key_name }' verification key at { key_path }.")
        return loaded_key

    def _get_verification_key_path(self, key_name):
        """ Return the path of a verification key. """
        # Verification (public) keys live in a flat directory, '.pub' suffix.
        return "/".join([RUGGED_VERIFICATION_KEY_DIR, f"{ key_name }.pub"])

    def _init_roles(self):
        """ Initialize a dictionary of roles. """
        # Start from an empty role map, then populate the four top-level
        # TUF roles (targets, snapshot, timestamp, root).
        self.roles: Dict[str, Metadata] = dict()
        self._init_top_level_roles()

    def _init_top_level_roles(self):
        """ Create all top-level metadata objects. """
        self._init_targets_role()
        self._init_snapshot_role()
        self._init_timestamp_role()
        self._init_root_role()

    def _init_targets_role(self):
        """ Create targets metadata object. """
        # Fresh, unsigned targets metadata: version 1, no targets yet.
        signed = Targets(
            version=1,
            spec_version=SPEC_VERSION,
            expires=_in(7),
            targets={},
        )
        self.roles["targets"] = Metadata[Targets](
            signed=signed,
            signatures=OrderedDict(),
        )

    def _init_snapshot_role(self):
        """ Create snapshot metadata object. """
        # Snapshot starts out referencing version 1 of the targets metadata.
        snapshot = Snapshot(
            version=1,
            spec_version=SPEC_VERSION,
            expires=_in(7),
            meta={"targets.json": MetaFile(version=1)},
        )
        self.roles["snapshot"] = Metadata[Snapshot](snapshot, OrderedDict())

    def _init_timestamp_role(self):
        """ Create timestamp metadata object. """
        # Timestamp expires daily and points at version 1 of snapshot.
        timestamp = Timestamp(
            version=1,
            spec_version=SPEC_VERSION,
            expires=_in(1),
            snapshot_meta=MetaFile(version=1),
        )
        self.roles["timestamp"] = Metadata[Timestamp](timestamp, OrderedDict())

    def _init_root_role(self):
        """ Create root metadata object. """
        # Root aggregates all known keys and role assignments; it is built
        # after the other roles so those lookups are complete.
        root = Root(
            version=1,
            spec_version=SPEC_VERSION,
            expires=_in(365),
            keys=self._get_repo_keys_for_root_role(),
            roles=self._get_repo_roles_for_root_role(),
            consistent_snapshot=False,
        )
        self.roles["root"] = Metadata[Root](
            signed=root,
            signatures=OrderedDict(),
        )

    def _get_repo_keys_for_root_role(self):
        """ Return all known keys, keyed by ID. """
        repo_keys = {}
        for key in self.keys.values():
            try:
                tuf_key = Key.from_securesystemslib_key(key)
            except TypeError as e:
                log_exception(e)
                error = "Failed to generate key metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
            repo_keys[key["keyid"]] = tuf_key
        return repo_keys

    def _get_repo_roles_for_root_role(self):
        """ Return all known roles, keyed by ID.

        Returns:
            Dict mapping each configured role name to a tuf Role object
            listing the key IDs authorized to sign for that role.

        Raises:
            RuggedMetadataError: If role metadata cannot be generated.
        """
        repo_roles = {}
        for role, role_info in config['roles'].get().items():
            # Gather every key ID first, then construct the Role exactly
            # once per role. Building the Role inside the key loop would
            # reconstruct it per key, and a role configured with no keys
            # would be silently omitted from root metadata; now such a
            # misconfiguration surfaces as an explicit error.
            role_keys = [
                self.keys[key_name]["keyid"]
                for key_name in role_info['keys']
            ]
            try:
                # @TODO: Make signature thresholds configurable per key. (#97)
                repo_roles[role] = Role(role_keys, threshold=1)
            except ValueError as e:
                log_exception(e)
                error = "Failed to generate role metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
        return repo_roles

Methods

def add_targets(self)

Add any inbound targets to the targets metadata.

Expand source code
def add_targets(self):
    """ Add any inbound targets to the targets metadata. """
    added_targets = []
    for target in self.get_inbound_targets():
        try:
            destination = self._move_inbound_target_to_targets_dir(target)
            file_info = TargetFile.from_file(target, destination)
            self.roles["targets"].signed.targets[target] = file_info
            message = f"Added target '{ target }' "\
                      "to the repository."
            log.info(message)
            added_targets.append(target)
        except Exception as e:
            # Best-effort: a failing target is logged and skipped so the
            # remaining inbound targets can still be processed.
            log_exception(e)
            warning = f"Failed to add target '{ target }' "\
                      "to the repository."
            log.warning(warning)
    if added_targets:
        # Any change to targets metadata requires a version bump.
        self.roles["targets"].signed.version += 1
    return added_targets
def get_inbound_targets(self)

Scan the inbound directory for files to add to the repository.

Expand source code
def get_inbound_targets(self):
    """ Scan the inbound directory for files to add to the repository. """
    inbound_targets_dir = config['inbound_targets_path'].get()
    message = "Scanning for inbound targets in: "\
              f"{ inbound_targets_dir }"
    log.debug(message)
    # Change into the inbound directory so glob() yields paths relative
    # to it; those relative paths become the repository target names.
    chdir(inbound_targets_dir)
    # Keep only files; intermediate directories are not targets.
    inbound_targets = [
        entry for entry in glob('**', recursive=True)
        if not path.isdir(entry)
    ]
    for inbound_target in inbound_targets:
        log.debug(f"Found target: { inbound_target }")
    return inbound_targets
def load(self)

Load all metadata from storage.

Expand source code
def load(self):
    """ Load all metadata from storage. """
    for name in self.roles:
        try:
            self.load_metadata(name)
        except RuggedMetadataError:
            # Re-raise with a summary message; the per-role detail was
            # already logged by load_metadata().
            error = "Failed to load all metadata."
            log.error(error)
            raise RuggedMetadataError(error)
    log.debug("Loaded all metadata.")
def load_metadata(self, name: str)

Load a role's metadata from storage.

Expand source code
def load_metadata(self, name: str):
    """ Load a role's metadata from storage. """
    metadata_file = self._get_metadata_path(name)
    try:
        # Replace the in-memory role object with the on-disk version.
        self.roles[name] = self.roles[name].from_file(metadata_file)
    except TypeError as e:
        log_exception(e)
        error = f"Failed to load metadata for '{ name }' role from "\
                f"'{ metadata_file }'."
        raise RuggedMetadataError(error)
    message = f"Loaded metadata for '{ name }' role from "\
              f"'{ metadata_file }'."
    log.debug(message)
def remove_targets(self, targets)

Remove given targets from the targets metadata.

Expand source code
def remove_targets(self, targets):
    """ Remove given targets from the targets metadata. """
    removed_targets = []
    targets_metadata = self.roles["targets"].signed.targets
    for target in targets:
        try:
            del targets_metadata[target]
            log.info(f"Removed target '{ target }' from the repository.")
            self._delete_removed_target(target)
        except Exception as e:
            # Best-effort: log the failure and continue with the rest.
            log_exception(e)
            warning = f"Failed to remove target '{ target }' from the "\
                      "repository."
            log.warning(warning)
        else:
            removed_targets.append(target)
    if removed_targets:
        # Any change to targets metadata requires a version bump.
        self.roles["targets"].signed.version += 1
    return removed_targets
def status(self)

Return a summary of the repository's targets, roles and keys.

Expand source code
def status(self):
    """ Summarize repository targets, roles and keys as a dictionary. """
    targets = self.roles['targets'].signed.targets
    root_roles = self.roles['root'].signed.roles
    repo_status = {
        'targets': {
            'count': len(targets),
            'size': sum(target.length for target in targets.values()),
        },
        'roles': {},
    }
    for role_name, role_info in self.roles.items():
        role_status = {
            'signatures': len(role_info.signatures),
            'threshold': root_roles[role_name].threshold,
            'version': role_info.signed.version,
            'tuf_spec': role_info.signed.spec_version,
            'expires': role_info.signed.expires,
            'keys': {},
        }
        role_key_ids = root_roles[role_name].keyids
        for key_name, key_info in self.keys.items():
            if key_info['keyid'] not in role_key_ids:
                continue
            key_types = key_info['keyval'].keys()
            # Private material present means we hold the signing key;
            # otherwise only the public verification key is on disk.
            if 'private' in key_types:
                key_path = self._get_signing_key_path(key_name, role_name)
            else:
                key_path = self._get_verification_key_path(key_name)
            role_status['keys'][key_name] = {
                'types': ", ".join(key_types),
                'scheme': key_info['scheme'],
                'key_path': key_path
            }
        repo_status['roles'][role_name] = role_status
    return repo_status
def update_snapshot(self)

Update snapshot to account for changed targets metadata.

Expand source code
def update_snapshot(self):
    """ Update snapshot to account for changed targets metadata. """
    snapshot = self.roles["snapshot"].signed
    # Record the current targets metadata version, then bump our own.
    snapshot.meta["targets.json"].version = self.roles["targets"].signed.version
    snapshot.version += 1
    log.info("Updated snapshot metadata.")
def update_timestamp(self)

Update timestamp to account for changed snapshot metadata.

Expand source code
def update_timestamp(self):
    """ Update timestamp to account for changed snapshot metadata. """
    timestamp = self.roles["timestamp"].signed
    # Record the current snapshot metadata version, then bump our own.
    timestamp.snapshot_meta.version = self.roles["snapshot"].signed.version
    timestamp.version += 1
    log.info("Updated timestamp metadata.")
def write(self)

Write all metadata to storage.

Expand source code
def write(self):
    """ Write all metadata to storage. """
    # all() short-circuits, so writing stops at the first role that fails.
    return all(self.write_metadata(name) for name in self.roles)
def write_metadata(self, role_name: str)

Write a role's signed metadata to storage.

Expand source code
def write_metadata(self, role_name: str):
    """ Write a role's signed metadata to storage.

    Signs the role's metadata, then serializes it (pretty-printed JSON)
    to its metadata file.

    Returns:
        True on success, False if signing or writing failed.
    """
    serializer = JSONSerializer(compact=False)
    # Named 'metadata_path' rather than 'path', which would shadow
    # os.path imported at module level.
    metadata_path = self._get_metadata_path(role_name)
    try:
        self._sign_metadata(role_name)
        self.roles[role_name].to_file(metadata_path, serializer=serializer)
    except Exception as e:
        log_exception(e)
        error = f"Failed to write '{ role_name }' metadata to file '{ metadata_path }'."  # noqa: E501
        log.error(error)
        return False
    log.debug(f"Wrote '{ role_name }' metadata to file '{ metadata_path }'.")
    return True