Module rugged.tuf.repo
Classes
class RuggedRepository
-
Expand source code
class RuggedRepository():
    """
    An opinionated TUF repository using the low-level TUF Metadata API.

    The repository keeps two in-memory structures:

    * ``self.keys``: the keychain, indexed by ``_key_index(role, key)``.
    * ``self.roles``: the metadata object for each known role.

    @TODO: Implement support for consistent snapshots. (#100)
    @TODO: Implement support for delegated targets. (#36)
    """

    def __init__(self) -> None:
        """
        Create directories, load keys and build unsigned top-level metadata.

        Raises RuggedRepositoryError if any of those steps fail.
        """
        self.keys: Dict[str, Dict[str, Any]] = {}
        self.roles: Dict[str, Metadata[Any]] = {}
        try:
            self._init_dirs()
            self._init_keys()
            self._init_top_level_roles()
            log.debug("Instantiated repository.")
        except Exception as e:
            log_exception(e)
            log.error("Failed to instantiate repository.")
            raise RuggedRepositoryError()

    def initialize(self) -> None:
        """ Initialize signed TUF metadata. """
        self.update_root(initializing=True)
        self.update_targets(initializing=True)
        self.update_snapshot(initializing=True)
        self.update_timestamp(initializing=True)

    def load(self) -> None:
        """
        Load all metadata from storage.

        Raises RuggedMetadataError if a top-level role, or the hashed bins
        metadata, cannot be loaded.
        """
        for role_name in TOP_LEVEL_ROLE_NAMES:
            try:
                self.load_metadata(role_name)
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = f"Failed to load '{role_name}' metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        # Hashed bins do not depend on which top-level role was just loaded,
        # so load them once rather than once per top-level role.
        if hashed_bins_is_enabled():
            try:
                self.load_hashed_bins_metadata()
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = "Failed to load hashed bins metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata.")

    def load_hashed_bins_metadata(self) -> None:
        """ Load hashed bins metadata. """
        # We want to load from metadata from storage. However, if the
        # configured number of hashed bins has changed, the initialized roles
        # will be incorrect. So we delete those, load 'bins.json', then load
        # the individual 'bin_n' metadata from there.
        self.delete_initial_hashed_bin_roles()
        self.load_metadata('bins')
        current_bins = self.roles['bins'].signed.delegations.roles
        for bin_name in current_bins:
            self.load_metadata(bin_name)

    def delete_initial_hashed_bin_roles(self) -> None:
        """ Delete initial hashed bin roles. """
        current_bins = self.roles['bins'].signed.delegations.roles
        for bin_name in current_bins:
            self.delete_metadata(bin_name)

    def load_metadata_for_hashed_bins_targets(self, targets: List[str]) -> None:
        """
        Load only the metadata required for a given set of targets.

        Raises RuggedMetadataError if any of the selected roles fails to load.
        """
        role_names = list(self.roles.keys())
        target_hashed_bins = [find_hash_bin(target) for target in targets]
        # Remove all hashed bins except those for the targets provided.
        for bin_n_name, _bin_n_hash_prefixes in generate_hash_bins():
            if bin_n_name not in target_hashed_bins:
                role_names.remove(bin_n_name)
        for role_name in role_names:
            try:
                self.load_metadata(role_name)
            except (RuggedMetadataError, RuggedStorageError) as e:
                log_exception(e)
                error = f"Failed to load '{role_name}' metadata."
                log.error(error)
                raise RuggedMetadataError(error)
        log.debug("Loaded all metadata for targets.")

    def load_metadata(self, role_name: str) -> None:
        """
        Load a role's metadata from storage.

        Raises RuggedStorageError when the storage layer reports a
        StorageError. Other exceptions (e.g. the FileNotFoundError raised
        while resolving a versioned filename) propagate unchanged.
        """
        try:
            path = self._get_metadata_path(role_name)
            self.roles[role_name] = load_metadata_from_file(role_name, path)
        except StorageError as e:
            log_exception(e)
            error = f"Error loading '{role_name}' metadata from storage."
            log.error(error)
            raise RuggedStorageError(error)

    def delete_metadata(self, role_name: str) -> None:
        """ Delete a role's metadata (in memory only). """
        log.debug(f"Deleting '{role_name}' role from repository.")
        del self.roles[role_name]

    def delete_metadata_file(self, role_name: str) -> None:
        """ Delete a role's metadata file from storage. """
        path = self._get_metadata_path(role_name)
        log.debug(f"Deleting metadata file: {path}")
        remove(path)

    def _get_metadata_path(self, role_name: str, new: bool = False) -> str:
        """ Determine the path for a given role's metadata file. """
        metadata_dir = config['repo_metadata_path'].get()
        filename = self._get_metadata_filename(role_name, new)
        return path.join(metadata_dir, filename)

    def _get_metadata_filename(self, role_name: str, new: bool = False) -> str:
        """
        Determine the filename for a given role's metadata.

        When `new` is true the version already set in the in-memory metadata
        is used as the prefix; otherwise the latest version found in storage
        is used.
        """
        filename = f"{role_name}.json"
        # Timestamp metadata doesn't use consistent snapshots.
        if role_name == "timestamp":
            return filename
        # We need to special-case root here, since it should always have the
        # version prefix. This is what will allow clients to find and validate
        # the chain of root metadata that links the original (shipped) root to
        # its current version.
        if role_name == 'root' or config['consistent_snapshot'].get():
            if new:
                # Use the version in the role metadata that has already been incremented.
                filename = f"{self.roles[role_name].signed.version}.{filename}"
            else:
                # Otherwise, find the most recent one that exists in storage (eg. on disk).
                filename = self._get_versioned_metadata_filename(filename)
        return filename

    def _get_versioned_metadata_filename(self, filename: str) -> str:
        """
        Determine the versioned filename for a given role's metadata.

        Raises FileNotFoundError if no versioned file exists.
        """
        metadata_dir = config['repo_metadata_path'].get()
        # NOTE(review): this changes the process working directory as a side
        # effect; confirm nothing relies on it before replacing it with a
        # glob on the joined path.
        chdir(metadata_dir)
        files = glob(f"*.{filename}")
        # Sort numbers naturally. Based on magic from https://stackoverflow.com/a/33159707.
        files.sort(key=lambda f: int(sub(r'\D', '', f)))
        if not files:
            raise FileNotFoundError
        # Return the latest version.
        return files[-1]

    def write(self) -> bool:
        """
        Write all metadata to storage.

        Stops and returns False at the first role that fails to write.
        """
        for role_name in self.roles.keys():
            result = self.write_metadata(role_name)
            if not result:
                return False
        return True

    def write_metadata(self, role_name: str) -> bool:
        """ Write a role's signed metadata to storage. """
        path = self._get_metadata_path(role_name, new=True)
        metadata = self.roles[role_name]
        return write_metadata_to_file(role_name, metadata, path)

    def _key_index(self, role_name: str, key_name: str) -> str:
        """ Return a unique index based on a key's role and name. """
        return f"{role_name}{RUGGED_KEY_INDEX_DELIMITER}{key_name}"

    def _key_name(self, key_index: str) -> str:
        """ Return a key's name based on its index. """
        return key_index.split(RUGGED_KEY_INDEX_DELIMITER)[1]

    def sign_metadata(self, role_name: str) -> None:
        """
        Sign a role's metadata with every key found for that role.

        Raises RuggedMetadataError if a key is missing from the keychain or
        signing fails.
        """
        # This needs to use KeyManager because the root role may not be defined yet.
        for key_name in KeyManager().find_keys_for_role(role_name):
            key = self._get_key_for_role_key(role_name, key_name)
            if not key:
                raise RuggedMetadataError(f"No key available to sign '{role_name}' metadata.")
            try:
                signer = SSlibSigner(key)
                self.roles[role_name].sign(signer, append=True)
            except Exception as e:
                log_exception(e)
                error = f"Failed to sign '{role_name}' metadata with '{key_name}' key."
                log.error(error)
                raise RuggedMetadataError(error)
            log.debug(f"Signed '{role_name}' metadata with '{key_name}' key.")

    def add_targets(self) -> List[str]:
        """
        Add any inbound targets to the targets metadata.

        Returns the list of inbound targets that were processed.
        """
        added_targets = []
        for inbound_target in get_inbound_targets():
            # NOTE(review): add_target() logs and swallows failures, so a
            # target that could not be added is still reported here.
            self.add_target(inbound_target)
            added_targets.append(inbound_target)
        if added_targets:
            if hashed_bins_is_enabled():
                hashed_bins_to_update = get_bins_for_targets(added_targets)
                for bin_n_name in hashed_bins_to_update:
                    self.update_hashed_bin(bin_n_name)
                # All targets are delegated to hashed bins. As a result,
                # there's no need to update the version of the targets
                # metadata.
            else:
                self.update_targets()
        return added_targets

    def add_target(self, target: str) -> None:
        """
        Add a single target to the targets (or hashed bin) metadata.

        Failures are logged as a warning and not re-raised.
        """
        try:
            moved_target_path = move_inbound_target_to_targets_dir(target)
            target_file_info = TargetFile.from_file(target, moved_target_path)
            self.add_target_to_metadata(target, target_file_info)
            if config['delete_targets_after_signing'].get():
                delete_target_after_signing(moved_target_path)
        except Exception as e:
            log_exception(e)
            log.warning(f"Failed to add target '{target}' to the repository.")

    def add_target_to_metadata(self, target: str, target_file_info: TargetFile) -> None:
        """ Add a target to the targets (or hashed bin) metadata. """
        targets_role = "targets"
        if hashed_bins_is_enabled():
            targets_role = find_hash_bin(target)
        self.roles[targets_role].signed.targets[target] = target_file_info
        log.info(f"Added target '{target}' to '{targets_role}' role.")

    def remove_targets(self, targets: List[str]) -> List[str]:
        """
        Remove given targets from targets (or hashed bin) metadata.

        Returns only the targets that were actually removed.
        """
        removed_targets = []
        for target in targets:
            if self.remove_target(target):
                removed_targets.append(target)
        if removed_targets:
            if hashed_bins_is_enabled():
                hashed_bins_to_update = get_bins_for_targets(removed_targets)
                for bin_n_name in hashed_bins_to_update:
                    self.update_hashed_bin(bin_n_name)
                # All targets are delegated to hashed bins. As a result,
                # there's no need to update the version of the targets
                # metadata.
            else:
                self.update_targets()
        return removed_targets

    def remove_target(self, target: str) -> bool:
        """
        Remove a single target from the targets (or hashed bin) metadata.

        Returns True on success; on any failure logs a warning and returns
        False.
        """
        try:
            targets_role = "targets"
            if hashed_bins_is_enabled():
                targets_role = find_hash_bin(target)
            del self.roles[targets_role].signed.targets[target]
            log.info(f"Removed target '{target}' from the '{targets_role}' role.")
            delete_removed_target(target)
            return True
        except Exception as e:
            log_exception(e)
            warning = f"Failed to remove target '{target}' from the "\
                      "repository."
            log.warning(warning)
            return False

    def update_targets(self, initializing: bool = False) -> None:
        """
        Update targets to account for new targets, or rotated keys.

        The version is only bumped when not initializing.
        """
        if hashed_bins_is_enabled():
            self.update_hashed_bins(initializing=initializing)
        if not initializing:
            self.roles["targets"].signed.version += 1
        self.update_metadata_expiry("targets")
        self.roles["targets"].signatures.clear()
        self.sign_metadata("targets")
        log.info("Updated targets metadata.")

    def update_snapshot(self, initializing: bool = False) -> None:
        """
        Update snapshot to account for changed targets metadata, or rotated keys.

        The version is only bumped when not initializing.
        """
        self.roles["snapshot"].signed.meta["targets.json"] = get_metafile_info(self.roles['targets'])
        if hashed_bins_is_enabled():
            self.update_hashed_bin_versions_in_snapshot()
        if not initializing:
            self.roles["snapshot"].signed.version += 1
        self.update_metadata_expiry("snapshot")
        self.roles["snapshot"].signatures.clear()
        self.sign_metadata("snapshot")
        log.info("Updated snapshot metadata.")

    def update_timestamp(self, initializing: bool = False) -> None:
        """
        Update timestamp to account for changed snapshot metadata info, or rotated keys.

        The version is only bumped when not initializing.
        """
        self.roles["timestamp"].signed.snapshot_meta = get_metafile_info(self.roles['snapshot'])
        if not initializing:
            self.roles["timestamp"].signed.version += 1
        self.update_metadata_expiry("timestamp")
        self.roles["timestamp"].signatures.clear()
        self.sign_metadata("timestamp")
        log.info("Updated timestamp metadata.")

    def update_root(self, initializing: bool = False) -> None:
        """
        Update root to account for newly added or removed keys.

        When initializing, existing root metadata is loaded from disk if
        present; if that metadata is already signed it is left untouched.
        """
        if initializing:
            try:
                log.debug("Trying to initialize 'root' metadata from disk.")
                self.load_metadata("root")
                # `Logger.warn` is a deprecated alias that was removed in
                # Python 3.13; the rest of this class uses `warning`.
                log.warning("Initialized 'root' metadata from disk.")
                log.info("If you did not intend to initialize with existing 'root' metadata then delete '1.root.json' "
                         "and re-run this command.")
            except FileNotFoundError:
                log.debug("Did not find 'root' metadata on disk to load.")
            # If we've loaded signed root metadata from disk, don't overwrite it.
            if self.roles["root"].signatures:
                return
        if not initializing:
            self.roles["root"].signed.version += 1
        self.update_metadata_expiry("root")
        self.roles["root"].signatures.clear()
        self.sign_metadata("root")
        log.info("Updated root metadata.")

    def status(self) -> Dict[str, Dict[str, Any]]:
        """
        Return a summary of the repository's targets, roles and signing keys.

        Hashed bin roles ('bins' and 'bin_*') are not included.
        """
        repo_status: Dict[str, Dict[str, Any]] = {
            'roles': {},
        }
        if 'targets' in self.roles:
            targets = self.roles['targets'].signed.targets
            repo_status['targets'] = {
                'count': len(targets),
                'size': sum(target.length for target in targets.values()),
            }
        for role_name, role_info in self.roles.items():
            # Ignore hashed bin roles, since the code below assumes we're only
            # showing the status of roles signed by root.
            # @TODO: Figure out a better way to do this. Maybe skip delegated roles?
            # @TODO: Alternatively, figure out how to report the status of hashed bins succinctly.
            if role_name.startswith(('bins', 'bin_')):
                continue
            repo_status['roles'][role_name] = {
                'signatures': len(role_info.signatures),
                'version': role_info.signed.version,
                'tuf_spec': role_info.signed.spec_version,
                'expires': role_info.signed.expires.replace(tzinfo=timezone.utc).isoformat(),
            }
            if 'root' in self.roles:
                threshold = self.roles['root'].signed.roles[role_name].threshold
                repo_status['roles'][role_name]['threshold'] = threshold
            repo_status['roles'][role_name]['keys'] = {}
            for key_index, key in self.keys.items():
                key_name = self._key_name(key_index)
                # @TODO: In _role_has_signing_capability() we look up keys in
                # the Root (or Targets) metadata. That seems more correct than
                # using the signatures themselves.
                if key['keyid'] not in role_info.signatures.keys():
                    continue
                key_types = list(key['keyval'].keys())
                if 'private' in key_types:
                    key_path = KeyManager().get_key_path(key_name, role_name, 'signing')
                else:
                    key_path = KeyManager().get_key_path(key_name, role_name, 'verification')
                repo_status['roles'][role_name]['keys'][key_name] = {
                    'types': key_types,
                    'scheme': key['scheme'],
                    'key_path': key_path
                }
        return repo_status

    def get_keys_by_role(self) -> Dict[str, List[str]]:
        """
        Return a dictionary of roles and their associated keys.

        Each role maps to the names of the keychain keys whose key ID appears
        in that role's signatures.
        """
        keys_by_role: Dict[str, List[str]] = {}
        for role_name, role in self.roles.items():
            keys_by_role[role_name] = []
            for keyid in role.signatures:
                for key_index, key in self.keys.items():
                    key_name = self._key_name(key_index)
                    if key['keyid'] == keyid:
                        keys_by_role[role_name].append(key_name)
        return keys_by_role

    def _init_dirs(self) -> None:
        """
        Ensure all repository directories exist.

        Raises RuggedStorageError if a directory cannot be created due to
        permissions.
        """
        dirs = {
            config['repo_metadata_path'].get(): 0o755,
            config['repo_targets_path'].get(): 0o755,
        }
        for directory, mode in dirs.items():
            try:
                makedirs(directory, mode=mode, exist_ok=True)
            except PermissionError as e:
                log_exception(e)
                raise RuggedStorageError

    def _init_keys(self) -> None:
        """ Initialize a dictionary of keys. """
        for role_name, key_names in KeyManager().find_keys().items():
            for key_name in key_names:
                self._add_key_to_keychain(role_name, key_name)

    def _init_top_level_roles(self) -> None:
        """ Create all top-level metadata objects. """
        # Timestamp is built from the snapshot metadata object, so snapshot
        # must be initialized before timestamp.
        self._init_targets_role()
        self._init_snapshot_role()
        self._init_timestamp_role()
        self._init_root_role()

    def _init_targets_role(self) -> None:
        """ Create targets metadata object. """
        expiry = config['roles']['targets']['expiry'].get(float)
        log.debug(f"Setting 'targets' metadata expiry to {expiry}.")
        self.roles["targets"] = Metadata[Targets](
            signed=Targets(
                version=1,
                spec_version=get_spec_version_string(),
                expires=seconds_from_now(expiry),
                targets={},
            ),
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'targets' metadata.")
        if hashed_bins_is_enabled():
            self._init_bin_roles()

    def _init_snapshot_role(self) -> None:
        """ Create snapshot metadata object. """
        expiry = config['roles']['snapshot']['expiry'].get(float)
        log.debug(f"Setting 'snapshot' metadata expiry to {expiry}.")
        self.roles["snapshot"] = Metadata[Snapshot](
            Snapshot(
                version=1,
                spec_version=get_spec_version_string(),
                expires=seconds_from_now(expiry),
                meta={"targets.json": MetaFile(version=1)},
            ),
            signatures=OrderedDict(),
        )
        if hashed_bins_is_enabled():
            self._add_bin_roles_to_snapshot()
        log.debug("Initialized 'snapshot' metadata.")

    def _init_timestamp_role(self) -> None:
        """ Create timestamp metadata object. """
        expiry = config['roles']['timestamp']['expiry'].get(float)
        log.debug(f"Setting 'timestamp' metadata expiry to {expiry}.")
        self.roles["timestamp"] = Metadata[Timestamp](
            Timestamp(
                version=1,
                spec_version=get_spec_version_string(),
                expires=seconds_from_now(expiry),
                snapshot_meta=get_metafile_info(self.roles['snapshot'])
            ),
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'timestamp' metadata.")

    def _init_root_role(self) -> None:
        """
        Create root metadata object.

        Raises RuggedMetadataError if the Root metadata cannot be built.
        """
        try:
            log.debug("Collecting keys for 'root' metadata.")
            keys = self._get_repo_keys_for_root_role()
            log.debug("Collecting roles for 'root' metadata.")
            roles = self._get_repo_roles_for_root_role()
            self.roles["root"] = Metadata[Root](
                signed=Root(
                    version=1,
                    spec_version=get_spec_version_string(),
                    keys=keys,
                    roles=roles,
                    consistent_snapshot=False,
                ),
                signatures=OrderedDict(),
            )
        except ValueError as e:
            log_exception(e)
            error = "Failed to initialize Root metadata during TUF repository initialization."
            raise RuggedMetadataError(error)
        self.update_metadata_expiry("root")
        log.debug("Initialized 'root' metadata.")

    def update_metadata_expiry(self, role_name: str, config_role: str = '') -> None:
        """
        Update expiry for a given role.

        `config_role` names the role whose configured expiry is used; it
        defaults to `role_name`.
        """
        if config_role == '':
            config_role = role_name
        expiry = config['roles'][config_role]['expiry'].get(float)
        log.debug(f"Setting '{role_name}' metadata expiry to {expiry}.")
        self.roles[role_name].signed.expires = seconds_from_now(expiry)

    def _get_repo_keys_for_root_role(self) -> Dict[str, Key]:
        """
        Return all known keys, keyed by ID.

        Raises RuggedKeyError for an empty key or an empty keychain, and
        RuggedMetadataError if a key cannot be converted.
        """
        repo_keys = {}
        for key in self.keys.values():
            if not key:  # @TODO: Re-visit once #253 lands.
                raise RuggedKeyError("Empty key returned from keychain.")
            try:
                log.debug(f"Loading key with ID: {key['keyid']}")
                repo_keys[key["keyid"]] = Key.from_securesystemslib_key(key)
            except TypeError as e:
                log_exception(e)
                error = "Failed to generate key metadata during TUF repository initialization."
                raise RuggedMetadataError(error)
        if not repo_keys:
            raise RuggedKeyError("No keys found for Root role.")
        return repo_keys

    def _get_repo_roles_for_root_role(self) -> Dict[str, Role]:
        """
        Return all known roles, keyed by ID.

        Only configured roles that are top-level TUF roles are included.
        Raises RuggedKeyError if a non-root role has no keys, and
        RuggedMetadataError if a Role cannot be built.
        """
        repo_roles = {}
        for role_name, role_info in config['roles'].get().items():
            if role_name not in TOP_LEVEL_ROLE_NAMES:
                continue
            role_keys = []
            # This needs to use KeyManager because the root role isn't defined yet.
            keys = KeyManager().find_keys()
            if role_name in keys:
                for key_name in keys[role_name]:
                    keyid = self._get_keyid_for_role_key(role_name, key_name)
                    if keyid:
                        log.debug(f"Adding '{key_name}' key to '{role_name}' role with KeyID: {keyid}")
                        role_keys.append(keyid)
            else:
                if role_name != Root.type:
                    message = f"No keys found for '{role_name}'."
                    log.error(message)
                    raise RuggedKeyError(message)
            try:
                threshold = role_info['threshold']
                log.debug(f"Setting '{role_name}' signature threshold to {threshold}.")
                repo_roles[role_name] = Role(role_keys, threshold=threshold)
            except ValueError as e:
                log_exception(e)
                error = "Failed to generate role metadata during TUF "\
                        "repository initialization."
                raise RuggedMetadataError(error)
        return repo_roles

    def _get_key_for_role_key(self, role_name: str, key_name: str) -> Dict | None:
        """
        Return the named key for a given role.

        Returns None if the key is not in the keychain; raises RuggedKeyError
        if the keychain entry is empty.
        """
        key_index = self._key_index(role_name, key_name)
        if key_index not in self.keys:
            log.debug(f"No '{key_name}' key in keychain for '{role_name}' role.")
            return None
        key = self.keys[key_index]
        if not key:
            raise RuggedKeyError(f"Error loading '{key_name}' key for '{role_name}' role.")
        return key

    def _get_keyid_for_role_key(self, role_name: str, key_name: str) -> str | None:
        """
        Return the KeyID for a named key for a given role.

        Returns None if the key is not in the keychain; raises RuggedKeyError
        if the key has no 'keyid'.
        """
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            return None
        if 'keyid' not in key:
            raise RuggedKeyError(f"Error finding keyid '{key_name}' key for '{role_name}' role.")
        return key['keyid']

    ################
    # Key rotation #
    ################

    def rotate_keys(
        self,
        keys_to_add: List[Tuple[str, str]],
        keys_to_remove: List[Tuple[str, str]]
    ) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
        """
        Update keys and re-sign metadata.

        Both arguments are lists of (role name, key name) pairs. Returns the
        pairs that were added and removed, in that order.
        """
        added_keys = self._add_keys(keys_to_add)
        removed_keys = self._remove_keys(keys_to_remove)
        roles_to_regenerate_metadata = []
        for role, _key in added_keys + removed_keys:
            if role not in roles_to_regenerate_metadata:
                roles_to_regenerate_metadata.append(role)
        # Rotating a 'targets' key requires regenerating all role metadata.
        if 'targets' in roles_to_regenerate_metadata:
            self.update_root()
            self.write_metadata('root')
            self.update_targets()
            self.write_metadata('targets')
            self.update_snapshot()
            self.write_metadata('snapshot')
            self.update_timestamp()
            self.write_metadata('timestamp')
        # Rotating a 'snapshot' key requires regenerating root, timestamp and snapshot metadata.
        elif 'snapshot' in roles_to_regenerate_metadata:
            self.update_root()
            self.write_metadata('root')
            self.update_snapshot()
            self.write_metadata('snapshot')
            self.update_timestamp()
            self.write_metadata('timestamp')
        # Rotating 'timestamp' key requires regenerating root and timestamp metadata.
        elif 'timestamp' in roles_to_regenerate_metadata:
            self.update_root()
            self.write_metadata('root')
            self.update_timestamp()
            self.write_metadata('timestamp')
        # Rotating 'root' key only requires regenerating root metadata.
        elif 'root' in roles_to_regenerate_metadata:
            self.update_root()
            self.write_metadata('root')
        return (added_keys, removed_keys)

    def _add_keys(self, keys_to_add: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """ Generate keys and add them to roles in root metadata. """
        added_keys = []
        for (role_name, key_name) in keys_to_add:
            KeyManager().generate_keypair(key_name, role_name)
            self._add_key_to_keychain(role_name, key_name)
            self._add_key_to_root(role_name, key_name)
            added_keys.append((role_name, key_name))
        return added_keys

    def _add_key_to_keychain(self, role_name: str, key_name: str) -> None:
        """ Add a key to the keychain. """
        # In #253, we should refer to a keypair, rather than a single key.
        log.debug(f"Adding '{key_name}' key to keychain.")
        key_index = self._key_index(role_name, key_name)
        role_keys = KeyManager().load_keys(key_name, role_name)
        if not role_keys:
            log.debug(f"No keys loaded for '{key_name}' keypair for '{role_name}' role.")
        else:
            self.keys[key_index] = role_keys

    def _add_key_to_root(self, role_name: str, key_name: str) -> None:
        """
        Add a key to a role in root metadata.

        Does nothing if the key is not in the keychain.
        """
        log.debug(f"Adding '{key_name}' key to '{role_name}' role in root metadata.")
        role_key = self._get_key_for_role_key(role_name, key_name)
        if not role_key:
            return
        key = Key.from_securesystemslib_key(role_key)
        self.roles['root'].signed.add_key(key, role_name)

    def _remove_keys(self, keys_to_remove: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
        """ Delete keys and remove them from roles in root metadata. """
        removed_keys = []
        for (role_name, key_name) in keys_to_remove:
            KeyManager().delete_keypair(key_name, role_name)
            # The key ID is looked up in the in-memory keychain, so the key
            # must be revoked in root before it is dropped from the keychain.
            self._remove_key_from_root(role_name, key_name)
            self._remove_key_from_keychain(role_name, key_name)
            removed_keys.append((role_name, key_name))
        return removed_keys

    def _remove_key_from_root(self, role_name: str, key_name: str) -> None:
        """ Remove a key from a role in root metadata. """
        log.debug(f"Removing '{key_name}' key from '{role_name}' role in root metadata.")
        key_id = self._get_keyid_for_role_key(role_name, key_name)
        self.roles['root'].signed.revoke_key(key_id, role_name)

    def _remove_key_from_keychain(self, role_name: str, key_name: str) -> None:
        """ Remove a key from the keychain. """
        log.debug(f"Removing '{key_name}' key from the keychain.")
        key_index = self._key_index(role_name, key_name)
        del self.keys[key_index]

    #########################
    # Hashed bin delegation #
    #########################
    # See: https://github.com/theupdateframework/python-tuf/blob/v2.1.0/examples/manual_repo/hashed_bin_delegation.py

    def update_hashed_bins(self, initializing: bool = False) -> None:
        """ Update all hashed bin roles to account for rotated keys. """
        for bin_n_name in self.roles["bins"].signed.delegations.roles:
            self.update_hashed_bin(bin_n_name, initializing=initializing)
        self.update_hashed_bin("bins", initializing=initializing)

    def update_hashed_bin(self, bin_name: str, initializing: bool = False) -> None:
        """
        Update a bin role to account for new or removed targets, or rotated keys.

        The version is only bumped when not initializing. Expiry is taken
        from the 'targets' role configuration.
        """
        if not initializing:
            new_version = self.roles[bin_name].signed.version + 1
            log.debug(f"Updating hashed bin '{bin_name}' metadata to version '{new_version}'.")
            self.roles[bin_name].signed.version = new_version
        self.update_metadata_expiry(bin_name, "targets")
        self.roles[bin_name].signatures.clear()
        self.sign_bin_metadata(bin_name)
        log.info(f"Updated hashed bins '{bin_name}' metadata.")

    def update_hashed_bin_versions_in_snapshot(self) -> None:
        """ Update snapshot metadata to reflect current hashed bin metafile info. """
        for bin_n_name in self.roles["bins"].signed.delegations.roles:
            # A bin without signatures is reloaded from storage just long
            # enough to record its metafile info, then dropped from memory.
            delete_bin_n_metadata = False
            if not self.roles[bin_n_name].signatures:
                self.load_metadata(bin_n_name)
                delete_bin_n_metadata = True
            self.roles["snapshot"].signed.meta[f"{bin_n_name}.json"] = get_metafile_info(self.roles[bin_n_name])
            if delete_bin_n_metadata:
                self.delete_metadata(bin_n_name)
        self.roles["snapshot"].signed.meta["bins.json"] = get_metafile_info(self.roles['bins'])

    def sign_bin_metadata(self, bin_name: str) -> None:
        """
        Sign bin role metadata.

        Raises RuggedMetadataError if signing fails.
        """
        key_name = self._hashed_bins_key_name()
        signer = self._hashed_bins_signer(key_name)
        try:
            self.roles[bin_name].sign(signer, append=True)
            log.debug(f"Signed '{bin_name}' metadata with '{key_name}' key.")
        except Exception as e:
            log_exception(e)
            error = f"Failed to sign '{bin_name}' metadata with '{key_name}' key."
            log.error(error)
            raise RuggedMetadataError(error)

    def _hashed_bins_signer(self, key_name: str) -> SSlibSigner:
        """
        Return an SSlibSigner using the key specified by name.

        Raises RuggedKeyError if the key is not in the keychain.
        """
        # @TODO: We are conflating the key name and role name here. Figure out
        # whether this is really the best way to make this work.
        role_name = key_name
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            raise RuggedKeyError(f"Could not find '{key_name}' signing key for '{role_name}' role.")
        return SSlibSigner(key)

    def _hashed_bins_key_name(self) -> str:
        """ Return the name of the key to use for hashed bins. """
        return config['hashed_bins_key_name'].get()

    def _init_bin_roles(self) -> None:
        """ Create metadata objects for hashed bins. """
        self._init_bins_role()
        bin_count = 0
        for bin_n_name, bin_n_hash_prefixes in generate_hash_bins():
            self._init_bin_n_role(bin_n_name, bin_n_hash_prefixes)
            bin_count += 1
        number_of_bins = get_number_of_bins()
        if bin_count != number_of_bins:
            log.debug(f"Created {bin_count} bins, whereas 'number_of_bins: {number_of_bins}' was configured.")

    def _init_bins_role(self) -> None:
        """ Create metadata objects for 'bins' role. """
        # Create preliminary delegating targets role (bins) and add public key for
        # delegated targets (bin_n) to key store.
        expiry = config['roles']['targets']['expiry'].get(float)
        self.roles["bins"] = Metadata[Targets](
            signed=Targets(
                version=1,
                spec_version=get_spec_version_string(),
                expires=seconds_from_now(expiry),
                delegations=Delegations(
                    keys=self._bins_keys(),
                    roles={},
                )
            ),
            signatures=OrderedDict(),
        )
        log.debug("Initialized 'bins' metadata.")
        # Update top-level targets role with delegation details for the "bins"
        # delegated targets role.
        self.roles["targets"].signed.delegations = Delegations(
            keys=self._bins_keys(),
            roles={
                "bins": DelegatedRole(
                    name="bins",
                    keyids=self._bin_n_keyids(),
                    threshold=1,
                    terminating=False,
                    # Delegate all target paths to 'bins'. From there we'll
                    # split it up by path hash prefixes.
                    paths=['*'],
                ),
            },
        )
        log.debug("Added top-level 'bins' delegated role to 'targets' metadata, in order to handle hashed bin "
                  "delegation.")

    def _init_bin_n_role(self, bin_n_name: str, bin_n_hash_prefixes: List[str]) -> None:
        """ Create metadata object for 'bin-n' role. """
        # Create delegated targets role (bin_n)
        expiry = config['roles']['targets']['expiry'].get(float)
        self.roles[bin_n_name] = Metadata[Targets](
            signed=Targets(
                version=1,
                spec_version=get_spec_version_string(),
                expires=seconds_from_now(expiry),
            ),
            signatures=OrderedDict(),
        )
        log.debug(f"Initialized '{bin_n_name}' metadata.")
        # Update delegating targets role (bins) with delegation details for this
        # delegated targets role (bin_n).
        self.roles["bins"].signed.delegations.roles[bin_n_name] = DelegatedRole(
            name=bin_n_name,
            keyids=self._bin_n_keyids(),
            threshold=1,
            terminating=True,
            path_hash_prefixes=bin_n_hash_prefixes,
        )
        log.debug("Delegated the following target file hash prefixes to "
                  f"'{bin_n_name}': {', '.join(bin_n_hash_prefixes)}.")

    def _bins_keys(self) -> Dict[str, Key]:
        """ Return the key(s) to use for the 'bins' role. """
        key_name = self._hashed_bins_key_name()
        # NOTE(review): the ':' here is hard-coded rather than built with
        # _key_index(); confirm it matches RUGGED_KEY_INDEX_DELIMITER.
        bin_n_key = Key.from_securesystemslib_key(self.keys[f"{key_name}:{key_name}"])
        return {self.keys[f"{key_name}:{key_name}"]['keyid']: bin_n_key}

    def _bin_n_keyids(self) -> List[str]:
        """ Return the key IDs to use for 'bin_n' roles. """
        key_name = self._hashed_bins_key_name()
        return [self.keys[f"{key_name}:{key_name}"]['keyid']]

    def _add_bin_roles_to_snapshot(self) -> None:
        """ Register bin roles with snapshot metadata. """
        self.roles["snapshot"].signed.meta["bins.json"] = MetaFile(version=1)
        for bin_n_name, _bin_n_hash_prefixes in generate_hash_bins():
            self.roles["snapshot"].signed.meta[f"{bin_n_name}.json"] = MetaFile(version=1)
An opinionated TUF repository using the low-level TUF Metadata API.
@TODO: Implement support for consistent snapshots. (#100) @TODO: Implement support for delegated targets. (#36)
Methods
def add_target(self, target: str) ‑> None
-
Expand source code
def add_target(self, target: str) -> None:
    """
    Move one inbound target into place and record it in the metadata.

    Any failure along the way is logged as a warning; nothing is re-raised.
    """
    try:
        stored_path = move_inbound_target_to_targets_dir(target)
        self.add_target_to_metadata(
            target,
            TargetFile.from_file(target, stored_path),
        )
        # Optionally discard the target file once its metadata is recorded.
        if config['delete_targets_after_signing'].get():
            delete_target_after_signing(stored_path)
    except Exception as err:
        log_exception(err)
        log.warning(f"Failed to add target '{target}' to the repository.")
Add a single target to the targets (or hashed bin) metadata.
def add_target_to_metadata(self, target: str, target_file_info: tuf.api.metadata.TargetFile) ‑> None
-
Expand source code
def add_target_to_metadata(self, target: str, target_file_info: TargetFile) -> None:
    """
    Record a target's file info in the role responsible for it.

    That role is the target's hashed bin when hashed bins are enabled, and
    'targets' otherwise.
    """
    targets_role = find_hash_bin(target) if hashed_bins_is_enabled() else "targets"
    role_targets = self.roles[targets_role].signed.targets
    role_targets[target] = target_file_info
    log.info(f"Added target '{target}' to '{targets_role}' role.")
Add a target to the targets (or hashed bin) metadata.
def add_targets(self) ‑> List[str]
-
Expand source code
def add_targets(self) -> List[str]:
    """
    Add every inbound target, then refresh the affected metadata.

    Returns the list of inbound targets that were processed.
    """
    processed = []
    for candidate in get_inbound_targets():
        # NOTE(review): add_target() logs and swallows failures, so a target
        # that could not be added is still reported in the returned list.
        self.add_target(candidate)
        processed.append(candidate)

    if not processed:
        return processed

    if not hashed_bins_is_enabled():
        self.update_targets()
        return processed

    # All targets are delegated to hashed bins. As a result, there's no need
    # to update the version of the targets metadata.
    for bin_n_name in get_bins_for_targets(processed):
        self.update_hashed_bin(bin_n_name)
    return processed
Add any inbound targets to the targets metadata.
def delete_initial_hashed_bin_roles(self) ‑> None
-
Expand source code
def delete_initial_hashed_bin_roles(self) -> None:
    """
    Drop every role that 'bins' currently delegates to.

    The 'bins' role itself is kept.
    """
    delegated_bins = self.roles['bins'].signed.delegations.roles
    for delegated_name in delegated_bins:
        self.delete_metadata(delegated_name)
Delete initial hashed bin roles.
def delete_metadata(self, role_name: str) ‑> None
-
Expand source code
def delete_metadata(self, role_name: str) -> None:
    """
    Forget a role's in-memory metadata.

    Raises KeyError if the role is not currently loaded.
    """
    log.debug(f"Deleting '{role_name}' role from repository.")
    self.roles.pop(role_name)
Delete a role's metadata.
def delete_metadata_file(self, role_name: str) ‑> None
-
Expand source code
def delete_metadata_file(self, role_name: str) -> None:
    """ Remove the stored metadata file belonging to a role. """
    metadata_file = self._get_metadata_path(role_name)
    log.debug(f"Deleting metadata file: {metadata_file}")
    remove(metadata_file)
Delete a role's metadata file from storage.
def get_keys_by_role(self) ‑> Dict[str, str]
-
Expand source code
def get_keys_by_role(self) -> Dict[str, List[str]]:
    """
    Return a dictionary of roles and their associated keys.

    Every role name in self.roles maps to the list of names of the keys in
    self.keys whose 'keyid' appears among that role's signatures. A role
    with no matching key maps to an empty list.
    """
    keys_by_role: Dict[str, List[str]] = {}
    for role_name, role in self.roles.items():
        # Names are ordered by signature first, then by key, and a key's
        # name is only computed once its keyid is known to match.
        keys_by_role[role_name] = [
            self._key_name(key_index)
            for keyid in role.signatures
            for key_index, key in self.keys.items()
            if key['keyid'] == keyid
        ]
    return keys_by_role
Return a dictionary of roles and their associated keys.
def initialize(self) ‑> None
-
Expand source code
def initialize(self) -> None:
    """ Create the initial signed TUF metadata for each top-level role. """
    # Root, targets, snapshot, then timestamp, each in 'initializing' mode.
    for role_name in ("root", "targets", "snapshot", "timestamp"):
        update_role = getattr(self, f"update_{role_name}")
        update_role(initializing=True)
Initialize signed TUF metadata.
def load(self) ‑> None
-
Expand source code
def load(self) -> None:
    """
    Load all metadata from storage.

    Loads each top-level role and then, when hashed bins are enabled, the
    hashed bins metadata.

    Raises:
        RuggedMetadataError: if any of the metadata fails to load.
    """
    for role_name in TOP_LEVEL_ROLE_NAMES:
        try:
            self.load_metadata(role_name)
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = f"Failed to load '{role_name}' metadata."
            log.error(error)
            raise RuggedMetadataError(error) from e

    # Hashed bins do not depend on which top-level role was just loaded, so
    # load them once here rather than once per top-level role. This also
    # keeps a bin failure from being reported against a top-level role.
    if hashed_bins_is_enabled():
        try:
            self.load_hashed_bins_metadata()
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = "Failed to load hashed bins metadata."
            log.error(error)
            raise RuggedMetadataError(error) from e

    log.debug("Loaded all metadata.")
Load all metadata from storage.
def load_hashed_bins_metadata(self) ‑> None
-
Expand source code
def load_hashed_bins_metadata(self) -> None:
    """ Reload 'bins', and every bin role it delegates to, from storage. """
    # The roles created at initialization reflect the configured number of
    # hashed bins, which may have changed since the metadata was stored.
    # Discard those roles, then let the stored 'bins.json' dictate which
    # individual 'bin_n' metadata gets loaded.
    self.delete_initial_hashed_bin_roles()
    self.load_metadata('bins')
    for stored_bin_name in self.roles['bins'].signed.delegations.roles:
        self.load_metadata(stored_bin_name)
Load hashed bins metadata.
def load_metadata(self, role_name: str) ‑> None
-
Expand source code
def load_metadata(self, role_name: str) -> None:
    """
    Read a role's metadata file and keep the result in self.roles.

    Raises RuggedStorageError when a StorageError occurs while loading.
    """
    try:
        metadata_path = self._get_metadata_path(role_name)
        loaded = load_metadata_from_file(role_name, metadata_path)
        self.roles[role_name] = loaded
    except StorageError as storage_error:
        log_exception(storage_error)
        message = f"Error loading '{role_name}' metadata from storage."
        log.error(message)
        raise RuggedStorageError(message)
Load a role's metadata from storage.
def load_metadata_for_hashed_bins_targets(self, targets: List[str]) ‑> None
-
Expand source code
def load_metadata_for_hashed_bins_targets(self, targets: List[str]) -> None:
    """
    Load only the metadata required for a given set of targets.

    Every role currently in self.roles is loaded, except the hashed bins
    that none of the given targets fall into.

    Raises:
        RuggedMetadataError: if any of the required metadata fails to load.
    """
    needed_bins = {find_hash_bin(target) for target in targets}
    # Hashed bins that no given target falls into.
    skipped_bins = {
        bin_n_name
        for bin_n_name, _ in generate_hash_bins()
        if bin_n_name not in needed_bins
    }
    # Filter instead of calling list.remove(), so that a generated bin with
    # no entry in self.roles is ignored rather than raising ValueError.
    role_names = [name for name in self.roles if name not in skipped_bins]

    for role_name in role_names:
        try:
            self.load_metadata(role_name)
        except (RuggedMetadataError, RuggedStorageError) as e:
            log_exception(e)
            error = f"Failed to load '{role_name}' metadata."
            log.error(error)
            raise RuggedMetadataError(error) from e
    log.debug("Loaded all metadata for targets.")
Load only the metadata required for a given set of targets.
def remove_target(self, target: str) ‑> bool
-
Expand source code
def remove_target(self, target: str) -> bool:
    """
    Remove one target from the metadata of the role that owns it.

    The owning role is 'targets', or the target's hashed bin when hashed
    bins are enabled. After the metadata entry is removed,
    delete_removed_target() is called for the target.

    Returns True on success. Any exception is logged, reported as a
    warning, and turned into a False return value.
    """
    try:
        owning_role = find_hash_bin(target) if hashed_bins_is_enabled() else "targets"
        del self.roles[owning_role].signed.targets[target]
        log.info(f"Removed target '{target}' from the '{owning_role}' role.")
        delete_removed_target(target)
    except Exception as failure:
        log_exception(failure)
        log.warning(f"Failed to remove target '{target}' from the repository.")
        return False
    return True
Remove a single target from the targets (or hashed bin) metadata.
def remove_targets(self, targets: List[str]) ‑> List[str]
-
Expand source code
def remove_targets(self, targets: List[str]) -> List[str]:
    """
    Remove the given targets, then update the metadata that changed.

    Returns the targets that were actually removed. When none were, no
    metadata is updated.
    """
    removed_targets = [target for target in targets if self.remove_target(target)]

    if not removed_targets:
        return removed_targets

    if not hashed_bins_is_enabled():
        self.update_targets()
        return removed_targets

    # All targets are delegated to hashed bins. As a result, only the
    # affected bins are updated; there's no need to update the version of
    # the targets metadata.
    for bin_n_name in get_bins_for_targets(removed_targets):
        self.update_hashed_bin(bin_n_name)
    return removed_targets
Remove given targets from targets (or hashed bin) metadata.
def rotate_keys(self, keys_to_add: List[Tuple[str, str]], keys_to_remove: List[Tuple[str, str]]) ‑> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]
-
Expand source code
def rotate_keys(
    self,
    keys_to_add: List[Tuple[str, str]],
    keys_to_remove: List[Tuple[str, str]]
) -> Tuple[List[Tuple[str, str]], List[Tuple[str, str]]]:
    """
    Add and remove keys, then regenerate and write the affected metadata.

    Returns a tuple of (added keys, removed keys), as reported by
    _add_keys() and _remove_keys().
    """
    added_keys = self._add_keys(keys_to_add)
    removed_keys = self._remove_keys(keys_to_remove)
    affected_roles = {role for role, _ in added_keys + removed_keys}

    # Each entry pairs a role whose key was rotated with the roles whose
    # metadata must then be regenerated, in order. Entries are checked from
    # top to bottom, and only the first match is acted upon:
    #   - 'targets' requires regenerating all role metadata;
    #   - 'snapshot' requires root, snapshot and timestamp metadata;
    #   - 'timestamp' requires root and timestamp metadata;
    #   - 'root' only requires root metadata.
    regeneration_plans = (
        ('targets', ('root', 'targets', 'snapshot', 'timestamp')),
        ('snapshot', ('root', 'snapshot', 'timestamp')),
        ('timestamp', ('root', 'timestamp')),
        ('root', ('root',)),
    )
    for rotated_role, roles_to_regenerate in regeneration_plans:
        if rotated_role not in affected_roles:
            continue
        for role_name in roles_to_regenerate:
            getattr(self, f"update_{role_name}")()
            self.write_metadata(role_name)
        break

    return (added_keys, removed_keys)
Update keys and re-sign metadata.
def sign_bin_metadata(self, bin_name: str) ‑> None
-
Expand source code
def sign_bin_metadata(self, bin_name: str) -> None:
    """
    Append a signature made with the hashed bins key to a bin role's metadata.

    Raises RuggedMetadataError when signing fails for any reason.
    """
    key_name = self._hashed_bins_key_name()
    signer = self._hashed_bins_signer(key_name)
    try:
        bin_metadata = self.roles[bin_name]
        bin_metadata.sign(signer, append=True)
        log.debug(f"Signed '{bin_name}' metadata with '{key_name}' key.")
    except Exception as failure:
        log_exception(failure)
        message = f"Failed to sign '{bin_name}' metadata with '{key_name}' key."
        log.error(message)
        raise RuggedMetadataError(message)
Sign bin role metadata.
def sign_metadata(self, role_name: str) ‑> None
-
Expand source code
def sign_metadata(self, role_name: str) -> None:
    """
    Append one signature to a role's metadata for each of the role's keys.

    Raises RuggedMetadataError when a key is not available, or when signing
    with a key fails.
    """
    # This needs to use KeyManager because the root role may not be defined yet.
    role_key_names = KeyManager().find_keys_for_role(role_name)
    for key_name in role_key_names:
        key = self._get_key_for_role_key(role_name, key_name)
        if not key:
            raise RuggedMetadataError(f"No key available to sign '{role_name}' metadata.")
        try:
            signer = SSlibSigner(key)
            self.roles[role_name].sign(signer, append=True)
        except Exception as failure:
            log_exception(failure)
            message = f"Failed to sign '{role_name}' metadata with '{key_name}' key."
            log.error(message)
            raise RuggedMetadataError(message)
        else:
            log.debug(f"Signed '{role_name}' metadata with '{key_name}' key.")
Sign a role's metadata.
def status(self) ‑> Dict[str, Dict[str, Any]]
-
Expand source code
def status(self) -> Dict[str, Dict[str, Any]]:
    """
    Summarize the state of the repository's targets, roles and keys.

    The result always has a 'roles' entry, keyed by role name, holding each
    role's signature count, version, TUF spec version, expiry (ISO format,
    with the timezone set to UTC), threshold (only when 'root' metadata is
    present) and the keys found among its signatures. When 'targets'
    metadata is present, a 'targets' entry reports the number of targets
    and their total length.
    """
    repo_status: Dict[str, Dict[str, Any]] = {'roles': {}}

    if 'targets' in self.roles:
        all_targets = self.roles['targets'].signed.targets
        repo_status['targets'] = {
            'count': len(all_targets),
            'size': sum(info.length for info in all_targets.values()),
        }

    for role_name, role_info in self.roles.items():
        # Ignore hashed bin roles, since the code below assumes we're only
        # showing the status of roles signed by root.
        # @TODO: Figure out a better way to do this. Maybe skip delegated roles?
        # @TODO: Alternatively, figure out how to report the status of hashed bins succinctly.
        if role_name[:4] in ('bins', 'bin_'):
            continue

        signed = role_info.signed
        role_status: Dict[str, Any] = {
            'signatures': len(role_info.signatures),
            'version': signed.version,
            'tuf_spec': signed.spec_version,
            'expires': signed.expires.replace(tzinfo=timezone.utc).isoformat(),
        }
        repo_status['roles'][role_name] = role_status

        if 'root' in self.roles:
            role_status['threshold'] = self.roles['root'].signed.roles[role_name].threshold

        role_keys: Dict[str, Any] = {}
        role_status['keys'] = role_keys
        for key_index, key in self.keys.items():
            key_name = self._key_name(key_index)
            # @TODO: In _role_has_signing_capability() we look up keys in
            # the Root (or Targets) metadata. That seems more correct than
            # using the signatures themselves.
            if key['keyid'] not in role_info.signatures:
                continue
            key_types = list(key['keyval'].keys())
            key_purpose = 'signing' if 'private' in key_types else 'verification'
            key_path = KeyManager().get_key_path(key_name, role_name, key_purpose)
            role_keys[key_name] = {
                'types': key_types,
                'scheme': key['scheme'],
                'key_path': key_path
            }

    return repo_status
def update_hashed_bin(self, bin_name: str, initializing: bool = False) ‑> None
-
Expand source code
def update_hashed_bin(self, bin_name: str, initializing: bool = False) -> None:
    """
    Refresh a bin role's metadata and sign it again.

    Unless initializing, the version is incremented first. The expiry is
    then reset using the 'targets' role configuration, existing signatures
    are cleared, and the metadata is re-signed.
    """
    if not initializing:
        bumped_version = self.roles[bin_name].signed.version + 1
        log.debug(f"Updating hashed bin '{bin_name}' metadata to version '{bumped_version}'.")
        self.roles[bin_name].signed.version = bumped_version

    self.update_metadata_expiry(bin_name, config_role="targets")
    self.roles[bin_name].signatures.clear()
    self.sign_bin_metadata(bin_name)
    log.info(f"Updated hashed bins '{bin_name}' metadata.")
Update a bin role to account for new or removed targets, or rotated keys.
def update_hashed_bin_versions_in_snapshot(self) ‑> None
-
Expand source code
def update_hashed_bin_versions_in_snapshot(self) -> None:
    """ Record the metafile info of 'bins', and of each bin it delegates to, in snapshot. """
    for bin_n_name, _ in self.roles["bins"].signed.delegations.roles.items():
        # A bin without signatures is loaded from storage just long enough
        # to read its metafile info, then removed from the roles again.
        loaded_temporarily = not self.roles[bin_n_name].signatures
        if loaded_temporarily:
            self.load_metadata(bin_n_name)

        bin_metafile = get_metafile_info(self.roles[bin_n_name])
        self.roles["snapshot"].signed.meta[f"{bin_n_name}.json"] = bin_metafile

        if loaded_temporarily:
            self.delete_metadata(bin_n_name)

    bins_metafile = get_metafile_info(self.roles['bins'])
    self.roles["snapshot"].signed.meta["bins.json"] = bins_metafile
Update snapshot metadata to reflect current hashed bin metafile info.
def update_hashed_bins(self, initializing: bool = False) ‑> None
-
Expand source code
def update_hashed_bins(self, initializing: bool = False) -> None:
    """ Update every bin role delegated by 'bins', then the 'bins' role itself. """
    delegated_bins = self.roles["bins"].signed.delegations.roles
    for bin_n_name, _ in delegated_bins.items():
        self.update_hashed_bin(bin_n_name, initializing=initializing)

    self.update_hashed_bin("bins", initializing=initializing)
Update all hashed bin roles to account for rotated keys.
def update_metadata_expiry(self, role_name: str, config_role: str = '') ‑> None
-
Expand source code
def update_metadata_expiry(self, role_name: str, config_role: str = '') -> None:
    """
    Reset a role's expiry from the configured 'expiry' value.

    The value is read from the configuration of config_role, or of
    role_name itself when config_role is left empty, and is passed to
    seconds_from_now() to compute the new expiry.
    """
    settings_role = role_name if config_role == '' else config_role
    expiry = config['roles'][settings_role]['expiry'].get(float)
    log.debug(f"Setting '{role_name}' metadata expiry to {expiry}.")
    self.roles[role_name].signed.expires = seconds_from_now(expiry)
Update expiry for a given role.
def update_root(self, initializing: bool = False) ‑> None
-
Expand source code
def update_root(self, initializing: bool = False) -> None:
    """
    Update root to account for newly added or removed keys.

    When initializing, existing 'root' metadata is loaded from disk if it
    is found. If the root metadata then carries signatures it is kept
    as-is, and nothing is re-signed. When not initializing, the version is
    incremented. In every other case the expiry is reset, existing
    signatures are cleared, and the metadata is signed again.
    """
    if initializing:
        try:
            log.debug("Trying to initialize 'root' metadata from disk.")
            self.load_metadata("root")
        except FileNotFoundError:
            log.debug("Did not find 'root' metadata on disk to load.")
        else:
            log.warning("Initialized 'root' metadata from disk.")
            log.info("If you did not intend to initialize with existing 'root' metadata then delete '1.root.json' "
                     "and re-run this command.")
        # If we've loaded signed root metadata from disk, don't overwrite it.
        if self.roles["root"].signatures:
            return
    else:
        self.roles["root"].signed.version += 1

    self.update_metadata_expiry("root")
    self.roles["root"].signatures.clear()
    self.sign_metadata("root")
    log.info("Updated root metadata.")
Update root to account for newly added or removed keys.
def update_snapshot(self, initializing: bool = False) ‑> None
-
Expand source code
def update_snapshot(self, initializing: bool = False) -> None:
    """
    Refresh snapshot with current metafile info, then sign it again.

    The metafile info of 'targets' (and of the hashed bins, when enabled)
    is recorded first. Unless initializing, the version is incremented.
    The expiry is then reset, existing signatures are cleared, and the
    metadata is re-signed.
    """
    targets_metafile = get_metafile_info(self.roles['targets'])
    self.roles["snapshot"].signed.meta["targets.json"] = targets_metafile
    if hashed_bins_is_enabled():
        self.update_hashed_bin_versions_in_snapshot()

    snapshot = self.roles["snapshot"]
    if not initializing:
        snapshot.signed.version += 1
    self.update_metadata_expiry("snapshot")
    snapshot.signatures.clear()
    self.sign_metadata("snapshot")
    log.info("Updated snapshot metadata.")
Update snapshot to account for changed targets metadata, or rotated keys.
def update_targets(self, initializing: bool = False) ‑> None
-
Expand source code
def update_targets(self, initializing: bool = False) -> None:
    """
    Refresh the targets metadata and sign it again.

    When hashed bins are enabled, all hashed bin roles are updated first.
    Unless initializing, the version is incremented. The expiry is then
    reset, existing signatures are cleared, and the metadata is re-signed.
    """
    if hashed_bins_is_enabled():
        self.update_hashed_bins(initializing=initializing)

    targets = self.roles["targets"]
    if not initializing:
        targets.signed.version += 1
    self.update_metadata_expiry("targets")
    targets.signatures.clear()
    self.sign_metadata("targets")
    log.info("Updated targets metadata.")
Update targets to account for new targets, or rotated keys.
def update_timestamp(self, initializing: bool = False) ‑> None
-
Expand source code
def update_timestamp(self, initializing: bool = False) -> None:
    """
    Refresh timestamp with the snapshot's metafile info, then sign it again.

    Unless initializing, the version is incremented. The expiry is then
    reset, existing signatures are cleared, and the metadata is re-signed.
    """
    snapshot_metafile = get_metafile_info(self.roles['snapshot'])
    self.roles["timestamp"].signed.snapshot_meta = snapshot_metafile

    timestamp = self.roles["timestamp"]
    if not initializing:
        timestamp.signed.version += 1
    self.update_metadata_expiry("timestamp")
    timestamp.signatures.clear()
    self.sign_metadata("timestamp")
    log.info("Updated timestamp metadata.")
Update timestamp to account for changed snapshot metadata info, or rotated keys.
def write(self) ‑> bool
-
Expand source code
def write(self) -> bool:
    """
    Write the metadata of every role to storage.

    Stops at the first role whose metadata fails to be written and returns
    False. Returns True when every write succeeded.
    """
    return all(self.write_metadata(role_name) for role_name in self.roles.keys())
Write all metadata to storage.
def write_metadata(self, role_name: str) ‑> bool
-
Expand source code
def write_metadata(self, role_name: str) -> bool:
    """ Write one role's metadata to its new path in storage, returning the outcome. """
    destination = self._get_metadata_path(role_name, new=True)
    return write_metadata_to_file(role_name, self.roles[role_name], destination)
Write a role's signed metadata to storage.