Module rugged.tuf.key_manager
Classes
class KeyManager
-
Expand source code
class KeyManager(): """ Provides key CRUD functionality. """ """ Static cache for keys found on the filesystem. """ _role_keys: Dict[str, List[str]] = {} def generate_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]: """ Generate a keypair for a given role """ if not self._ensure_rugged_key_dirs(): return ('', '') with TemporaryDirectory() as tempdir: temp_privkey_path = f"{tempdir}/{role_name}/{key_name}" log.debug(f"Generating keypair at {temp_privkey_path}.") _generate_and_write_ed25519_keypair(filepath=temp_privkey_path) # Remove this once we update to newer securesystemslib self._remove_keyid_hash_algorithms(temp_privkey_path) try: privkey_path = self.get_key_path(key_name, role_name, 'signing') log.debug(f"Copying signing key to {privkey_path}.") shutil.copy(temp_privkey_path, privkey_path) temp_pubkey_path = f"{temp_privkey_path}.pub" pubkey_path = self.get_key_path(key_name, role_name, 'verification') log.debug(f"Copying verification key to {pubkey_path}.") shutil.copy(temp_pubkey_path, pubkey_path) # Clear the cache for this role, so that the directory will # be re-scanned to pick up the new key. self._clear_cache_by_role(role_name) return (privkey_path, pubkey_path) except (FileNotFoundError, PermissionError) as e: log_exception(e) return ('', '') def _remove_keyid_hash_algorithms(self, private_key_path: str) -> None: """ Remove deprecated `keyid_hash_algorithms` from generated keys. """ keys = { 'public': f"{private_key_path}.pub", 'private': private_key_path, } bad_key = 'keyid_hash_algorithms' for type, path in keys.items(): log.debug(f"Looking for '{bad_key}' in {type} key at '{path}'.") try: with open(path, 'r') as file: key = json.load(file) if bad_key in key: log.debug(f"Found '{bad_key}' in {type} key at '{path}'.") with open(path, 'w') as file: log.debug(f"Deleting '{bad_key}' from {type} key at '{path}'.") del key[bad_key] log.debug(f"Re-writing {type} key at '{path}'.") json.dump(key, file) else: log.warning("Did not find '{bad_key}' in generated key.") log.warning("Please remove `rugged.tuf.key_manager._remove_keyid_hash_algorithms()`.") except Exception as e: log_exception(e) raise def _clear_cache_by_role(self, role_name: str) -> None: """ Remove a role from the key cache, to ensure the role's key directories get re-scanned. """ if role_name in self._role_keys: del self._role_keys[role_name] def _ensure_rugged_key_dirs(self) -> bool: """ Ensure the Rugged key directories exist. """ for key_dir in [RUGGED_SIGNING_KEY_DIR, RUGGED_VERIFICATION_KEY_DIR]: try: os.makedirs(key_dir, mode=0o700, exist_ok=True) except PermissionError as e: log_exception(e) return False return True def find_keys(self) -> Dict[str, str]: """ Find keys for all roles. """ keys = {} for role_name in config['roles'].get().keys(): role_keys = self.find_keys_for_role(role_name) if role_keys: keys[role_name] = role_keys return keys def find_keys_for_role(self, role_name: str) -> List[str]: """ Find keys for a specific role. """ if role_name not in self._role_keys: # We don't have cached keys for this role, scan the filesystem for # them. self.find_verification_keys_for_role(role_name) self.find_signing_keys_for_role(role_name) # Return keys for the role if we found any. if role_name in self._role_keys: return self._role_keys[role_name] else: # No keys were found for this role. return [] def find_verification_keys_for_role(self, role_name: str) -> None: """ Find verification (public) keys for a specific role. """ for file in os.listdir(f"{RUGGED_VERIFICATION_KEY_DIR}/{role_name}"): # Ignore `.gitkeep` files. if file.startswith('.'): continue try: key_name = Path(file).stem if self.load_verification_key(key_name, role_name): log.debug(f"Found '{key_name}' verification key for '{role_name}' role.") if role_name not in self._role_keys: self._role_keys[role_name] = [key_name] else: self._role_keys[role_name].append(key_name) except CryptoError: # Any error in parsing a key means that it's not a valid # key. This is normal, as the key directories are likely # to have `.gitkeep` files, for example. We just ignore # these. pass def find_signing_keys_for_role(self, role_name: str) -> None: """ Find signing (private) keys for a specific role. """ try: files = os.listdir(f"{RUGGED_SIGNING_KEY_DIR}/{role_name}") except FileNotFoundError: # This is to be expected, since most workers won't have access # to all the signing keys. Similarly, this can happen when using # HSM-based root keys. So it's not really an error, and we can # return without registering any signing keys. return for file in files: # Ignore `.gitkeep` and other hidden files. if file.startswith('.'): continue key_name = Path(file).stem # If we've already registered this keypair, skip to the next one. if role_name in self._role_keys and key_name in self._role_keys[role_name]: continue try: if self.load_signing_key(key_name, role_name): log.debug(f"Found '{key_name}' signing key for '{role_name}' role.") if role_name not in self._role_keys: self._role_keys[role_name] = [key_name] else: self._role_keys[role_name].append(key_name) except CryptoError: # Any error in parsing a key means that it's not a valid # key. We should tolerate such cruft, if only to avoid a # possible denial-of-service attack vector. pass def load_keys(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load public keys and private keys (if available) from storage. """ signing_key = self.load_signing_key(key_name, role_name) if signing_key: log.debug(f"Loaded '{key_name}' signing key for '{role_name}' role.") return signing_key verification_key = self.load_verification_key(key_name, role_name) if verification_key: log.debug(f"Loaded '{key_name}' verification key for '{role_name}' role.") return verification_key log.debug(f"Failed to load '{key_name}' keys for '{role_name}' role.") return {} def load_signing_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load a signing key for a given role. """ key_path = self.get_key_path(key_name, role_name, 'signing') try: log.debug(f"Loading signing key for '{role_name}' role from '{key_path}'.") key = import_ed25519_privatekey_from_file(key_path) except StorageError: # This is to be expected. So it's not really an error. return {} return key def load_verification_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load a verification key for a given role. """ key_path = self.get_key_path(key_name, role_name, 'verification') try: log.debug(f"Loading verification key for '{role_name}' role from '{key_path}'.") key = import_ed25519_publickey_from_file(key_path) except StorageError: # This is to be expected. So it's not really an error. return {} return key def delete_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]: """ Delete a specific keypair. """ privkey_result = self._delete_key(key_name, role_name, 'signing') pubkey_result = self._delete_key(key_name, role_name, 'verification') # Clear the cache for this role, so that the stale key won't # be be returned when `find_keys()` and friends are called. self._clear_cache_by_role(role_name) return (privkey_result, pubkey_result) def _delete_key(self, key_name: str, role_name: str, key_type: str) -> str: """ Delete a specific key. """ key_path = self.get_key_path(key_name, role_name, key_type) log.debug(f"Deleting {key_type} key at {key_path}.") try: os.remove(key_path) except Exception as e: log_exception(e) log.error(f"Failed to delete file at '{key_path}'. Check log for details.") sys.exit(os.EX_IOERR) return key_path def get_key_path(self, key_name: str, role_name: str, key_type: str) -> str: """ Return the path of a specific key, based on its name, role and type. """ if key_type == 'signing': key_dir = RUGGED_SIGNING_KEY_DIR extension = '' elif key_type == 'verification': key_dir = RUGGED_VERIFICATION_KEY_DIR extension = '.pub' else: error = f"Unrecognized key type '{key_type}'." error += "Key type must be either 'signing' or 'verification'." raise Exception(error) return f"{key_dir}/{role_name}/{key_name}{extension}"
Provides key CRUD functionality.
Methods
def delete_keypair(self, key_name: str, role_name: str) ‑> Tuple[str, str]
-
Expand source code
def delete_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]: """ Delete a specific keypair. """ privkey_result = self._delete_key(key_name, role_name, 'signing') pubkey_result = self._delete_key(key_name, role_name, 'verification') # Clear the cache for this role, so that the stale key won't # be be returned when `find_keys()` and friends are called. self._clear_cache_by_role(role_name) return (privkey_result, pubkey_result)
Delete a specific keypair.
def find_keys(self) ‑> Dict[str, str]
-
Expand source code
def find_keys(self) -> Dict[str, str]: """ Find keys for all roles. """ keys = {} for role_name in config['roles'].get().keys(): role_keys = self.find_keys_for_role(role_name) if role_keys: keys[role_name] = role_keys return keys
Find keys for all roles.
def find_keys_for_role(self, role_name: str) ‑> List[str]
-
Expand source code
def find_keys_for_role(self, role_name: str) -> List[str]: """ Find keys for a specific role. """ if role_name not in self._role_keys: # We don't have cached keys for this role, scan the filesystem for # them. self.find_verification_keys_for_role(role_name) self.find_signing_keys_for_role(role_name) # Return keys for the role if we found any. if role_name in self._role_keys: return self._role_keys[role_name] else: # No keys were found for this role. return []
Find keys for a specific role.
def find_signing_keys_for_role(self, role_name: str) ‑> None
-
Expand source code
def find_signing_keys_for_role(self, role_name: str) -> None: """ Find signing (private) keys for a specific role. """ try: files = os.listdir(f"{RUGGED_SIGNING_KEY_DIR}/{role_name}") except FileNotFoundError: # This is to be expected, since most workers won't have access # to all the signing keys. Similarly, this can happen when using # HSM-based root keys. So it's not really an error, and we can # return without registering any signing keys. return for file in files: # Ignore `.gitkeep` and other hidden files. if file.startswith('.'): continue key_name = Path(file).stem # If we've already registered this keypair, skip to the next one. if role_name in self._role_keys and key_name in self._role_keys[role_name]: continue try: if self.load_signing_key(key_name, role_name): log.debug(f"Found '{key_name}' signing key for '{role_name}' role.") if role_name not in self._role_keys: self._role_keys[role_name] = [key_name] else: self._role_keys[role_name].append(key_name) except CryptoError: # Any error in parsing a key means that it's not a valid # key. We should tolerate such cruft, if only to avoid a # possible denial-of-service attack vector. pass
Find signing (private) keys for a specific role.
def find_verification_keys_for_role(self, role_name: str) ‑> None
-
Expand source code
def find_verification_keys_for_role(self, role_name: str) -> None: """ Find verification (public) keys for a specific role. """ for file in os.listdir(f"{RUGGED_VERIFICATION_KEY_DIR}/{role_name}"): # Ignore `.gitkeep` files. if file.startswith('.'): continue try: key_name = Path(file).stem if self.load_verification_key(key_name, role_name): log.debug(f"Found '{key_name}' verification key for '{role_name}' role.") if role_name not in self._role_keys: self._role_keys[role_name] = [key_name] else: self._role_keys[role_name].append(key_name) except CryptoError: # Any error in parsing a key means that it's not a valid # key. This is normal, as the key directories are likely # to have `.gitkeep` files, for example. We just ignore # these. pass
Find verification (public) keys for a specific role.
def generate_keypair(self, key_name: str, role_name: str) ‑> Tuple[str, str]
-
Expand source code
def generate_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]: """ Generate a keypair for a given role """ if not self._ensure_rugged_key_dirs(): return ('', '') with TemporaryDirectory() as tempdir: temp_privkey_path = f"{tempdir}/{role_name}/{key_name}" log.debug(f"Generating keypair at {temp_privkey_path}.") _generate_and_write_ed25519_keypair(filepath=temp_privkey_path) # Remove this once we update to newer securesystemslib self._remove_keyid_hash_algorithms(temp_privkey_path) try: privkey_path = self.get_key_path(key_name, role_name, 'signing') log.debug(f"Copying signing key to {privkey_path}.") shutil.copy(temp_privkey_path, privkey_path) temp_pubkey_path = f"{temp_privkey_path}.pub" pubkey_path = self.get_key_path(key_name, role_name, 'verification') log.debug(f"Copying verification key to {pubkey_path}.") shutil.copy(temp_pubkey_path, pubkey_path) # Clear the cache for this role, so that the directory will # be re-scanned to pick up the new key. self._clear_cache_by_role(role_name) return (privkey_path, pubkey_path) except (FileNotFoundError, PermissionError) as e: log_exception(e) return ('', '')
Generate a keypair for a given role
def get_key_path(self, key_name: str, role_name: str, key_type: str) ‑> str
-
Expand source code
def get_key_path(self, key_name: str, role_name: str, key_type: str) -> str: """ Return the path of a specific key, based on its name, role and type. """ if key_type == 'signing': key_dir = RUGGED_SIGNING_KEY_DIR extension = '' elif key_type == 'verification': key_dir = RUGGED_VERIFICATION_KEY_DIR extension = '.pub' else: error = f"Unrecognized key type '{key_type}'." error += "Key type must be either 'signing' or 'verification'." raise Exception(error) return f"{key_dir}/{role_name}/{key_name}{extension}"
Return the path of a specific key, based on its name, role and type.
def load_keys(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
-
Expand source code
def load_keys(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load public keys and private keys (if available) from storage. """ signing_key = self.load_signing_key(key_name, role_name) if signing_key: log.debug(f"Loaded '{key_name}' signing key for '{role_name}' role.") return signing_key verification_key = self.load_verification_key(key_name, role_name) if verification_key: log.debug(f"Loaded '{key_name}' verification key for '{role_name}' role.") return verification_key log.debug(f"Failed to load '{key_name}' keys for '{role_name}' role.") return {}
Load public keys and private keys (if available) from storage.
def load_signing_key(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
-
Expand source code
def load_signing_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load a signing key for a given role. """ key_path = self.get_key_path(key_name, role_name, 'signing') try: log.debug(f"Loading signing key for '{role_name}' role from '{key_path}'.") key = import_ed25519_privatekey_from_file(key_path) except StorageError: # This is to be expected. So it's not really an error. return {} return key
Load a signing key for a given role.
def load_verification_key(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
-
Expand source code
def load_verification_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict: """ Load a verification key for a given role. """ key_path = self.get_key_path(key_name, role_name, 'verification') try: log.debug(f"Loading verification key for '{role_name}' role from '{key_path}'.") key = import_ed25519_publickey_from_file(key_path) except StorageError: # This is to be expected. So it's not really an error. return {} return key
Load a verification key for a given role.