Module rugged.tuf.repo

Classes

class RuggedRepository
Expand source code
class RuggedRepository():
    """
    An opinionated TUF repository using the low-level TUF Metadata API.

    @TODO: Implement support for consistent snapshots. (#100)
    @TODO: Implement support for delegated targets. (#36)
    """

    def __init__(self) -> None:
        """ Set up directories, keys and in-memory top-level role metadata.

        Raises:
            RuggedRepositoryError: if any initialization step fails.
        """
        # Keys are indexed by '<role><delimiter><name>'; see _key_index().
        self.keys: Dict[str, Dict[str, Any]] = {}
        self.roles: Dict[str, Metadata[Any]] = {}
        try:
            self._init_dirs()
            self._init_keys()
            self._init_top_level_roles()
            log.debug("Instantiated repository.")
        except Exception as e:
            log_exception(e)
            log.error("Failed to instantiate repository.")
            # Chain the underlying exception so the root cause isn't lost.
            raise RuggedRepositoryError() from e

    def initialize(self) -> None:
        """ Initialize signed TUF metadata. """
        # Order matters: snapshot records targets info, and timestamp
        # records snapshot info, so each must be updated after its dependency.
        for update_role in (
            self.update_root,
            self.update_targets,
            self.update_snapshot,
            self.update_timestamp,
        ):
            update_role(initializing=True)

    def load(self) -> None:
        """ Load all metadata from storage.

        Raises:
            RuggedMetadataError: if any role's metadata cannot be loaded.
        """
        for role_name in TOP_LEVEL_ROLE_NAMES:
            try:
                self.load_metadata(role_name)
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = f"Failed to load '{role_name}' metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        # Hashed bins are independent of the top-level roles; load them once,
        # rather than once per top-level role as the previous loop body did.
        if hashed_bins_is_enabled():
            try:
                self.load_hashed_bins_metadata()
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = "Failed to load hashed bins metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata.")

    def load_hashed_bins_metadata(self) -> None:
        """ Load hashed bins metadata. """
        # We want to load from metadata from storage. However, if the
        # configured number of hashed bins has changed, the initialized roles
        # will be incorrect. So we delete those, load 'bins.json', then load
        # the individual 'bin_n' metadata from there.
        self.delete_initial_hashed_bin_roles()
        self.load_metadata('bins')
        current_bins = self.roles['bins'].signed.delegations.roles
        # Only the bin names are needed; the delegated-role objects are not.
        for bin_name in current_bins:
            self.load_metadata(bin_name)

    def delete_initial_hashed_bin_roles(self) -> None:
        """ Delete initial hashed bin roles from the in-memory role set. """
        current_bins = self.roles['bins'].signed.delegations.roles
        # Iterate names only (the role objects are unused); deletion happens
        # in self.roles, not in the dict being iterated, so this is safe.
        for bin_name in current_bins:
            self.delete_metadata(bin_name)

    def load_metadata_for_hashed_bins_targets(self, targets: List[str]) -> None:
        """ Load only the metadata required for a given set of targets.

        Raises:
            RuggedMetadataError: if any required metadata cannot be loaded.
        """
        target_hashed_bins = {find_hash_bin(target) for target in targets}
        # Skip all hashed bins except those covering the targets provided.
        # (Using set arithmetic also tolerates generated bin names that are
        # not currently present in self.roles, where list.remove() raised.)
        unneeded_bins = {
            bin_n_name
            for bin_n_name, _ in generate_hash_bins()
            if bin_n_name not in target_hashed_bins
        }
        role_names = [name for name in self.roles if name not in unneeded_bins]
        for role_name in role_names:
            try:
                self.load_metadata(role_name)
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = f"Failed to load '{role_name}' metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata for targets.")

    def load_metadata(self, role_name: str) -> None:
        """ Load a role's metadata from storage into self.roles. """
        try:
            metadata_path = self._get_metadata_path(role_name)
            self.roles[role_name] = load_metadata_from_file(role_name, metadata_path)
        except StorageError as e:
            log_exception(e)
            message = f"Error loading '{role_name}' metadata from storage."
            log.error(message)
            raise RuggedStorageError(message)

    def delete_metadata(self, role_name: str) -> None:
        """ Delete a role's metadata from the in-memory role set. """
        log.debug(f"Deleting '{role_name}' role from repository.")
        # pop() without a default raises KeyError for unknown roles, same as del.
        self.roles.pop(role_name)

    def delete_metadata_file(self, role_name: str) -> None:
        """ Delete a role's metadata file from storage. """
        metadata_path = self._get_metadata_path(role_name)
        log.debug(f"Deleting metadata file: {metadata_path}")
        remove(metadata_path)

    def _get_metadata_path(self, role_name: str, new: bool = False) -> str:
        """ Determine the path for a given role's metadata file. """
        return path.join(
            config['repo_metadata_path'].get(),
            self._get_metadata_filename(role_name, new),
        )

    def _get_metadata_filename(self, role_name: str, new: bool = False) -> str:
        """ Determine the filename for a given role's metadata.

        When 'new' is True, the version prefix comes from the in-memory
        (already incremented) role metadata; otherwise the most recent
        versioned file found in storage is used.
        """
        filename = f"{role_name}.json"
        # Timestamp metadata doesn't use consistent snapshots.
        if role_name == "timestamp":
            return filename
        # We need to special-case root here, since it should always have the
        # version prefix. This is what will allow clients to find and validate
        # the chain of root metadata that links the original (shipped) root to
        # its current version.
        if role_name == 'root' or config['consistent_snapshot'].get():
            if new:
                # Use the version in the role metadata that has already been
                # incremented, producing eg. '2.root.json'.
                filename = f"{self.roles[role_name].signed.version}.{filename}"
            else:
                # Otherwise, find the most recent one that exists in storage (eg. on disk).
                filename = self._get_versioned_metadata_filename(filename)
        return filename

    def _get_versioned_metadata_filename(self, filename: str) -> str:
        """ Determine the versioned filename for a given role's metadata.

        Returns the latest 'N.<filename>' present in the metadata directory
        (eg. '7.root.json' for 'root.json').

        Raises:
            FileNotFoundError: if no versioned metadata file exists.
        """
        metadata_dir = config['repo_metadata_path'].get()
        # Glob within the metadata directory instead of chdir()ing into it:
        # changing the process-wide working directory is a surprising side
        # effect for a read-only lookup.
        files = [
            path.basename(f)
            for f in glob(path.join(metadata_dir, f"*.{filename}"))
        ]
        if not files:
            raise FileNotFoundError
        # Sort numbers naturally. Based on magic from https://stackoverflow.com/a/33159707.
        files.sort(key=lambda f: int(sub(r'\D', '', f)))
        # Return the latest version.
        return files[-1]

    def write(self) -> bool:
        """ Write all metadata to storage.

        Returns False as soon as any role fails to write, True otherwise.
        """
        # all() short-circuits on the first falsy result, matching the
        # early-return behavior of an explicit loop.
        return all(self.write_metadata(role_name) for role_name in self.roles)

    def write_metadata(self, role_name: str) -> bool:
        """ Write a role's signed metadata to storage. """
        # Named metadata_path to avoid shadowing the module-level path import.
        metadata_path = self._get_metadata_path(role_name, new=True)
        return write_metadata_to_file(role_name, self.roles[role_name], metadata_path)

    def _key_index(self, role_name: str, key_name: str) -> str:
        """ Return a unique index based on a key's role and name. """
        return RUGGED_KEY_INDEX_DELIMITER.join((role_name, key_name))

    def _key_name(self, key_index: str) -> str:
        """ Return a key's name based on its index. """
        # Split only on the first delimiter so that key names which themselves
        # contain the delimiter are returned intact.
        return key_index.split(RUGGED_KEY_INDEX_DELIMITER, 1)[1]

    def sign_metadata(self, role_name: str) -> None:
        """ Sign a role's metadata with every key configured for the role.

        Raises:
            RuggedMetadataError: if a key is missing or signing fails.
        """
        # This needs to use KeyManager because the root role may not be defined yet.
        for key_name in KeyManager().find_keys_for_role(role_name):
            signing_key = self._get_key_for_role_key(role_name, key_name)
            if not signing_key:
                raise RuggedMetadataError(f"No key available to sign '{role_name}' metadata.")
            try:
                self.roles[role_name].sign(SSlibSigner(signing_key), append=True)
            except Exception as e:
                log_exception(e)
                error = f"Failed to sign '{role_name}' metadata with '{key_name}' key."
                log.error(error)
                raise RuggedMetadataError(error)
            log.debug(f"Signed '{role_name}' metadata with '{key_name}' key.")

    def add_targets(self) -> List[str]:
        """ Add any inbound targets to the targets metadata.

        Returns the list of targets that were added.
        """
        added_targets = []
        for inbound_target in get_inbound_targets():
            self.add_target(inbound_target)
            added_targets.append(inbound_target)
        if not added_targets:
            return added_targets
        if hashed_bins_is_enabled():
            # All targets are delegated to hashed bins. As a result, only the
            # affected bins need new versions; the targets metadata itself
            # does not change.
            for bin_n_name in get_bins_for_targets(added_targets):
                self.update_hashed_bin(bin_n_name)
        else:
            self.update_targets()
        return added_targets

    def add_target(self, target: str) -> None:
        """ Add a single target to the targets (or hashed bin) metadata. """
        try:
            stored_path = move_inbound_target_to_targets_dir(target)
            self.add_target_to_metadata(target, TargetFile.from_file(target, stored_path))
            if config['delete_targets_after_signing'].get():
                delete_target_after_signing(stored_path)
        except Exception as e:
            # Best-effort: a single bad target should not abort the batch.
            log_exception(e)
            log.warning(f"Failed to add target '{target}' to the repository.")

    def add_target_to_metadata(self, target: str, target_file_info: TargetFile) -> None:
        """ Add a target to the targets (or hashed bin) metadata. """
        # With hashed bins enabled, the target belongs to its hash bin role
        # rather than the top-level targets role.
        targets_role = find_hash_bin(target) if hashed_bins_is_enabled() else "targets"
        self.roles[targets_role].signed.targets[target] = target_file_info
        log.info(f"Added target '{target}' to '{targets_role}' role.")

    def remove_targets(self, targets: List[str]) -> List[str]:
        """ Remove given targets from targets (or hashed bin) metadata.

        Returns the list of targets that were removed.
        """
        removed_targets = [target for target in targets if self.remove_target(target)]
        if not removed_targets:
            return removed_targets
        if hashed_bins_is_enabled():
            # All targets are delegated to hashed bins. As a result, only the
            # affected bins need new versions; the targets metadata itself
            # does not change.
            for bin_n_name in get_bins_for_targets(removed_targets):
                self.update_hashed_bin(bin_n_name)
        else:
            self.update_targets()
        return removed_targets

    def remove_target(self, target: str) -> bool:
        """ Remove a single target from the targets (or hashed bin) metadata.

        Returns True on success, False otherwise.
        """
        try:
            targets_role = find_hash_bin(target) if hashed_bins_is_enabled() else "targets"
            del self.roles[targets_role].signed.targets[target]
            log.info(f"Removed target '{target}' from the '{targets_role}' role.")
            delete_removed_target(target)
        except Exception as e:
            log_exception(e)
            warning = f"Failed to remove target '{target}' from the "\
                      "repository."
            log.warning(warning)
            return False
        return True

    def update_targets(self, initializing: bool = False) -> None:
        """ Update targets to account for new targets, or rotated keys. """
        if hashed_bins_is_enabled():
            self.update_hashed_bins(initializing=initializing)
        targets = self.roles["targets"]
        if not initializing:
            targets.signed.version += 1
        self.update_metadata_expiry("targets")
        # Existing signatures are stale once the metadata changes: re-sign from scratch.
        targets.signatures.clear()
        self.sign_metadata("targets")
        log.info("Updated targets metadata.")

    def update_snapshot(self, initializing: bool = False) -> None:
        """ Update snapshot to account for changed targets metadata, or rotated keys. """
        snapshot = self.roles["snapshot"]
        # Record the current targets metafile info (and hashed bins, if enabled).
        snapshot.signed.meta["targets.json"] = get_metafile_info(self.roles['targets'])
        if hashed_bins_is_enabled():
            self.update_hashed_bin_versions_in_snapshot()
        if not initializing:
            snapshot.signed.version += 1
        self.update_metadata_expiry("snapshot")
        # Existing signatures are stale once the metadata changes: re-sign from scratch.
        snapshot.signatures.clear()
        self.sign_metadata("snapshot")
        log.info("Updated snapshot metadata.")

    def update_timestamp(self, initializing: bool = False) -> None:
        """ Update timestamp to account for changed snapshot metadata info, or rotated keys. """
        timestamp = self.roles["timestamp"]
        # Record the current snapshot metafile info.
        timestamp.signed.snapshot_meta = get_metafile_info(self.roles['snapshot'])
        if not initializing:
            timestamp.signed.version += 1
        self.update_metadata_expiry("timestamp")
        # Existing signatures are stale once the metadata changes: re-sign from scratch.
        timestamp.signatures.clear()
        self.sign_metadata("timestamp")
        log.info("Updated timestamp metadata.")

    def update_root(self, initializing: bool = False) -> None:
        """ Update root to account for newly added or removed keys.

        When initializing, any signed root metadata already on disk takes
        precedence over the freshly generated (unsigned) metadata.
        """
        if initializing:
            try:
                log.debug("Trying to initialize 'root' metadata from disk.")
                self.load_metadata("root")
                # log.warn() is deprecated in the logging module; use
                # log.warning() as the rest of this class does.
                log.warning("Initialized 'root' metadata from disk.")
                log.info("If you did not intend to initialize with existing 'root' metadata then delete '1.root.json' "
                         "and re-run this command.")
            except FileNotFoundError:
                log.debug("Did not find 'root' metadata on disk to load.")
            # If we've loaded signed root metadata from disk, don't overwrite it.
            if self.roles["root"].signatures:
                return
        if not initializing:
            self.roles["root"].signed.version += 1
        self.update_metadata_expiry("root")
        self.roles["root"].signatures.clear()
        self.sign_metadata("root")
        log.info("Updated root metadata.")

    def status(self) -> Dict[str, Dict[str, Any]]:
        """ Return a summary of repository state: aggregate target stats plus
        per-role signature counts, versions, expiry, and key details. """
        repo_status: Dict[str, Dict[str, Any]] = {
            'roles': {},
        }
        if 'targets' in self.roles:
            targets = self.roles['targets'].signed.targets
            # Aggregate count and total size of all registered targets.
            repo_status['targets'] = {
                'count': len(targets),
                'size': sum(target.length for target in targets.values()),
            }
        for role_name, role_info in self.roles.items():
            # Ignore hashed bin roles, since the code below assumes we're only
            # showing the status of roles signed by root.
            # @TODO: Figure out a better way to do this. Maybe skip delegated roles?
            # @TODO: Alternatively, figure out how to report the status of hashed bins succinctly.
            if role_name[:4] in ['bins', 'bin_']:
                continue
            repo_status['roles'][role_name] = {
                'signatures': len(role_info.signatures),
                'version': role_info.signed.version,
                'tuf_spec': role_info.signed.spec_version,
                # replace() only attaches the UTC zone — assumes 'expires' is
                # a naive UTC datetime; TODO confirm against the TUF library.
                'expires': role_info.signed.expires.replace(tzinfo=timezone.utc).isoformat(),
            }
            if 'root' in self.roles:
                # Signature threshold for this role as declared in root metadata.
                threshold = self.roles['root'].signed.roles[role_name].threshold
                repo_status['roles'][role_name]['threshold'] = threshold
            repo_status['roles'][role_name]['keys'] = {}
            for key_index, key in self.keys.items():
                key_name = self._key_name(key_index)
                # Only report keys that have actually signed this role's metadata.
                # @TODO: In _role_has_signing_capability() we look up keys in
                # the Root (or Targets) metadata. That seems more correct than
                # using the signatures themselves.
                if key['keyid'] not in role_info.signatures.keys():
                    continue
                key_types = list(key['keyval'].keys())
                # A key with private material is a signing key; otherwise it
                # is verification-only.
                if 'private' in key_types:
                    key_path = KeyManager().get_key_path(key_name, role_name, 'signing')
                else:
                    key_path = KeyManager().get_key_path(key_name, role_name, 'verification')
                repo_status['roles'][role_name]['keys'][key_name] = {
                    'types': key_types,
                    'scheme': key['scheme'],
                    'key_path': key_path
                }
        return repo_status

    def get_keys_by_role(self) -> Dict[str, List[str]]:
        """ Return a dictionary mapping role names to lists of key names.

        (Return annotation fixed: the method has always returned lists of
        key names, not single strings.)
        """
        # Build a keyid -> key-name lookup once, instead of re-scanning the
        # whole keychain for every signature on every role.
        names_by_keyid = {
            key['keyid']: self._key_name(key_index)
            for key_index, key in self.keys.items()
        }
        keys_by_role: Dict[str, List[str]] = {}
        for role_name, role in self.roles.items():
            keys_by_role[role_name] = [
                names_by_keyid[keyid]
                for keyid in role.signatures
                if keyid in names_by_keyid
            ]
        return keys_by_role

    def _init_dirs(self) -> None:
        """ Ensure all repository directories exist.

        Raises:
            RuggedStorageError: if a directory cannot be created.
        """
        dirs = {
            config['repo_metadata_path'].get(): 0o755,
            config['repo_targets_path'].get(): 0o755,
        }
        # 'dir_path' rather than 'dir', to avoid shadowing the builtin dir().
        for dir_path, mode in dirs.items():
            try:
                makedirs(dir_path, mode=mode, exist_ok=True)
            except PermissionError as e:
                log_exception(e)
                # Chain the original error so the failing path is preserved.
                raise RuggedStorageError from e

    def _init_keys(self) -> None:
        """ Initialize a dictionary of keys. """
        # The keychain mirrors whatever key material KeyManager finds.
        discovered_keys = KeyManager().find_keys()
        for role_name, key_names in discovered_keys.items():
            for key_name in key_names:
                self._add_key_to_keychain(role_name, key_name)

    def _init_top_level_roles(self) -> None:
        """ Create all top-level metadata objects. """
        # Order matters: timestamp reads the snapshot metadata, and root is
        # built last from the keys and roles established above.
        initializers = (
            self._init_targets_role,
            self._init_snapshot_role,
            self._init_timestamp_role,
            self._init_root_role,
        )
        for initialize_role in initializers:
            initialize_role()

    def _init_targets_role(self) -> None:
        """ Create targets metadata object. """
        expiry = config['roles']['targets']['expiry'].get(float)
        log.debug(f"Setting 'targets' metadata expiry to {expiry}.")
        targets_signed = Targets(
            version=1,
            spec_version=get_spec_version_string(),
            expires=seconds_from_now(expiry),
            targets={},
        )
        self.roles["targets"] = Metadata[Targets](
            signed=targets_signed,
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'targets' metadata.")
        if hashed_bins_is_enabled():
            self._init_bin_roles()

    def _init_snapshot_role(self) -> None:
        """ Create snapshot metadata object. """
        expiry = config['roles']['snapshot']['expiry'].get(float)
        log.debug(f"Setting 'snapshot' metadata expiry to {expiry}.")
        snapshot_signed = Snapshot(
            version=1,
            spec_version=get_spec_version_string(),
            expires=seconds_from_now(expiry),
            meta={"targets.json": MetaFile(version=1)},
        )
        self.roles["snapshot"] = Metadata[Snapshot](
            snapshot_signed,
            signatures=OrderedDict(),
        )
        if hashed_bins_is_enabled():
            self._add_bin_roles_to_snapshot()

        log.debug("Initialized 'snapshot' metadata.")

    def _init_timestamp_role(self) -> None:
        """ Create timestamp metadata object. """
        expiry = config['roles']['timestamp']['expiry'].get(float)
        log.debug(f"Setting 'timestamp' metadata expiry to {expiry}.")
        # Requires the snapshot role to already exist, since timestamp
        # records the snapshot metafile info.
        timestamp_signed = Timestamp(
            version=1,
            spec_version=get_spec_version_string(),
            expires=seconds_from_now(expiry),
            snapshot_meta=get_metafile_info(self.roles['snapshot'])
        )
        self.roles["timestamp"] = Metadata[Timestamp](
            timestamp_signed,
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'timestamp' metadata.")

    def _init_root_role(self) -> None:
        """ Create root metadata object.

        Raises:
            RuggedMetadataError: if the Root metadata cannot be constructed.
        """
        try:
            log.debug("Collecting keys for 'root' metadata.")
            repo_keys = self._get_repo_keys_for_root_role()
            log.debug("Collecting roles for 'root' metadata.")
            repo_roles = self._get_repo_roles_for_root_role()
            # Consistent snapshots are not yet supported; see the @TODO (#100)
            # on the class docstring.
            root_signed = Root(
                version=1,
                spec_version=get_spec_version_string(),
                keys=repo_keys,
                roles=repo_roles,
                consistent_snapshot=False,
            )
            self.roles["root"] = Metadata[Root](
                signed=root_signed,
                signatures=OrderedDict(),
            )
        except ValueError as e:
            log_exception(e)
            error = "Failed to initialize Root metadata during TUF repository initialization."
            raise RuggedMetadataError(error)

        self.update_metadata_expiry("root")
        log.debug("Initialized 'root' metadata.")

    def update_metadata_expiry(self, role_name: str, config_role: str = '') -> None:
        """ Update expiry for a given role.

        'config_role' selects which config section supplies the expiry;
        it defaults to the role's own section (hashed bins pass 'targets').
        """
        expiry = config['roles'][config_role or role_name]['expiry'].get(float)
        log.debug(f"Setting '{role_name}' metadata expiry to {expiry}.")
        self.roles[role_name].signed.expires = seconds_from_now(expiry)

    def _get_repo_keys_for_root_role(self) -> Dict[str, Key]:
        """ Return all known keys, keyed by ID.

        Raises:
            RuggedKeyError: if the keychain is empty or holds an empty key.
            RuggedMetadataError: if key metadata cannot be generated.
        """
        repo_keys: Dict[str, Key] = {}
        for key in self.keys.values():
            if not key:
                # @TODO: Re-visit once #253 lands.
                raise RuggedKeyError("Empty key returned from keychain.")
            log.debug(f"Loading key with ID: {key['keyid']}")
            try:
                repo_keys[key["keyid"]] = Key.from_securesystemslib_key(key)
            except TypeError as e:
                log_exception(e)
                error = "Failed to generate key metadata during TUF repository initialization."
                raise RuggedMetadataError(error)
        if not repo_keys:
            raise RuggedKeyError("No keys found for Root role.")
        return repo_keys

    def _get_repo_roles_for_root_role(self) -> Dict[str, Role]:
        """ Return all known top-level roles, keyed by role name.

        Raises:
            RuggedKeyError: if a non-root role has no keys configured.
            RuggedMetadataError: if role metadata cannot be generated.
        """
        repo_roles = {}
        # This needs to use KeyManager because the root role isn't defined yet.
        # Hoisted out of the loop: the key listing is loop-invariant.
        keys = KeyManager().find_keys()
        for role_name, role_info in config['roles'].get().items():
            if role_name not in TOP_LEVEL_ROLE_NAMES:
                continue
            role_keys = []
            if role_name in keys:
                for key_name in keys[role_name]:
                    keyid = self._get_keyid_for_role_key(role_name, key_name)
                    if keyid:
                        # Fix: interpolate the key name rather than logging the
                        # literal text 'key_name'.
                        log.debug(f"Adding '{key_name}' key to '{role_name}' role with KeyID: {keyid}")
                        role_keys.append(keyid)
            else:
                if role_name != Root.type:
                    message = f"No keys found for '{role_name}'."
                    log.error(message)
                    raise RuggedKeyError(message)
            try:
                threshold = role_info['threshold']
                log.debug(f"Setting '{role_name}' signature threshold to {threshold}.")
                repo_roles[role_name] = Role(role_keys, threshold=threshold)
            except ValueError as e:
                log_exception(e)
                error = "Failed to generate role metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
        return repo_roles

    def _get_key_for_role_key(self, role_name: str, key_name: str) -> Dict | None:
        """ Return the named key for a given role, or None if it isn't in the keychain.

        Raises:
            RuggedKeyError: if the keychain entry exists but is empty.
        """
        key_index = self._key_index(role_name, key_name)
        try:
            key = self.keys[key_index]
        except KeyError:
            log.debug(f"No '{key_name}' key in keychain for '{role_name}' role.")
            return None
        if not key:
            raise RuggedKeyError(f"Error loading '{key_name}' key for '{role_name}' role.")
        return key

    def _get_keyid_for_role_key(self, role_name: str, key_name: str) -> str | None:
        """ Return the KeyID for a named key for a given role, or None if absent.

        Raises:
            RuggedKeyError: if the key exists but carries no 'keyid'.
        """
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            return None
        try:
            return key['keyid']
        except KeyError:
            raise RuggedKeyError(f"Error finding keyid '{key_name}' key for '{role_name}' role.")

    ################
    # Key rotation #
    ################

    def rotate_keys(
            self,
            keys_to_add: List[Tuple[str, str]],
            keys_to_remove: List[Tuple[str, str]]
    ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
        """ Update keys and re-sign metadata.

        Returns a tuple of (added, removed) (role, key) pairs.
        """
        added_keys = self._add_keys(keys_to_add)
        removed_keys = self._remove_keys(keys_to_remove)
        affected_roles = {role for role, _ in added_keys + removed_keys}
        # Each rotated role invalidates a chain of dependent metadata:
        #   targets   -> root, targets, snapshot, timestamp
        #   snapshot  -> root, snapshot, timestamp
        #   timestamp -> root, timestamp
        #   root      -> root
        # Only the highest-impact chain needs regenerating, since each chain
        # is a superset of the ones below it.
        cascades = (
            ('targets', ('root', 'targets', 'snapshot', 'timestamp')),
            ('snapshot', ('root', 'snapshot', 'timestamp')),
            ('timestamp', ('root', 'timestamp')),
            ('root', ('root',)),
        )
        for trigger, roles_to_refresh in cascades:
            if trigger not in affected_roles:
                continue
            for role in roles_to_refresh:
                getattr(self, f"update_{role}")()
                self.write_metadata(role)
            break
        return (added_keys, removed_keys)

    def _add_keys(self, keys_to_add: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """ Generate keys and add them to roles in root metadata. """
        added: List[Tuple[str, str]] = []
        for role_name, key_name in keys_to_add:
            # Generate the keypair, then register it in both the in-memory
            # keychain and the root metadata.
            KeyManager().generate_keypair(key_name, role_name)
            self._add_key_to_keychain(role_name, key_name)
            self._add_key_to_root(role_name, key_name)
            added.append((role_name, key_name))
        return added

    def _add_key_to_keychain(self, role_name: str, key_name: str) -> None:
        """ Add a key to the keychain. """
        # In #253, we should refer to a keypair, rather than a single key.
        log.debug(f"Adding '{key_name}' key to keychain.")
        role_keys = KeyManager().load_keys(key_name, role_name)
        if not role_keys:
            log.debug(f"No keys loaded for '{key_name}' keypair for '{role_name}' role.")
            return
        self.keys[self._key_index(role_name, key_name)] = role_keys

    def _add_key_to_root(self, role_name: str, key_name: str) -> None:
        """ Add a key to a role in root metadata. """
        log.debug(f"Adding '{key_name}' key to '{role_name}' role in root metadata.")
        role_key = self._get_key_for_role_key(role_name, key_name)
        # Nothing to add when the key isn't present in the keychain.
        if role_key:
            self.roles['root'].signed.add_key(
                Key.from_securesystemslib_key(role_key),
                role_name,
            )

    def _remove_keys(self, keys_to_remove: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """ Delete keys and remove them from roles in root metadata. """
        removed: List[Tuple[str, str]] = []
        for role_name, key_name in keys_to_remove:
            # Delete the keypair from disk, then de-register it from root
            # metadata and the in-memory keychain.
            KeyManager().delete_keypair(key_name, role_name)
            self._remove_key_from_root(role_name, key_name)
            self._remove_key_from_keychain(role_name, key_name)
            removed.append((role_name, key_name))
        return removed

    def _remove_key_from_root(self, role_name: str, key_name: str) -> None:
        """ Remove a key from a role in root metadata. """
        log.debug(f"Removing '{key_name}' key from '{role_name}' role in root metadata.")
        key_id = self._get_keyid_for_role_key(role_name, key_name)
        if key_id is None:
            # _get_keyid_for_role_key() returns None when the key is absent
            # from the keychain; there is then nothing to revoke in root.
            log.debug(f"No '{key_name}' key to remove from '{role_name}' role.")
            return
        self.roles['root'].signed.revoke_key(key_id, role_name)

    def _remove_key_from_keychain(self, role_name: str, key_name: str) -> None:
        """ Remove a key from the keychain. """
        log.debug(f"Removing '{key_name}' key from the keychain.")
        # pop() without a default raises KeyError for unknown keys, same as del.
        self.keys.pop(self._key_index(role_name, key_name))

    #########################
    # Hashed bin delegation #
    #########################

    # See: https://github.com/theupdateframework/python-tuf/blob/v2.1.0/examples/manual_repo/hashed_bin_delegation.py

    def update_hashed_bins(self, initializing: bool = False) -> None:
        """ Update all hashed bin roles to account for rotated keys. """
        # Iterate role names directly; the delegated-role objects are unused.
        for bin_n_name in self.roles["bins"].signed.delegations.roles:
            self.update_hashed_bin(bin_n_name, initializing=initializing)
        # The delegating 'bins' role must be refreshed as well.
        self.update_hashed_bin("bins", initializing=initializing)

    def update_hashed_bin(self, bin_name: str, initializing: bool = False) -> None:
        """ Update a bin role to account for new or removed targets, or rotated keys. """
        bin_role = self.roles[bin_name]
        if not initializing:
            bin_role.signed.version += 1
            log.debug(f"Updating hashed bin '{bin_name}' metadata to version '{bin_role.signed.version}'.")
        # Hashed bins share the 'targets' expiry configuration.
        self.update_metadata_expiry(bin_name, "targets")
        # Existing signatures are stale once the metadata changes: re-sign from scratch.
        bin_role.signatures.clear()
        self.sign_bin_metadata(bin_name)
        log.info(f"Updated hashed bins '{bin_name}' metadata.")

    def update_hashed_bin_versions_in_snapshot(self) -> None:
        """ Update snapshot metadata to reflect current hashed bin metafile info. """
        for bin_n_name, bin_n_role in self.roles["bins"].signed.delegations.roles.items():
            # A bin with no signatures has not been loaded from storage yet:
            # load it just long enough to compute its metafile info, then drop
            # it again (presumably to avoid keeping every bin resident —
            # NOTE(review): confirm intent).
            delete_bin_n_metadata = False
            if not self.roles[bin_n_name].signatures:
                self.load_metadata(bin_n_name)
                delete_bin_n_metadata = True
            self.roles["snapshot"].signed.meta[f"{bin_n_name}.json"] = get_metafile_info(self.roles[bin_n_name])
            if delete_bin_n_metadata:
                self.delete_metadata(bin_n_name)
        # Record the delegating 'bins' role itself as well.
        self.roles["snapshot"].signed.meta["bins.json"] = get_metafile_info(self.roles['bins'])

    def sign_bin_metadata(self, bin_name: str) -> None:
        """ Sign bin role metadata.

        Raises:
            RuggedMetadataError: if signing fails.
        """
        key_name = self._hashed_bins_key_name()
        signer = self._hashed_bins_signer(key_name)
        try:
            self.roles[bin_name].sign(signer, append=True)
        except Exception as e:
            log_exception(e)
            error = f"Failed to sign '{bin_name}' metadata with '{key_name}' key."
            log.error(error)
            raise RuggedMetadataError(error)
        else:
            log.debug(f"Signed '{bin_name}' metadata with '{key_name}' key.")

    def _hashed_bins_signer(self, key_name: str) -> SSlibSigner:
        """ Return an SSlibSigner using the key specified by name.

        Raises:
            RuggedKeyError: if the key cannot be found in the keychain.
        """
        # @TODO: We are conflating the key name and role name here. Figure out
        # whether this is really the best way to make this work.
        role_name = key_name
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            # Fix: the previous message was missing words ("Could '…' signing key").
            raise RuggedKeyError(f"Could not find '{key_name}' signing key for '{role_name}' role.")
        return SSlibSigner(key)

    def _hashed_bins_key_name(self) -> str:
        """ Return the configured name of the key used for hashed bin roles. """
        key_name_config = config['hashed_bins_key_name']
        return key_name_config.get()

    def _init_bin_roles(self) -> None:
        """ Create metadata objects for hashed bins. """
        self._init_bins_role()
        # Track how many bin_n roles we actually create so we can compare
        # against the configured bin count.
        created_bins = []
        for bin_n_name, bin_n_hash_prefixes in generate_hash_bins():
            self._init_bin_n_role(bin_n_name, bin_n_hash_prefixes)
            created_bins.append(bin_n_name)
        bin_count = len(created_bins)
        number_of_bins = get_number_of_bins()
        if bin_count != number_of_bins:
            log.debug(f"Created {bin_count} bins, whereas 'number_of_bins: {number_of_bins}' was configured.")

    def _init_bins_role(self) -> None:
        """ Create metadata objects for the delegating 'bins' role. """
        # The 'bins' role starts with no delegations of its own; the
        # individual 'bin_n' delegations are added by _init_bin_n_role().
        targets_expiry = config['roles']['targets']['expiry'].get(float)
        bins_signed = Targets(
            version=1,
            spec_version=get_spec_version_string(),
            expires=seconds_from_now(targets_expiry),
            delegations=Delegations(
                keys=self._bins_keys(),
                roles={},
            )
        )
        self.roles["bins"] = Metadata[Targets](
            signed=bins_signed,
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'bins' metadata.")
        # Register 'bins' as a delegation of the top-level 'targets' role.
        # Delegate all target paths to 'bins'; from there the namespace is
        # split up by path hash prefixes.
        bins_delegated_role = DelegatedRole(
            name="bins",
            keyids=self._bin_n_keyids(),
            threshold=1,
            terminating=False,
            paths=['*'],
        )
        self.roles["targets"].signed.delegations = Delegations(
            keys=self._bins_keys(),
            roles={"bins": bins_delegated_role},
        )
        log.debug("Added top-level 'bins' delegated role to 'targets' metadata, in order to handle hashed bin "
                  "delegation.")

    def _init_bin_n_role(self, bin_n_name: str, bin_n_hash_prefixes: List[str]) -> None:
        """ Create the metadata object for one delegated 'bin_n' role.

        Also registers the new role in the 'bins' role's delegations, keyed
        by the hash prefixes this bin is responsible for.
        """
        targets_expiry = config['roles']['targets']['expiry'].get(float)
        bin_n_signed = Targets(
            version=1,
            spec_version=get_spec_version_string(),
            expires=seconds_from_now(targets_expiry),
        )
        self.roles[bin_n_name] = Metadata[Targets](
            signed=bin_n_signed,
            signatures=OrderedDict(),
        )
        log.debug(f"Initialized '{bin_n_name}' metadata.")
        # Record the delegation in the delegating 'bins' role. This bin is
        # terminating: target lookups never proceed past a matching bin.
        bin_n_delegated_role = DelegatedRole(
            name=bin_n_name,
            keyids=self._bin_n_keyids(),
            threshold=1,
            terminating=True,
            path_hash_prefixes=bin_n_hash_prefixes,
        )
        self.roles["bins"].signed.delegations.roles[bin_n_name] = bin_n_delegated_role
        log.debug("Delegated the following target file hash prefixes to "
                  f"'{bin_n_name}': {', '.join(bin_n_hash_prefixes)}.")

    def _bins_keys(self) -> Dict[str, Key]:
        """ Return the key(s) to use for the 'bins' role. """
        key_name = self._hashed_bins_key_name()
        # Look the securesystemslib key dict up once; the key index is of the
        # form "<name>:<name>" since key name and role name coincide here.
        sslib_key = self.keys[f"{key_name}:{key_name}"]
        return {sslib_key['keyid']: Key.from_securesystemslib_key(sslib_key)}

    def _bin_n_keyids(self) -> List[str]:
        """ Return the key IDs to use for 'bin_n' roles. """
        key_name = self._hashed_bins_key_name()
        key_index = f"{key_name}:{key_name}"
        return [self.keys[key_index]['keyid']]

    def _add_bin_roles_to_snapshot(self) -> None:
        """ Register bin roles with snapshot metadata.

        All bin metadata starts at version 1: one metafile entry per hashed
        bin, plus one for the delegating 'bins' role.
        """
        self.roles["snapshot"].signed.meta["bins.json"] = MetaFile(version=1)
        # The hash prefixes are not needed here; only the bin names are used.
        for bin_n_name, _ in generate_hash_bins():
            self.roles["snapshot"].signed.meta[f"{bin_n_name}.json"] = MetaFile(version=1)

An opinionated TUF repository using the low-level TUF Metadata API.

@TODO: Implement support for consistent snapshots. (#100)
@TODO: Implement support for delegated targets. (#36)

Methods

def add_target(self, target: str) ‑> None
Expand source code
def add_target(self, target: str) -> None:
    """ Add a single target to the targets (or hashed bin) metadata. """
    try:
        local_path = move_inbound_target_to_targets_dir(target)
        file_info = TargetFile.from_file(target, local_path)
        self.add_target_to_metadata(target, file_info)
        if config['delete_targets_after_signing'].get():
            delete_target_after_signing(local_path)
    except Exception as e:
        # Best-effort: one bad target is logged, not fatal.
        log_exception(e)
        log.warning(f"Failed to add target '{target}' to the repository.")

Add a single target to the targets (or hashed bin) metadata.

def add_target_to_metadata(self, target: str, target_file_info: tuf.api.metadata.TargetFile) ‑> None
Expand source code
def add_target_to_metadata(self, target: str, target_file_info: TargetFile) -> None:
    """ Add a target to the targets (or hashed bin) metadata. """
    if hashed_bins_is_enabled():
        targets_role = find_hash_bin(target)
    else:
        targets_role = "targets"
    self.roles[targets_role].signed.targets[target] = target_file_info
    log.info(f"Added target '{target}' to '{targets_role}' role.")

Add a target to the targets (or hashed bin) metadata.

def add_targets(self) ‑> List[str]
Expand source code
def add_targets(self) -> List[str]:
    """ Add any inbound targets to the targets metadata. """
    added_targets = []
    for target in get_inbound_targets():
        self.add_target(target)
        added_targets.append(target)
    if not added_targets:
        return added_targets
    if hashed_bins_is_enabled():
        # All targets are delegated to hashed bins, so only the affected
        # bins need a version bump; the top-level targets metadata is
        # left untouched.
        for bin_n_name in get_bins_for_targets(added_targets):
            self.update_hashed_bin(bin_n_name)
    else:
        self.update_targets()
    return added_targets

Add any inbound targets to the targets metadata.

def delete_initial_hashed_bin_roles(self) ‑> None
Expand source code
def delete_initial_hashed_bin_roles(self) -> None:
    """ Delete initial hashed bin roles.

    Removes each delegated bin role from self.roles; the delegations mapping
    on the 'bins' role itself is left intact.
    """
    current_bins = self.roles['bins'].signed.delegations.roles
    # The delegated role values were unused; iterate over names only.
    for bin_name in current_bins:
        self.delete_metadata(bin_name)

Delete initial hashed bin roles.

def delete_metadata(self, role_name: str) ‑> None
Expand source code
def delete_metadata(self, role_name: str) -> None:
    """ Delete a role's metadata from the in-memory repository. """
    log.debug(f"Deleting '{role_name}' role from repository.")
    # pop() without a default raises KeyError for unknown roles, just as
    # the del statement would.
    self.roles.pop(role_name)

Delete a role's metadata.

def delete_metadata_file(self, role_name: str) ‑> None
Expand source code
def delete_metadata_file(self, role_name: str) -> None:
    """ Delete a role's metadata file from storage. """
    path = self._get_metadata_path(role_name)
    log.debug(f"Deleting metadata file: {path}")
    # Removes only the on-disk file; the in-memory role is untouched.
    remove(path)

Delete a role's metadata file from storage.

def get_keys_by_role(self) ‑> Dict[str, str]
Expand source code
def get_keys_by_role(self) -> Dict[str, List[str]]:
    """ Return a dictionary mapping each role name to the names of keys that signed it.

    The previous return annotation (Dict[str, str]) was incorrect: each value
    is a list of key names, derived by matching signature keyids against the
    known keys.
    """
    keys_by_role: Dict[str, List[str]] = {}
    for role_name, role in self.roles.items():
        keys_by_role[role_name] = []
        for keyid in role.signatures:
            for key_index, key in self.keys.items():
                if key['keyid'] == keyid:
                    # Only resolve the human-readable name on a match.
                    keys_by_role[role_name].append(self._key_name(key_index))
    return keys_by_role

Return a dictionary of roles and their associated keys.

def initialize(self) ‑> None
Expand source code
def initialize(self) -> None:
    """ Initialize signed TUF metadata.

    Roles are initialized in dependency order: snapshot records targets
    metafile info and timestamp records snapshot metafile info, so this
    order must not be changed.
    """
    self.update_root(initializing=True)
    self.update_targets(initializing=True)
    self.update_snapshot(initializing=True)
    self.update_timestamp(initializing=True)

Initialize signed TUF metadata.

def load(self) ‑> None
Expand source code
def load(self) -> None:
    """ Load all metadata from storage.

    Raises:
        RuggedMetadataError: If any role's metadata fails to load.
    """
    for role_name in TOP_LEVEL_ROLE_NAMES:
        try:
            self.load_metadata(role_name)
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = f"Failed to load '{role_name}' metadata."
            log.error(error)
            raise RuggedMetadataError(error)
    # Load hashed bin metadata once, after the top-level roles. Previously
    # this ran inside the loop above, redundantly reloading all bins once
    # per top-level role.
    if hashed_bins_is_enabled():
        try:
            self.load_hashed_bins_metadata()
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = "Failed to load hashed bins metadata."
            log.error(error)
            raise RuggedMetadataError(error)
    log.debug("Loaded all metadata.")

Load all metadata from storage.

def load_hashed_bins_metadata(self) ‑> None
Expand source code
def load_hashed_bins_metadata(self) -> None:
    """ Load hashed bins metadata. """
    # We want to load metadata from storage. However, if the configured
    # number of hashed bins has changed, the initialized roles will be
    # incorrect. So we delete those, load 'bins.json', then load the
    # individual 'bin_n' metadata listed in its delegations.
    self.delete_initial_hashed_bin_roles()
    self.load_metadata('bins')
    # The delegated role values were unused; iterate over names only.
    for bin_name in self.roles['bins'].signed.delegations.roles:
        self.load_metadata(bin_name)

Load hashed bins metadata.

def load_metadata(self, role_name: str) ‑> None
Expand source code
def load_metadata(self, role_name: str) -> None:
    """ Load a role's metadata from storage.

    Raises:
        RuggedStorageError: If the metadata cannot be read from storage.
    """
    try:
        metadata_path = self._get_metadata_path(role_name)
        loaded = load_metadata_from_file(role_name, metadata_path)
    except StorageError as e:
        log_exception(e)
        error = f"Error loading '{role_name}' metadata from storage."
        log.error(error)
        raise RuggedStorageError(error)
    self.roles[role_name] = loaded

Load a role's metadata from storage.

def load_metadata_for_hashed_bins_targets(self, targets: List[str]) ‑> None
Expand source code
def load_metadata_for_hashed_bins_targets(self, targets: List[str]) -> None:
    """ Load only the metadata required for a given set of targets.

    Raises:
        RuggedMetadataError: If any required role's metadata fails to load.
    """
    role_names = list(self.roles.keys())
    # Use a set for O(1) membership tests.
    target_hashed_bins = {find_hash_bin(target) for target in targets}
    # Remove all hashed bins except those for the targets provided. Guard
    # the removal: list.remove() raises ValueError if a generated bin name
    # is not present in self.roles.
    for bin_n_name, _ in generate_hash_bins():
        if bin_n_name not in target_hashed_bins and bin_n_name in role_names:
            role_names.remove(bin_n_name)
    for role_name in role_names:
        try:
            self.load_metadata(role_name)
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = f"Failed to load '{role_name}' metadata."
            log.error(error)
            raise RuggedMetadataError(error)
    log.debug("Loaded all metadata for targets.")

Load only the metadata required for a given set of targets.

def remove_target(self, target: str) ‑> bool
Expand source code
def remove_target(self, target: str) -> bool:
    """ Remove a single target from the targets (or hashed bin) metadata.

    Returns True on success, False (with a logged warning) on failure.
    """
    try:
        if hashed_bins_is_enabled():
            targets_role = find_hash_bin(target)
        else:
            targets_role = "targets"
        del self.roles[targets_role].signed.targets[target]
        log.info(f"Removed target '{target}' from the '{targets_role}' role.")
        delete_removed_target(target)
    except Exception as e:
        log_exception(e)
        warning = f"Failed to remove target '{target}' from the "\
                  "repository."
        log.warning(warning)
        return False
    return True

Remove a single target from the targets (or hashed bin) metadata.

def remove_targets(self, targets: List[str]) ‑> List[str]
Expand source code
def remove_targets(self, targets: List[str]) -> List[str]:
    """ Remove given targets from targets (or hashed bin) metadata. """
    removed_targets = [target for target in targets if self.remove_target(target)]
    if removed_targets:
        if hashed_bins_is_enabled():
            # All targets are delegated to hashed bins, so only the
            # affected bins need a version bump; the top-level targets
            # metadata is left untouched.
            for bin_n_name in get_bins_for_targets(removed_targets):
                self.update_hashed_bin(bin_n_name)
        else:
            self.update_targets()
    return removed_targets

Remove given targets from targets (or hashed bin) metadata.

def rotate_keys(self, keys_to_add: List[Tuple[str, str]], keys_to_remove: List[Tuple[str, str]]) ‑> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]
Expand source code
def rotate_keys(
        self,
        keys_to_add: List[Tuple[str, str]],
        keys_to_remove: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
    """ Update keys and re-sign metadata.

    Rotating a key for one role invalidates every role whose metadata
    (transitively) covers it, so the most dependent role present determines
    the full set of metadata to regenerate.
    """
    added_keys = self._add_keys(keys_to_add)
    removed_keys = self._remove_keys(keys_to_remove)
    affected_roles = set()
    for role, key in added_keys + removed_keys:
        affected_roles.add(role)
    # Ordered from widest impact to narrowest; only the first matching
    # trigger applies. 'targets' requires everything; 'snapshot' requires
    # root/snapshot/timestamp; 'timestamp' requires root/timestamp; 'root'
    # requires only itself.
    cascade = {
        'targets': ('root', 'targets', 'snapshot', 'timestamp'),
        'snapshot': ('root', 'snapshot', 'timestamp'),
        'timestamp': ('root', 'timestamp'),
        'root': ('root',),
    }
    for trigger, roles_to_refresh in cascade.items():
        if trigger in affected_roles:
            for role_name in roles_to_refresh:
                getattr(self, f'update_{role_name}')()
                self.write_metadata(role_name)
            break
    return (added_keys, removed_keys)

Update keys and re-sign metadata.

def sign_bin_metadata(self, bin_name: str) ‑> None
Expand source code
def sign_bin_metadata(self, bin_name: str) -> None:
    """ Sign bin role metadata.

    Signs with the single configured hashed-bins key, appending to any
    existing signatures rather than replacing them.

    Raises:
        RuggedMetadataError: If signing fails for any reason.
    """
    key_name = self._hashed_bins_key_name()
    signer = self._hashed_bins_signer(key_name)
    try:
        self.roles[bin_name].sign(signer, append=True)
        log.debug(f"Signed '{bin_name}' metadata with '{key_name}' key.")
    except Exception as e:
        log_exception(e)
        error = f"Failed to sign '{bin_name}' metadata with '{key_name}' key."
        log.error(error)
        raise RuggedMetadataError(error)

Sign bin role metadata.

def sign_metadata(self, role_name: str) ‑> None
Expand source code
def sign_metadata(self, role_name: str) -> None:
    """ Sign a role's metadata with every key configured for that role.

    Raises:
        RuggedMetadataError: If a key is unavailable or signing fails.
    """
    # This needs to use KeyManager because the root role may not be defined yet.
    key_names = KeyManager().find_keys_for_role(role_name)
    for key_name in key_names:
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            raise RuggedMetadataError(f"No key available to sign '{role_name}' metadata.")
        try:
            self.roles[role_name].sign(SSlibSigner(key), append=True)
        except Exception as e:
            log_exception(e)
            error = f"Failed to sign '{role_name}' metadata with '{key_name}' key."
            log.error(error)
            raise RuggedMetadataError(error)
        log.debug(f"Signed '{role_name}' metadata with '{key_name}' key.")

Sign a role's metadata.

def status(self) ‑> Dict[str, Dict[str, Any]]
Expand source code
def status(self) -> Dict[str, Dict[str, Any]]:
    """ Return a status summary of the repository's roles, targets and keys. """
    repo_status: Dict[str, Dict[str, Any]] = {
        'roles': {},
    }
    # Summarize target count and total size when targets metadata is loaded.
    if 'targets' in self.roles:
        targets = self.roles['targets'].signed.targets
        repo_status['targets'] = {
            'count': len(targets),
            'size': sum(target.length for target in targets.values()),
        }
    for role_name, role_info in self.roles.items():
        # Ignore hashed bin roles, since the code below assumes we're only
        # showing the status of roles signed by root.
        # @TODO: Figure out a better way to do this. Maybe skip delegated roles?
        # @TODO: Alternatively, figure out how to report the status of hashed bins succinctly.
        if role_name[:4] in ['bins', 'bin_']:
            continue
        repo_status['roles'][role_name] = {
            'signatures': len(role_info.signatures),
            'version': role_info.signed.version,
            'tuf_spec': role_info.signed.spec_version,
            'expires': role_info.signed.expires.replace(tzinfo=timezone.utc).isoformat(),
        }
        # Thresholds are defined by root metadata, so they are only
        # reportable when root is loaded.
        if 'root' in self.roles:
            threshold = self.roles['root'].signed.roles[role_name].threshold
            repo_status['roles'][role_name]['threshold'] = threshold
        repo_status['roles'][role_name]['keys'] = {}
        for key_index, key in self.keys.items():
            key_name = self._key_name(key_index)
            # @TODO: In _role_has_signing_capability() we look up keys in
            # the Root (or Targets) metadata. That seems more correct than
            # using the signatures themselves.
            if key['keyid'] not in role_info.signatures.keys():
                continue
            # A key with 'private' material is a signing key; otherwise it
            # is verification-only.
            key_types = list(key['keyval'].keys())
            if 'private' in key_types:
                key_path = KeyManager().get_key_path(key_name, role_name, 'signing')
            else:
                key_path = KeyManager().get_key_path(key_name, role_name, 'verification')
            repo_status['roles'][role_name]['keys'][key_name] = {
                'types': key_types,
                'scheme': key['scheme'],
                'key_path': key_path
            }
    return repo_status
def update_hashed_bin(self, bin_name: str, initializing: bool = False) ‑> None
Expand source code
def update_hashed_bin(self, bin_name: str, initializing: bool = False) -> None:
    """ Update a bin role to account for new or removed targets, or rotated keys. """
    bin_metadata = self.roles[bin_name]
    if not initializing:
        new_version = bin_metadata.signed.version + 1
        log.debug(f"Updating hashed bin '{bin_name}' metadata to version '{new_version}'.")
        bin_metadata.signed.version = new_version
    # Hashed bins reuse the expiry configured for the 'targets' role.
    self.update_metadata_expiry(bin_name, "targets")
    bin_metadata.signatures.clear()
    self.sign_bin_metadata(bin_name)
    log.info(f"Updated hashed bins '{bin_name}' metadata.")

Update a bin role to account for new or removed targets, or rotated keys.

def update_hashed_bin_versions_in_snapshot(self) ‑> None
Expand source code
def update_hashed_bin_versions_in_snapshot(self) -> None:
    """ Update snapshot metadata to reflect current hashed bin metafile info. """
    snapshot_meta = self.roles["snapshot"].signed.meta
    for bin_n_name in self.roles["bins"].signed.delegations.roles:
        # Bins without signatures are not in memory yet: load them just long
        # enough to record their metafile info, then drop them again.
        loaded_temporarily = not self.roles[bin_n_name].signatures
        if loaded_temporarily:
            self.load_metadata(bin_n_name)
        snapshot_meta[f"{bin_n_name}.json"] = get_metafile_info(self.roles[bin_n_name])
        if loaded_temporarily:
            self.delete_metadata(bin_n_name)
    snapshot_meta["bins.json"] = get_metafile_info(self.roles['bins'])

Update snapshot metadata to reflect current hashed bin metafile info.

def update_hashed_bins(self, initializing: bool = False) ‑> None
Expand source code
def update_hashed_bins(self, initializing: bool = False) -> None:
    """ Update all hashed bin roles to account for rotated keys.

    Updates every delegated 'bin_n' role, then the delegating 'bins' role
    itself.
    """
    # The delegated role values were unused; iterate over names only.
    for bin_n_name in self.roles["bins"].signed.delegations.roles:
        self.update_hashed_bin(bin_n_name, initializing=initializing)
    self.update_hashed_bin("bins", initializing=initializing)

Update all hashed bin roles to account for rotated keys.

def update_metadata_expiry(self, role_name: str, config_role: str = '') ‑> None
Expand source code
def update_metadata_expiry(self, role_name: str, config_role: str = '') -> None:
    """ Update expiry for a given role.

    config_role selects which role's configured expiry to apply; it defaults
    to role_name itself (hashed bins pass 'targets' here).
    """
    config_role = config_role or role_name
    expiry = config['roles'][config_role]['expiry'].get(float)
    log.debug(f"Setting '{role_name}' metadata expiry to {expiry}.")
    self.roles[role_name].signed.expires = seconds_from_now(expiry)

Update expiry for a given role.

def update_root(self, initializing: bool = False) ‑> None
Expand source code
def update_root(self, initializing: bool = False) -> None:
    """ Update root to account for newly added or removed keys.

    When initializing, any existing signed root metadata found on disk is
    loaded and preserved instead of being overwritten.
    """
    if initializing:
        try:
            log.debug("Trying to initialize 'root' metadata from disk.")
            self.load_metadata("root")
            # Logger.warn() is a deprecated alias; use warning().
            log.warning("Initialized 'root' metadata from disk.")
            log.info("If you did not intend to initialize with existing 'root' metadata then delete '1.root.json' "
                     "and re-run this command.")
        except FileNotFoundError:
            log.debug("Did not find 'root' metadata on disk to load.")
        # If we've loaded signed root metadata from disk, don't overwrite it.
        if self.roles["root"].signatures:
            return
    if not initializing:
        self.roles["root"].signed.version += 1
    self.update_metadata_expiry("root")
    self.roles["root"].signatures.clear()
    self.sign_metadata("root")
    log.info("Updated root metadata.")

Update root to account for newly added or removed keys.

def update_snapshot(self, initializing: bool = False) ‑> None
Expand source code
def update_snapshot(self, initializing: bool = False) -> None:
    """ Update snapshot to account for changed targets metadata, or rotated keys. """
    snapshot = self.roles["snapshot"]
    # Record current targets (and hashed bin) metafile info before bumping
    # the snapshot version and re-signing.
    snapshot.signed.meta["targets.json"] = get_metafile_info(self.roles['targets'])
    if hashed_bins_is_enabled():
        self.update_hashed_bin_versions_in_snapshot()
    if not initializing:
        snapshot.signed.version += 1
    self.update_metadata_expiry("snapshot")
    snapshot.signatures.clear()
    self.sign_metadata("snapshot")
    log.info("Updated snapshot metadata.")

Update snapshot to account for changed targets metadata, or rotated keys.

def update_targets(self, initializing: bool = False) ‑> None
Expand source code
def update_targets(self, initializing: bool = False) -> None:
    """ Update targets to account for new targets, or rotated keys. """
    if hashed_bins_is_enabled():
        self.update_hashed_bins(initializing=initializing)
    targets = self.roles["targets"]
    if not initializing:
        targets.signed.version += 1
    self.update_metadata_expiry("targets")
    targets.signatures.clear()
    self.sign_metadata("targets")
    log.info("Updated targets metadata.")

Update targets to account for new targets, or rotated keys.

def update_timestamp(self, initializing: bool = False) ‑> None
Expand source code
def update_timestamp(self, initializing: bool = False) -> None:
    """ Update timestamp to account for changed snapshot metadata info, or rotated keys. """
    timestamp = self.roles["timestamp"]
    # Record current snapshot metafile info before bumping the version and
    # re-signing.
    timestamp.signed.snapshot_meta = get_metafile_info(self.roles['snapshot'])
    if not initializing:
        timestamp.signed.version += 1
    self.update_metadata_expiry("timestamp")
    timestamp.signatures.clear()
    self.sign_metadata("timestamp")
    log.info("Updated timestamp metadata.")

Update timestamp to account for changed snapshot metadata info, or rotated keys.

def write(self) ‑> bool
Expand source code
def write(self) -> bool:
    """ Write all metadata to storage.

    Returns False as soon as any role fails to write, True otherwise.
    """
    # all() short-circuits on the first False, matching the original
    # early-return behavior.
    return all(self.write_metadata(role_name) for role_name in self.roles)

Write all metadata to storage.

def write_metadata(self, role_name: str) ‑> bool
Expand source code
def write_metadata(self, role_name: str) -> bool:
    """ Write a role's signed metadata to storage.

    Returns the boolean result of the underlying storage write.
    """
    new_path = self._get_metadata_path(role_name, new=True)
    return write_metadata_to_file(role_name, self.roles[role_name], new_path)

Write a role's signed metadata to storage.