Module rugged.tuf.key_manager

Classes

class KeyManager
Expand source code
class KeyManager():
    """ Provides key CRUD functionality. """

    """ Static cache for keys found on the filesystem. """
    _role_keys: Dict[str, List[str]] = {}

    def generate_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]:
        """ Generate a keypair for a given role """
        if not self._ensure_rugged_key_dirs():
            return ('', '')
        with TemporaryDirectory() as tempdir:
            temp_privkey_path = f"{tempdir}/{role_name}/{key_name}"
            log.debug(f"Generating keypair at {temp_privkey_path}.")
            _generate_and_write_ed25519_keypair(filepath=temp_privkey_path)
            # Remove this once we update to newer securesystemslib
            self._remove_keyid_hash_algorithms(temp_privkey_path)
            try:
                privkey_path = self.get_key_path(key_name, role_name, 'signing')
                log.debug(f"Copying signing key to {privkey_path}.")
                shutil.copy(temp_privkey_path, privkey_path)

                temp_pubkey_path = f"{temp_privkey_path}.pub"
                pubkey_path = self.get_key_path(key_name, role_name, 'verification')
                log.debug(f"Copying verification key to {pubkey_path}.")
                shutil.copy(temp_pubkey_path, pubkey_path)

                # Clear the cache for this role, so that the directory will
                # be re-scanned to pick up the new key.
                self._clear_cache_by_role(role_name)
                return (privkey_path, pubkey_path)
            except (FileNotFoundError, PermissionError) as e:
                log_exception(e)
                return ('', '')

    def _remove_keyid_hash_algorithms(self, private_key_path: str) -> None:
        """ Remove deprecated `keyid_hash_algorithms` from generated keys. """
        keys = {
            'public': f"{private_key_path}.pub",
            'private': private_key_path,
        }
        bad_key = 'keyid_hash_algorithms'
        for type, path in keys.items():
            log.debug(f"Looking for '{bad_key}' in {type} key at '{path}'.")
            try:
                with open(path, 'r') as file:
                    key = json.load(file)
                    if bad_key in key:
                        log.debug(f"Found '{bad_key}' in {type} key at '{path}'.")
                        with open(path, 'w') as file:
                            log.debug(f"Deleting '{bad_key}' from {type} key at '{path}'.")
                            del key[bad_key]
                            log.debug(f"Re-writing {type} key at '{path}'.")
                            json.dump(key, file)
                    else:
                        log.warning("Did not find '{bad_key}' in generated key.")
                        log.warning("Please remove `rugged.tuf.key_manager._remove_keyid_hash_algorithms()`.")
            except Exception as e:
                log_exception(e)
                raise

    def _clear_cache_by_role(self, role_name: str) -> None:
        """ Remove a role from the key cache, to ensure the role's key directories get re-scanned. """
        if role_name in self._role_keys:
            del self._role_keys[role_name]

    def _ensure_rugged_key_dirs(self) -> bool:
        """ Ensure the Rugged key directories exist. """
        for key_dir in [RUGGED_SIGNING_KEY_DIR, RUGGED_VERIFICATION_KEY_DIR]:
            try:
                os.makedirs(key_dir, mode=0o700, exist_ok=True)
            except PermissionError as e:
                log_exception(e)
                return False
        return True

    def find_keys(self) -> Dict[str, str]:
        """ Find keys for all roles. """
        keys = {}
        for role_name in config['roles'].get().keys():
            role_keys = self.find_keys_for_role(role_name)
            if role_keys:
                keys[role_name] = role_keys
        return keys

    def find_keys_for_role(self, role_name: str) -> List[str]:
        """ Find keys for a specific role. """
        if role_name not in self._role_keys:
            # We don't have cached keys for this role, scan the filesystem for
            # them.
            self.find_verification_keys_for_role(role_name)
            self.find_signing_keys_for_role(role_name)

        # Return keys for the role if we found any.
        if role_name in self._role_keys:
            return self._role_keys[role_name]
        else:
            # No keys were found for this role.
            return []

    def find_verification_keys_for_role(self, role_name: str) -> None:
        """ Find verification (public) keys for a specific role. """
        for file in os.listdir(f"{RUGGED_VERIFICATION_KEY_DIR}/{role_name}"):
            # Ignore `.gitkeep` files.
            if file.startswith('.'):
                continue
            try:
                key_name = Path(file).stem
                if self.load_verification_key(key_name, role_name):
                    log.debug(f"Found '{key_name}' verification key for '{role_name}' role.")
                    if role_name not in self._role_keys:
                        self._role_keys[role_name] = [key_name]
                    else:
                        self._role_keys[role_name].append(key_name)
            except CryptoError:
                # Any error in parsing a key means that it's not a valid
                # key. This is normal, as the key directories are likely
                # to have `.gitkeep` files, for example. We just ignore
                # these.
                pass

    def find_signing_keys_for_role(self, role_name: str) -> None:
        """ Find signing (private) keys for a specific role. """
        try:
            files = os.listdir(f"{RUGGED_SIGNING_KEY_DIR}/{role_name}")
        except FileNotFoundError:
            # This is to be expected, since most workers won't have access
            # to all the signing keys. Similarly, this can happen when using
            # HSM-based root keys. So it's not really an error, and we can
            # return without registering any signing keys.
            return
        for file in files:
            # Ignore `.gitkeep` and other hidden files.
            if file.startswith('.'):
                continue
            key_name = Path(file).stem
            # If we've already registered this keypair, skip to the next one.
            if role_name in self._role_keys and key_name in self._role_keys[role_name]:
                continue
            try:
                if self.load_signing_key(key_name, role_name):
                    log.debug(f"Found '{key_name}' signing key for '{role_name}' role.")
                    if role_name not in self._role_keys:
                        self._role_keys[role_name] = [key_name]
                    else:
                        self._role_keys[role_name].append(key_name)
            except CryptoError:
                # Any error in parsing a key means that it's not a valid
                # key. We should tolerate such cruft, if only to avoid a
                # possible denial-of-service attack vector.
                pass

    def load_keys(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
        """ Load public keys and private keys (if available) from storage. """
        signing_key = self.load_signing_key(key_name, role_name)
        if signing_key:
            log.debug(f"Loaded '{key_name}' signing key for '{role_name}' role.")
            return signing_key
        verification_key = self.load_verification_key(key_name, role_name)
        if verification_key:
            log.debug(f"Loaded '{key_name}' verification key for '{role_name}' role.")
            return verification_key
        log.debug(f"Failed to load '{key_name}' keys for '{role_name}' role.")
        return {}

    def load_signing_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
        """ Load a signing key for a given role. """
        key_path = self.get_key_path(key_name, role_name, 'signing')
        try:
            log.debug(f"Loading signing key for '{role_name}' role from '{key_path}'.")
            key = import_ed25519_privatekey_from_file(key_path)
        except StorageError:
            # This is to be expected. So it's not really an error.
            return {}
        return key

    def load_verification_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
        """ Load a verification key for a given role. """
        key_path = self.get_key_path(key_name, role_name, 'verification')
        try:
            log.debug(f"Loading verification key for '{role_name}' role from '{key_path}'.")
            key = import_ed25519_publickey_from_file(key_path)
        except StorageError:
            # This is to be expected. So it's not really an error.
            return {}
        return key

    def delete_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]:
        """ Delete a specific keypair. """
        privkey_result = self._delete_key(key_name, role_name, 'signing')
        pubkey_result = self._delete_key(key_name, role_name, 'verification')
        # Clear the cache for this role, so that the stale key won't
        # be be returned when `find_keys()` and friends are called.
        self._clear_cache_by_role(role_name)
        return (privkey_result, pubkey_result)

    def _delete_key(self, key_name: str, role_name: str, key_type: str) -> str:
        """ Delete a specific key. """
        key_path = self.get_key_path(key_name, role_name, key_type)
        log.debug(f"Deleting {key_type} key at {key_path}.")
        try:
            os.remove(key_path)
        except Exception as e:
            log_exception(e)
            log.error(f"Failed to delete file at '{key_path}'. Check log for details.")
            sys.exit(os.EX_IOERR)
        return key_path

    def get_key_path(self, key_name: str, role_name: str, key_type: str) -> str:
        """ Return the path of a specific key, based on its name, role and type. """
        if key_type == 'signing':
            key_dir = RUGGED_SIGNING_KEY_DIR
            extension = ''
        elif key_type == 'verification':
            key_dir = RUGGED_VERIFICATION_KEY_DIR
            extension = '.pub'
        else:
            error = f"Unrecognized key type '{key_type}'."
            error += "Key type must be either 'signing' or 'verification'."
            raise Exception(error)
        return f"{key_dir}/{role_name}/{key_name}{extension}"

