Module rugged.tuf.validators.metadata

Validator for Rugged TUF repository metadata

MetadataValidator borrows heavily from TrustedMetadataSet (https://github.com/theupdateframework/python-tuf/blob/develop/tuf/ngclient/_internal/trusted_metadata_set.py)

MetadataValidator implements most steps of the "Detailed client workflow" ( https://theupdateframework.github.io/specification/latest#detailed-client-workflow) in the TUF specification.

Loaded metadata can be accessed via index access with rolename as key (eg. validator[Root.type]) or, in the case of top-level metadata, using the helper properties (eg. validator.root).

The rules for top-level metadata are: * Metadata must be loaded in order: root -> timestamp -> snapshot -> targets -> (delegated targets). * Metadata can be loaded even if it is expired (or in the snapshot case if the meta info does not match): this is called "intermediate metadata". * Intermediate metadata can only be used to load newer versions of the same metadata: As an example an expired root can be used to load a new root. * Metadata is loadable only if metadata before it in loading order is loaded (and is not intermediate): As an example timestamp can be loaded if a final (non-expired) root has been loaded. * Metadata is not loadable if any metadata after it in loading order has been loaded: As an example new roots cannot be loaded if timestamp is loaded.

Classes

class MetadataValidator
Expand source code
class MetadataValidator():

    def __init__(self):
        """ Initialize Validator """
        self.metadata: Dict[str, Metadata] = {}
        self.reference_time = datetime.datetime.utcnow()

    # Helper properties for top level metadata
    @property
    def root(self) -> Metadata[Root]:
        """ Current root metadata """
        return self.metadata[Root.type]

    @property
    def timestamp(self) -> Optional[Metadata[Timestamp]]:
        """Current timestamp ``Metadata`` or ``None``"""
        return self.metadata.get(Timestamp.type)

    @property
    def snapshot(self) -> Optional[Metadata[Snapshot]]:
        """Current snapshot ``Metadata`` or ``None``"""
        return self.metadata.get(Snapshot.type)

    @property
    def targets(self) -> Optional[Metadata[Targets]]:
        """Current targets ``Metadata`` or ``None``"""
        return self.metadata.get(Targets.type)

    @property
    def bins(self) -> Optional[Metadata[Targets]]:
        """Current bins ``Metadata`` or ``None``"""
        return self.metadata.get('bins')

    def validate_root(self):
        """ Validate the 'root' role metadata. """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        log.debug("Loading initial trusted root metadata.")
        try:
            metadata = self._load_role_metadata('root')
            self._load_trusted_root(metadata)
            # @TODO: iterate over subsequent root metadata.
            # For now, ensure trusted root isn't expired.
            if self.root.signed.is_expired(self.reference_time):
                raise ExpiredMetadataError(f"{self.root.signed.version}.root.json is expired")
        except (RepositoryError, UnsignedMetadataError) as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'root' role metadata.")

    def validate_timestamp(self):
        """ Validate the 'timestamp' role metadata. """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        try:
            metadata = self._load_role_metadata('timestamp')
            self._update_timestamp(metadata)
        except (RepositoryError, UnsignedMetadataError) as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to update 'timestamp' role metadata.")
        # §2.1.4. Timestamp role & §4.6. File formats: timestamp.json
        error = ''
        if self.timestamp.signed.snapshot_meta.hashes is None:
            error = "'timestamp' metadata must contain hashes of 'snapshot.json'."
        elif self.timestamp.signed.snapshot_meta.length is None:
            error = "'timestamp' metadata must contain length of 'snapshot.json'."
        if error:
            log.error(error)
            raise RuggedMetadataError(error)

    def validate_snapshot(self):
        """ Validate the 'snapshot' role metadata. """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('snapshot')
        try:
            self._update_snapshot(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'snapshot' role metadata.")
        # §2.1.3. Snapshot role & §4.4. File formats: snapshot.json
        error = ''
        for filename, fileinfo in self.snapshot.signed.meta.items():
            if fileinfo.length is None:
                error = f"'snapshot' metadata must contain length of '{filename}'."
            elif fileinfo.hashes is None:
                error = f"'snapshot' metadata must contain hashes of '{filename}'."
            if error:
                log.error(error)
                raise RuggedMetadataError(error)

    def validate_targets(self):
        """ Validate the 'targets' role metadata. """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('targets')
        try:
            self._update_targets(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'targets' role metadata.")

    def validate_hashed_bins(self):
        """ Validate the hashed bin roles' metadata. """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('bins')
        try:
            self._update_hashed_bins(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate hashed-bins roles' metadata.")

    def _load_role_metadata(self, role):
        """ Load metadata for a role from its file. """
        metadata_file_path = self._get_metadata_path(role)
        log.debug(f"Loading '{role}' metadata from '{metadata_file_path}'")
        with open(metadata_file_path, "rb") as file:
            return file.read()

    def _get_metadata_path(self, role):
        """ Determine the path for a given role's metadata file. """
        metadata_dir = config['repo_metadata_path'].get()
        if role == 'root' or config['consistent_snapshot'].get():
            base_filename = f"{role}.json"
            filename = self._get_versioned_metadata_filename(base_filename)
            return path.join(metadata_dir, filename)
        return path.join(metadata_dir, f"{role}.json")

    # Borrowed directly from `repo.py`
    def _get_versioned_metadata_filename(self, filename):
        """ Find the most recent versioned filename for a given role's metadata. """
        metadata_dir = config['repo_metadata_path'].get()
        chdir(metadata_dir)
        files = glob(f"*.{filename}")
        # Sort numbers naturally. Based on magic from https://stackoverflow.com/a/33159707.
        files.sort(key=lambda f: int(sub('\\D', '', f)))
        # Return the latest version.
        return files[-1]

    def _load_trusted_root(self, data: bytes) -> None:
        """ Verifies and loads trusted root metadata.

        Note that an expired initial root is considered valid: expiry is
        only checked for the final root in `_update_timestamp()`.
        """
        log.debug("Loading trusted root metadata")
        new_root = Metadata[Root].from_bytes(data)

        if new_root.signed.type != Root.type:
            raise RepositoryError(
                f"Expected 'root', got '{new_root.signed.type}'"
            )

        log.debug("Verifying trusted root metadata")

        new_root.verify_delegate(Root.type, new_root)

        self.metadata[Root.type] = new_root
        log.debug("Loaded trusted root v%d", new_root.signed.version)

    def _update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
        """ Verifies and loads timestamp metadata.

        Note that an intermediate timestamp is allowed to be expired:
        ``Validator`` will throw an ``ExpiredMetadataError`` in
        this case but the intermediate timestamp will be loaded. This way
        a newer timestamp can still be loaded (and the intermediate
        timestamp will be used for rollback protection). Expired timestamp
        will prevent loading snapshot metadata.

        Args:
            data: Unverified new timestamp metadata as bytes

        Raises:
            RuntimeError: This function is called after updating snapshot.
            RepositoryError: Metadata failed to load or verify as final
                timestamp. The actual error type and content will contain
                more details.

        Returns:
            Deserialized and verified timestamp ``Metadata`` object
        """
        if self.snapshot is not None:
            raise RuntimeError("Cannot update timestamp after snapshot")

        # client workflow 5.3.10: Make sure final root is not expired.
        if self.root.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("Final 'root.json' is expired")
        # No need to check for 5.3.11 (fast forward attack recovery):
        # timestamp/snapshot can not yet be loaded at this point

        new_timestamp = Metadata[Timestamp].from_bytes(data)

        if new_timestamp.signed.type != Timestamp.type:
            raise RepositoryError(
                f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
            )

        self.root.verify_delegate(Timestamp.type, new_timestamp)

        # If an existing trusted timestamp is updated,
        # check for a rollback attack
        if self.timestamp is not None:
            # Prevent rolling back timestamp version
            if new_timestamp.signed.version < self.timestamp.signed.version:
                raise BadVersionNumberError(
                    f"New timestamp version {new_timestamp.signed.version} must"
                    f" be >= {self.timestamp.signed.version}"
                )
            # Prevent rolling back snapshot version
            snapshot_meta = self.timestamp.signed.snapshot_meta
            new_snapshot_meta = new_timestamp.signed.snapshot_meta
            if new_snapshot_meta.version < snapshot_meta.version:
                raise BadVersionNumberError(
                    f"New snapshot version must be >= {snapshot_meta.version}"
                    f", got version {new_snapshot_meta.version}"
                )

        # expiry not checked to allow old timestamp to be used for rollback
        # protection of new timestamp: expiry is checked in update_snapshot()

        self.metadata[Timestamp.type] = new_timestamp
        log.debug("Updated timestamp v%d", new_timestamp.signed.version)

        # timestamp is loaded: raise if it is not valid _final_ timestamp
        self._check_final_timestamp()

        return new_timestamp

    def _check_final_timestamp(self) -> None:
        """Raise if timestamp is expired"""
        if self.timestamp is None:
            raise AssertionError("Expected Timestamp metadata is missing.")
        if self.timestamp.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("timestamp.json is expired")

    def _update_snapshot(
        self, data: bytes, trusted: Optional[bool] = False
    ) -> Metadata[Snapshot]:
        """ Verifies and loads snapshot metadata.

        Note that an intermediate snapshot is allowed to be expired and version
        is allowed to not match timestamp meta version: ``TrustedMetadataSet``
        will throw an ``ExpiredMetadataError``/``BadVersionNumberError`` in
        these cases but the intermediate snapshot will be loaded. This way a
        newer snapshot can still be loaded (and the intermediate snapshot will
        be used for rollback protection). Expired snapshot or snapshot that
        does not match timestamp meta version will prevent loading targets.

        Args:
            data: Unverified new snapshot metadata as bytes
            trusted: ``True`` if data has at some point been verified by
                ``Validator`` as a valid snapshot. Purpose of trusted
                is to allow loading of locally stored snapshot as intermediate
                snapshot even if hashes in current timestamp meta no longer
                match data. Default is False.

        Raises:
            RuntimeError: This function is called before updating timestamp
                or after updating targets.
            RepositoryError: Data failed to load or verify as final snapshot.
                The actual error type and content will contain more details.

        Returns:
            Deserialized and verified snapshot ``Metadata`` object
        """

        if self.timestamp is None:
            raise RuntimeError("Cannot update snapshot before timestamp")
        if self.targets is not None:
            raise RuntimeError("Cannot update snapshot after targets")
        log.debug("Updating snapshot metadata.")

        # Snapshot cannot be loaded if final timestamp is expired
        self._check_final_timestamp()

        snapshot_meta = self.timestamp.signed.snapshot_meta

        # Verify non-trusted data against the hashes in timestamp, if any.
        # Trusted snapshot data has already been verified once.
        if not trusted:
            snapshot_meta.verify_length_and_hashes(data)

        new_snapshot = Metadata[Snapshot].from_bytes(data)

        if new_snapshot.signed.type != Snapshot.type:
            raise RepositoryError(
                f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
            )

        self.root.verify_delegate(Snapshot.type, new_snapshot)

        # version not checked against meta version to allow old snapshot to be
        # used in rollback protection: it is checked when targets is updated

        # If an existing trusted snapshot is updated, check for rollback attack
        if self.snapshot is not None:
            for filename, fileinfo in self.snapshot.signed.meta.items():
                new_fileinfo = new_snapshot.signed.meta.get(filename)

                # Prevent removal of any metadata in meta
                if new_fileinfo is None:
                    raise RepositoryError(
                        f"New snapshot is missing info for '{filename}'"
                    )

                # Prevent rollback of any metadata versions
                if new_fileinfo.version < fileinfo.version:
                    raise BadVersionNumberError(
                        f"Expected {filename} version "
                        f"{new_fileinfo.version}, got {fileinfo.version}."
                    )

        # expiry not checked to allow old snapshot to be used for rollback
        # protection of new snapshot: it is checked when targets is updated

        self.metadata[Snapshot.type] = new_snapshot
        log.debug("Updated snapshot v%d", new_snapshot.signed.version)

        # snapshot is loaded, but we raise if it's not valid _final_ snapshot
        self._check_final_snapshot()

        return new_snapshot

    def _check_final_snapshot(self) -> None:
        """Raise if snapshot is expired or meta version does not match"""

        if self.snapshot is None:
            raise RepositoryError("Snapshot metadata is not defined.")
        if self.timestamp is None:
            raise RepositoryError("Timestamp metadata is not defined.")
        if self.snapshot.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("snapshot.json is expired")
        snapshot_meta = self.timestamp.signed.snapshot_meta
        if self.snapshot.signed.version != snapshot_meta.version:
            raise BadVersionNumberError(
                f"Expected snapshot version {snapshot_meta.version}, "
                f"got {self.snapshot.signed.version}"
            )

    def _update_targets(self, data: bytes) -> Metadata[Targets]:
        """ Verifies and loads top-level targets metadata.

        Args:
            data: Unverified new targets metadata as bytes

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """

        if self.snapshot is None:
            raise RuntimeError("Cannot load targets before snapshot")

        # Targets cannot be loaded if final snapshot is expired or its version
        # does not match meta version in timestamp
        self._check_final_snapshot()

        return self._update_delegated_targets(data, Targets.type, Root.type)

    def _update_hashed_bins(self, data: bytes) -> Metadata[Targets]:
        """ Verifies and loads hashed bins metadata.

        Args:
            data: Unverified new hashed-bins targets metadata as bytes

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """

        hashed_bins = self._update_delegated_targets(data, 'bins', Targets.type)
        for bin_n_name in hashed_bins.signed.delegations.roles:
            bin_n_data = self._load_role_metadata(bin_n_name)
            self._update_delegated_targets(bin_n_data, bin_n_name, 'bins')
            log.info(f"Metadata for the '{bin_n_name}' role is valid.")
        return hashed_bins

    def _update_delegated_targets(
        self, data: bytes, role_name: str, delegator_name: str
    ) -> Metadata[Targets]:
        """ Verifies and loads metadata for target ``role_name``.

        Args:
            data: Unverified new metadata as bytes
            role_name: Role name of the new metadata
            delegator_name: Name of the role delegating to the new metadata

        Raises:
            RuntimeError: This function is called before updating snapshot.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """
        delegator: Optional[Metadata] = getattr(self, delegator_name)
        if delegator is None:
            raise RuntimeError(f"Cannot load '{role_name}' role before '{delegator_name}' delegator.")

        log.debug("Updating %s delegated by %s", role_name, delegator_name)

        # Verify against the hashes in snapshot, if any
        meta = self.snapshot.signed.meta.get(f"{role_name}.json")
        if meta is None:
            raise RepositoryError(
                f"Snapshot does not contain information for '{role_name}'"
            )

        meta.verify_length_and_hashes(data)

        new_delegate = Metadata[Targets].from_bytes(data)

        if new_delegate.signed.type != Targets.type:
            raise RepositoryError(
                f"Expected 'targets', got '{new_delegate.signed.type}'"
            )

        delegator.verify_delegate(role_name, new_delegate)

        version = new_delegate.signed.version

        log.debug(f"Version in {role_name}.json is '{version}'.")
        log.debug(f"Expected version in snapshot.json for {role_name} is '{meta.version}'.")

        if version != meta.version:
            log.error(f"Metadata for the '{role_name}' role is not valid.")
            raise BadVersionNumberError(
                f"Expected {role_name} v{meta.version}, got v{version}."
            )

        if new_delegate.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError(f"{role_name}.json is expired")

        self.metadata[role_name] = new_delegate
        log.debug("Updated %s v%d", role_name, version)

        return new_delegate

Initialize Validator

Instance variables

prop bins : tuf.api.metadata.Metadata[tuf.api.metadata.Targets] | None
Expand source code
@property
def bins(self) -> Optional[Metadata[Targets]]:
    """Current bins ``Metadata`` or ``None``"""
    return self.metadata.get('bins')

Current bins Metadata or None

prop root : tuf.api.metadata.Metadata[tuf.api.metadata.Root]
Expand source code
@property
def root(self) -> Metadata[Root]:
    """ Current root metadata """
    return self.metadata[Root.type]

Current root metadata

prop snapshot : tuf.api.metadata.Metadata[tuf.api.metadata.Snapshot] | None
Expand source code
@property
def snapshot(self) -> Optional[Metadata[Snapshot]]:
    """Current snapshot ``Metadata`` or ``None``"""
    return self.metadata.get(Snapshot.type)

Current snapshot Metadata or None

prop targets : tuf.api.metadata.Metadata[tuf.api.metadata.Targets] | None
Expand source code
@property
def targets(self) -> Optional[Metadata[Targets]]:
    """Current targets ``Metadata`` or ``None``"""
    return self.metadata.get(Targets.type)

Current targets Metadata or None

prop timestamp : tuf.api.metadata.Metadata[tuf.api.metadata.Timestamp] | None
Expand source code
@property
def timestamp(self) -> Optional[Metadata[Timestamp]]:
    """Current timestamp ``Metadata`` or ``None``"""
    return self.metadata.get(Timestamp.type)

Current timestamp Metadata or None

Methods

def validate_hashed_bins(self)
Expand source code
def validate_hashed_bins(self):
    """ Validate the hashed bin roles' metadata. """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('bins')
    try:
        self._update_hashed_bins(metadata)
    except RepositoryError as e:
        log_exception(e)
        raise RuggedMetadataError("Failed to validate hashed-bins roles' metadata.")

Validate the hashed bin roles' metadata.

def validate_root(self)
Expand source code
def validate_root(self):
    """ Validate the 'root' role metadata. """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    log.debug("Loading initial trusted root metadata.")
    try:
        metadata = self._load_role_metadata('root')
        self._load_trusted_root(metadata)
        # @TODO: iterate over subsequent root metadata.
        # For now, ensure trusted root isn't expired.
        if self.root.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError(f"{self.root.signed.version}.root.json is expired")
    except (RepositoryError, UnsignedMetadataError) as e:
        log_exception(e)
        raise RuggedMetadataError("Failed to validate 'root' role metadata.")

Validate the 'root' role metadata.

def validate_snapshot(self)
Expand source code
def validate_snapshot(self):
    """ Validate the 'snapshot' role metadata. """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('snapshot')
    try:
        self._update_snapshot(metadata)
    except RepositoryError as e:
        log_exception(e)
        raise RuggedMetadataError("Failed to validate 'snapshot' role metadata.")
    # §2.1.3. Snapshot role & §4.4. File formats: snapshot.json
    error = ''
    for filename, fileinfo in self.snapshot.signed.meta.items():
        if fileinfo.length is None:
            error = f"'snapshot' metadata must contain length of '{filename}'."
        elif fileinfo.hashes is None:
            error = f"'snapshot' metadata must contain hashes of '{filename}'."
        if error:
            log.error(error)
            raise RuggedMetadataError(error)

Validate the 'snapshot' role metadata.

def validate_targets(self)
Expand source code
def validate_targets(self):
    """ Validate the 'targets' role metadata. """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('targets')
    try:
        self._update_targets(metadata)
    except RepositoryError as e:
        log_exception(e)
        raise RuggedMetadataError("Failed to validate 'targets' role metadata.")

Validate the 'targets' role metadata.

def validate_timestamp(self)
Expand source code
def validate_timestamp(self):
    """ Validate the 'timestamp' role metadata. """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    try:
        metadata = self._load_role_metadata('timestamp')
        self._update_timestamp(metadata)
    except (RepositoryError, UnsignedMetadataError) as e:
        log_exception(e)
        raise RuggedMetadataError("Failed to update 'timestamp' role metadata.")
    # §2.1.4. Timestamp role & §4.6. File formats: timestamp.json
    error = ''
    if self.timestamp.signed.snapshot_meta.hashes is None:
        error = "'timestamp' metadata must contain hashes of 'snapshot.json'."
    elif self.timestamp.signed.snapshot_meta.length is None:
        error = "'timestamp' metadata must contain length of 'snapshot.json'."
    if error:
        log.error(error)
        raise RuggedMetadataError(error)

Validate the 'timestamp' role metadata.