Provides key CRUD functionality.

Methods

def delete_keypair(self, key_name: str, role_name: str) ‑> Tuple[str, str]
Expand source code
def delete_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]:
    """ Delete a specific keypair. """
    privkey_result = self._delete_key(key_name, role_name, 'signing')
    pubkey_result = self._delete_key(key_name, role_name, 'verification')
    # Clear the cache for this role, so that the stale key won't
    # be be returned when `find_keys()` and friends are called.
    self._clear_cache_by_role(role_name)
    return (privkey_result, pubkey_result)

Delete a specific keypair.

def find_keys(self) ‑> Dict[str, str]
Expand source code
def find_keys(self) -> Dict[str, str]:
    """ Find keys for all roles. """
    keys = {}
    for role_name in config['roles'].get().keys():
        role_keys = self.find_keys_for_role(role_name)
        if role_keys:
            keys[role_name] = role_keys
    return keys

Find keys for all roles.

def find_keys_for_role(self, role_name: str) ‑> List[str]
Expand source code
def find_keys_for_role(self, role_name: str) -> List[str]:
    """ Find keys for a specific role. """
    if role_name not in self._role_keys:
        # We don't have cached keys for this role, scan the filesystem for
        # them.
        self.find_verification_keys_for_role(role_name)
        self.find_signing_keys_for_role(role_name)

    # Return keys for the role if we found any.
    if role_name in self._role_keys:
        return self._role_keys[role_name]
    else:
        # No keys were found for this role.
        return []

Find keys for a specific role.

def find_signing_keys_for_role(self, role_name: str) ‑> None
Expand source code
def find_signing_keys_for_role(self, role_name: str) -> None:
    """ Find signing (private) keys for a specific role. """
    try:
        files = os.listdir(f"{RUGGED_SIGNING_KEY_DIR}/{role_name}")
    except FileNotFoundError:
        # This is to be expected, since most workers won't have access
        # to all the signing keys. Similarly, this can happen when using
        # HSM-based root keys. So it's not really an error, and we can
        # return without registering any signing keys.
        return
    for file in files:
        # Ignore `.gitkeep` and other hidden files.
        if file.startswith('.'):
            continue
        key_name = Path(file).stem
        # If we've already registered this keypair, skip to the next one.
        if role_name in self._role_keys and key_name in self._role_keys[role_name]:
            continue
        try:
            if self.load_signing_key(key_name, role_name):
                log.debug(f"Found '{key_name}' signing key for '{role_name}' role.")
                if role_name not in self._role_keys:
                    self._role_keys[role_name] = [key_name]
                else:
                    self._role_keys[role_name].append(key_name)
        except CryptoError:
            # Any error in parsing a key means that it's not a valid
            # key. We should tolerate such cruft, if only to avoid a
            # possible denial-of-service attack vector.
            pass

Find signing (private) keys for a specific role.

def find_verification_keys_for_role(self, role_name: str) ‑> None
Expand source code
def find_verification_keys_for_role(self, role_name: str) -> None:
    """ Find verification (public) keys for a specific role. """
    for file in os.listdir(f"{RUGGED_VERIFICATION_KEY_DIR}/{role_name}"):
        # Ignore `.gitkeep` files.
        if file.startswith('.'):
            continue
        try:
            key_name = Path(file).stem
            if self.load_verification_key(key_name, role_name):
                log.debug(f"Found '{key_name}' verification key for '{role_name}' role.")
                if role_name not in self._role_keys:
                    self._role_keys[role_name] = [key_name]
                else:
                    self._role_keys[role_name].append(key_name)
        except CryptoError:
            # Any error in parsing a key means that it's not a valid
            # key. This is normal, as the key directories are likely
            # to have `.gitkeep` files, for example. We just ignore
            # these.
            pass

Find verification (public) keys for a specific role.

def generate_keypair(self, key_name: str, role_name: str) ‑> Tuple[str, str]
Expand source code
def generate_keypair(self, key_name: str, role_name: str) -> Tuple[str, str]:
    """ Generate a keypair for a given role """
    if not self._ensure_rugged_key_dirs():
        return ('', '')
    with TemporaryDirectory() as tempdir:
        temp_privkey_path = f"{tempdir}/{role_name}/{key_name}"
        log.debug(f"Generating keypair at {temp_privkey_path}.")
        _generate_and_write_ed25519_keypair(filepath=temp_privkey_path)
        # Remove this once we update to newer securesystemslib
        self._remove_keyid_hash_algorithms(temp_privkey_path)
        try:
            privkey_path = self.get_key_path(key_name, role_name, 'signing')
            log.debug(f"Copying signing key to {privkey_path}.")
            shutil.copy(temp_privkey_path, privkey_path)

            temp_pubkey_path = f"{temp_privkey_path}.pub"
            pubkey_path = self.get_key_path(key_name, role_name, 'verification')
            log.debug(f"Copying verification key to {pubkey_path}.")
            shutil.copy(temp_pubkey_path, pubkey_path)

            # Clear the cache for this role, so that the directory will
            # be re-scanned to pick up the new key.
            self._clear_cache_by_role(role_name)
            return (privkey_path, pubkey_path)
        except (FileNotFoundError, PermissionError) as e:
            log_exception(e)
            return ('', '')

Generate a keypair for a given role

def get_key_path(self, key_name: str, role_name: str, key_type: str) ‑> str
Expand source code
def get_key_path(self, key_name: str, role_name: str, key_type: str) -> str:
    """ Return the path of a specific key, based on its name, role and type. """
    if key_type == 'signing':
        key_dir = RUGGED_SIGNING_KEY_DIR
        extension = ''
    elif key_type == 'verification':
        key_dir = RUGGED_VERIFICATION_KEY_DIR
        extension = '.pub'
    else:
        error = f"Unrecognized key type '{key_type}'."
        error += "Key type must be either 'signing' or 'verification'."
        raise Exception(error)
    return f"{key_dir}/{role_name}/{key_name}{extension}"

Return the path of a specific key, based on its name, role and type.

def load_keys(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
Expand source code
def load_keys(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
    """ Load public keys and private keys (if available) from storage. """
    signing_key = self.load_signing_key(key_name, role_name)
    if signing_key:
        log.debug(f"Loaded '{key_name}' signing key for '{role_name}' role.")
        return signing_key
    verification_key = self.load_verification_key(key_name, role_name)
    if verification_key:
        log.debug(f"Loaded '{key_name}' verification key for '{role_name}' role.")
        return verification_key
    log.debug(f"Failed to load '{key_name}' keys for '{role_name}' role.")
    return {}

Load public keys and private keys (if available) from storage.

def load_signing_key(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
Expand source code
def load_signing_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
    """ Load a signing key for a given role. """
    key_path = self.get_key_path(key_name, role_name, 'signing')
    try:
        log.debug(f"Loading signing key for '{role_name}' role from '{key_path}'.")
        key = import_ed25519_privatekey_from_file(key_path)
    except StorageError:
        # This is to be expected. So it's not really an error.
        return {}
    return key

Load a signing key for a given role.

def load_verification_key(self, key_name: str, role_name: str) ‑> Dict[str, Any] | Dict
Expand source code
def load_verification_key(self, key_name: str, role_name: str) -> Dict[str, Any] | Dict:
    """ Load a verification key for a given role. """
    key_path = self.get_key_path(key_name, role_name, 'verification')
    try:
        log.debug(f"Loading verification key for '{role_name}' role from '{key_path}'.")
        key = import_ed25519_publickey_from_file(key_path)
    except StorageError:
        # This is to be expected. So it's not really an error.
        return {}
    return key

Load a verification key for a given role.