Module rugged.tuf.validators.metadata
Validator for Rugged TUF repository metadata
`MetadataValidator` borrows heavily from `TrustedMetadataSet`
(https://github.com/theupdateframework/python-tuf/blob/develop/tuf/ngclient/_internal/trusted_metadata_set.py).
`MetadataValidator` implements most steps of the "Detailed client workflow"
(https://theupdateframework.github.io/specification/latest#detailed-client-workflow)
in the TUF specification.
Loaded metadata can be accessed via index access with the role name as key
(e.g. `validator[Root.type]`) or, in the case of top-level metadata, using the
helper properties (e.g. `validator.root`).
The rules for top-level metadata are:
* Metadata must be loaded in order: root -> timestamp -> snapshot -> targets -> (delegated targets).
* Metadata can be loaded even if it is expired (or, in the snapshot case, if the meta info does not match): this is called "intermediate metadata".
* Intermediate metadata can only be used to load newer versions of the same metadata. For example, an expired root can be used to load a new root.
* Metadata is loadable only if the metadata before it in loading order is loaded (and is not intermediate). For example, timestamp can be loaded if a final (non-expired) root has been loaded.
* Metadata is not loadable if any metadata after it in loading order has been loaded. For example, new roots cannot be loaded if timestamp is loaded.
Classes
class MetadataValidator
-
Expand source code
class MetadataValidator():
    """
    Validate the TUF metadata of a Rugged repository.

    Role metadata is read from the configured metadata directory and loaded
    through the ``validate_*()`` methods, in the order root -> timestamp ->
    snapshot -> targets -> hashed bins. Metadata that passes verification is
    stored in ``self.metadata``, keyed by role name.
    """

    def __init__(self):
        """ Initialize Validator """
        # Verified metadata, keyed by role name.
        self.metadata: Dict[str, Metadata] = {}
        # A single reference time is used for every expiry check made by
        # this validator instance.
        # NOTE(review): `utcnow()` returns a naive datetime and is deprecated
        # since Python 3.12 -- confirm which form the installed python-tuf
        # `is_expired()` expects before switching to an aware datetime.
        self.reference_time = datetime.datetime.utcnow()

    def __getitem__(self, role: str) -> Metadata:
        """
        Return the loaded ``Metadata`` for ``role``.

        Raises:
            KeyError: No metadata has been loaded for ``role``.
        """
        return self.metadata[role]

    # Helper properties for top level metadata
    @property
    def root(self) -> Metadata[Root]:
        """ Current root metadata (raises ``KeyError`` if not loaded yet) """
        return self.metadata[Root.type]

    @property
    def timestamp(self) -> Optional[Metadata[Timestamp]]:
        """Current timestamp ``Metadata`` or ``None``"""
        return self.metadata.get(Timestamp.type)

    @property
    def snapshot(self) -> Optional[Metadata[Snapshot]]:
        """Current snapshot ``Metadata`` or ``None``"""
        return self.metadata.get(Snapshot.type)

    @property
    def targets(self) -> Optional[Metadata[Targets]]:
        """Current targets ``Metadata`` or ``None``"""
        return self.metadata.get(Targets.type)

    @property
    def bins(self) -> Optional[Metadata[Targets]]:
        """Current bins ``Metadata`` or ``None``"""
        return self.metadata.get('bins')

    def validate_root(self):
        """
        Validate the 'root' role metadata.

        Raises:
            RuggedMetadataError: The root metadata failed to load or verify,
                or it is expired.
        """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        log.debug("Loading initial trusted root metadata.")
        try:
            metadata = self._load_role_metadata('root')
            self._load_trusted_root(metadata)
            # @TODO: iterate over subsequent root metadata.
            # For now, ensure trusted root isn't expired.
            if self.root.signed.is_expired(self.reference_time):
                raise ExpiredMetadataError(f"{self.root.signed.version}.root.json is expired")
        except (RepositoryError, UnsignedMetadataError) as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'root' role metadata.") from e

    def validate_timestamp(self):
        """
        Validate the 'timestamp' role metadata.

        Raises:
            RuggedMetadataError: The timestamp metadata failed to load or
                verify, or it lacks the hashes or length of 'snapshot.json'.
        """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        try:
            metadata = self._load_role_metadata('timestamp')
            self._update_timestamp(metadata)
        except (RepositoryError, UnsignedMetadataError) as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to update 'timestamp' role metadata.") from e
        # §2.1.4. Timestamp role & §4.6. File formats: timestamp.json
        snapshot_meta = self.timestamp.signed.snapshot_meta
        error = ''
        if snapshot_meta.hashes is None:
            error = "'timestamp' metadata must contain hashes of 'snapshot.json'."
        elif snapshot_meta.length is None:
            error = "'timestamp' metadata must contain length of 'snapshot.json'."
        if error:
            log.error(error)
            raise RuggedMetadataError(error)

    def validate_snapshot(self):
        """
        Validate the 'snapshot' role metadata.

        Raises:
            RuggedMetadataError: The snapshot metadata failed to verify, or
                one of its entries lacks a length or hashes.
        """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('snapshot')
        try:
            self._update_snapshot(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'snapshot' role metadata.") from e
        # §2.1.3. Snapshot role & §4.4. File formats: snapshot.json
        for filename, fileinfo in self.snapshot.signed.meta.items():
            error = ''
            if fileinfo.length is None:
                error = f"'snapshot' metadata must contain length of '{filename}'."
            elif fileinfo.hashes is None:
                error = f"'snapshot' metadata must contain hashes of '{filename}'."
            if error:
                log.error(error)
                raise RuggedMetadataError(error)

    def validate_targets(self):
        """
        Validate the 'targets' role metadata.

        Raises:
            RuggedMetadataError: The targets metadata failed to verify.
        """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('targets')
        try:
            self._update_targets(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate 'targets' role metadata.") from e

    def validate_hashed_bins(self):
        """
        Validate the hashed bin roles' metadata.

        Raises:
            RuggedMetadataError: The 'bins' metadata, or the metadata of one
                of the roles it delegates to, failed to verify.
        """
        # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
        # to verify signatures are valid for this metadata.
        metadata = self._load_role_metadata('bins')
        try:
            self._update_hashed_bins(metadata)
        except RepositoryError as e:
            log_exception(e)
            raise RuggedMetadataError("Failed to validate hashed-bins roles' metadata.") from e

    def _load_role_metadata(self, role):
        """ Load metadata for a role from its file and return the raw bytes. """
        metadata_file_path = self._get_metadata_path(role)
        log.debug(f"Loading '{role}' metadata from '{metadata_file_path}'")
        with open(metadata_file_path, "rb") as file:
            return file.read()

    def _get_metadata_path(self, role):
        """
        Determine the path for a given role's metadata file.

        Root metadata is always looked up as a versioned file
        ('<version>.root.json'); every other role is versioned only when
        the 'consistent_snapshot' setting is enabled.
        """
        metadata_dir = config['repo_metadata_path'].get()
        # NOTE(review): with 'consistent_snapshot' enabled this also looks
        # for a versioned timestamp file -- confirm the repository writes one.
        if role == 'root' or config['consistent_snapshot'].get():
            base_filename = f"{role}.json"
            filename = self._get_versioned_metadata_filename(base_filename)
            return path.join(metadata_dir, filename)
        return path.join(metadata_dir, f"{role}.json")

    def _get_versioned_metadata_filename(self, filename):
        """
        Find the most recent versioned filename for a given role's metadata.

        Args:
            filename: Unversioned metadata filename (eg. 'root.json').

        Raises:
            FileNotFoundError: The metadata directory contains no
                '<version>.<filename>' file.

        Returns:
            The basename of the file with the highest numeric version prefix.
        """
        metadata_dir = config['repo_metadata_path'].get()
        suffix = f".{filename}"
        # Search inside the metadata directory rather than chdir()-ing into
        # it: changing the working directory is a process-wide side effect,
        # and it breaks a relative metadata path once the caller joins the
        # directory onto the returned filename.
        versioned = []
        for match in glob(path.join(metadata_dir, f"*{suffix}")):
            name = path.basename(match)
            prefix = name[:-len(suffix)]
            # Only the leading version number takes part in the comparison,
            # so digits inside the role name cannot affect the ordering.
            if prefix.isdecimal():
                versioned.append((int(prefix), name))
        if not versioned:
            raise FileNotFoundError(
                f"No versioned '{filename}' metadata found in '{metadata_dir}'"
            )
        # Return the latest version.
        return max(versioned)[1]

    def _load_trusted_root(self, data: bytes) -> None:
        """
        Verifies and loads trusted root metadata.

        Note that an expired initial root is considered valid: expiry is
        only checked for the final root in `_update_timestamp()`.

        Args:
            data: Unverified root metadata as bytes

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.
        """
        log.debug("Loading trusted root metadata")
        new_root = Metadata[Root].from_bytes(data)

        if new_root.signed.type != Root.type:
            raise RepositoryError(
                f"Expected 'root', got '{new_root.signed.type}'"
            )

        # The trusted root must be signed according to its own role definition.
        log.debug("Verifying trusted root metadata")
        new_root.verify_delegate(Root.type, new_root)

        self.metadata[Root.type] = new_root
        log.debug("Loaded trusted root v%d", new_root.signed.version)

    def _update_timestamp(self, data: bytes) -> Metadata[Timestamp]:
        """
        Verifies and loads timestamp metadata.

        Note that an intermediate timestamp is allowed to be expired:
        ``Validator`` will throw an ``ExpiredMetadataError`` in this case
        but the intermediate timestamp will be loaded. This way a newer
        timestamp can still be loaded (and the intermediate timestamp will
        be used for rollback protection). Expired timestamp will prevent
        loading snapshot metadata.

        Args:
            data: Unverified new timestamp metadata as bytes

        Raises:
            RuntimeError: This function is called after updating snapshot.
            RepositoryError: Metadata failed to load or verify as final
                timestamp. The actual error type and content will contain
                more details.

        Returns:
            Deserialized and verified timestamp ``Metadata`` object
        """
        if self.snapshot is not None:
            raise RuntimeError("Cannot update timestamp after snapshot")

        # client workflow 5.3.10: Make sure final root is not expired.
        if self.root.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("Final 'root.json' is expired")
        # No need to check for 5.3.11 (fast forward attack recovery):
        # timestamp/snapshot can not yet be loaded at this point

        new_timestamp = Metadata[Timestamp].from_bytes(data)

        if new_timestamp.signed.type != Timestamp.type:
            raise RepositoryError(
                f"Expected 'timestamp', got '{new_timestamp.signed.type}'"
            )

        self.root.verify_delegate(Timestamp.type, new_timestamp)

        # If an existing trusted timestamp is updated,
        # check for a rollback attack
        if self.timestamp is not None:
            # Prevent rolling back timestamp version
            if new_timestamp.signed.version < self.timestamp.signed.version:
                raise BadVersionNumberError(
                    f"New timestamp version {new_timestamp.signed.version} must"
                    f" be >= {self.timestamp.signed.version}"
                )
            # Prevent rolling back snapshot version
            snapshot_meta = self.timestamp.signed.snapshot_meta
            new_snapshot_meta = new_timestamp.signed.snapshot_meta
            if new_snapshot_meta.version < snapshot_meta.version:
                raise BadVersionNumberError(
                    f"New snapshot version must be >= {snapshot_meta.version}"
                    f", got version {new_snapshot_meta.version}"
                )

        # expiry not checked to allow old timestamp to be used for rollback
        # protection of new timestamp: expiry is checked in update_snapshot()

        self.metadata[Timestamp.type] = new_timestamp
        log.debug("Updated timestamp v%d", new_timestamp.signed.version)

        # timestamp is loaded: raise if it is not valid _final_ timestamp
        self._check_final_timestamp()

        return new_timestamp

    def _check_final_timestamp(self) -> None:
        """Raise if timestamp is expired"""
        if self.timestamp is None:
            raise AssertionError("Expected Timestamp metadata is missing.")
        if self.timestamp.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("timestamp.json is expired")

    def _update_snapshot(
        self, data: bytes, trusted: Optional[bool] = False
    ) -> Metadata[Snapshot]:
        """
        Verifies and loads snapshot metadata.

        Note that an intermediate snapshot is allowed to be expired and
        version is allowed to not match timestamp meta version: the
        validator will throw an ``ExpiredMetadataError``/
        ``BadVersionNumberError`` in these cases but the intermediate
        snapshot will be loaded. This way a newer snapshot can still be
        loaded (and the intermediate snapshot will be used for rollback
        protection). Expired snapshot or snapshot that does not match
        timestamp meta version will prevent loading targets.

        Args:
            data: Unverified new snapshot metadata as bytes
            trusted: ``True`` if data has at some point been verified by
                ``Validator`` as a valid snapshot. Purpose of trusted is to
                allow loading of locally stored snapshot as intermediate
                snapshot even if hashes in current timestamp meta no longer
                match data. Default is False.

        Raises:
            RuntimeError: This function is called before updating timestamp
                or after updating targets.
            RepositoryError: Data failed to load or verify as final
                snapshot. The actual error type and content will contain
                more details.

        Returns:
            Deserialized and verified snapshot ``Metadata`` object
        """
        if self.timestamp is None:
            raise RuntimeError("Cannot update snapshot before timestamp")
        if self.targets is not None:
            raise RuntimeError("Cannot update snapshot after targets")
        log.debug("Updating snapshot metadata.")

        # Snapshot cannot be loaded if final timestamp is expired
        self._check_final_timestamp()

        snapshot_meta = self.timestamp.signed.snapshot_meta

        # Verify non-trusted data against the hashes in timestamp, if any.
        # Trusted snapshot data has already been verified once.
        if not trusted:
            snapshot_meta.verify_length_and_hashes(data)

        new_snapshot = Metadata[Snapshot].from_bytes(data)

        if new_snapshot.signed.type != Snapshot.type:
            raise RepositoryError(
                f"Expected 'snapshot', got '{new_snapshot.signed.type}'"
            )

        self.root.verify_delegate(Snapshot.type, new_snapshot)

        # version not checked against meta version to allow old snapshot to be
        # used in rollback protection: it is checked when targets is updated

        # If an existing trusted snapshot is updated, check for rollback attack
        if self.snapshot is not None:
            for filename, fileinfo in self.snapshot.signed.meta.items():
                new_fileinfo = new_snapshot.signed.meta.get(filename)

                # Prevent removal of any metadata in meta
                if new_fileinfo is None:
                    raise RepositoryError(
                        f"New snapshot is missing info for '{filename}'"
                    )

                # Prevent rollback of any metadata versions. The message
                # reports the trusted version as the minimum expected and
                # the new (lower) version as the one received.
                if new_fileinfo.version < fileinfo.version:
                    raise BadVersionNumberError(
                        f"Expected {filename} version >= "
                        f"{fileinfo.version}, got {new_fileinfo.version}."
                    )

        # expiry not checked to allow old snapshot to be used for rollback
        # protection of new snapshot: it is checked when targets is updated

        self.metadata[Snapshot.type] = new_snapshot
        log.debug("Updated snapshot v%d", new_snapshot.signed.version)

        # snapshot is loaded, but we raise if it's not valid _final_ snapshot
        self._check_final_snapshot()

        return new_snapshot

    def _check_final_snapshot(self) -> None:
        """Raise if snapshot is expired or meta version does not match"""
        if self.snapshot is None:
            raise RepositoryError("Snapshot metadata is not defined.")
        if self.timestamp is None:
            raise RepositoryError("Timestamp metadata is not defined.")
        if self.snapshot.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError("snapshot.json is expired")
        snapshot_meta = self.timestamp.signed.snapshot_meta
        if self.snapshot.signed.version != snapshot_meta.version:
            raise BadVersionNumberError(
                f"Expected snapshot version {snapshot_meta.version}, "
                f"got {self.snapshot.signed.version}"
            )

    def _update_targets(self, data: bytes) -> Metadata[Targets]:
        """
        Verifies and loads top-level targets metadata.

        Args:
            data: Unverified new targets metadata as bytes

        Raises:
            RuntimeError: This function is called before updating snapshot.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """
        if self.snapshot is None:
            raise RuntimeError("Cannot load targets before snapshot")

        # Targets cannot be loaded if final snapshot is expired or its version
        # does not match meta version in timestamp
        self._check_final_snapshot()

        return self._update_delegated_targets(data, Targets.type, Root.type)

    def _update_hashed_bins(self, data: bytes) -> Metadata[Targets]:
        """
        Verifies and loads hashed bins metadata.

        The 'bins' role is verified as a delegate of 'targets'; every role
        that 'bins' delegates to is then loaded from its own file and
        verified as a delegate of 'bins'.

        Args:
            data: Unverified new hashed-bins targets metadata as bytes

        Raises:
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """
        hashed_bins = self._update_delegated_targets(data, 'bins', Targets.type)
        for bin_n_name in hashed_bins.signed.delegations.roles:
            bin_n_data = self._load_role_metadata(bin_n_name)
            self._update_delegated_targets(bin_n_data, bin_n_name, 'bins')
            log.info(f"Metadata for the '{bin_n_name}' role is valid.")
        return hashed_bins

    def _update_delegated_targets(
        self, data: bytes, role_name: str, delegator_name: str
    ) -> Metadata[Targets]:
        """
        Verifies and loads metadata for target ``role_name``.

        Args:
            data: Unverified new metadata as bytes
            role_name: Role name of the new metadata
            delegator_name: Name of the role delegating to the new metadata

        Raises:
            RuntimeError: This function is called before the delegator or
                the snapshot has been loaded.
            RepositoryError: Metadata failed to load or verify. The actual
                error type and content will contain more details.

        Returns:
            Deserialized and verified targets ``Metadata`` object
        """
        # Look the delegator up by role name so that any loaded role can act
        # as delegator, not only the ones that have a helper property.
        delegator: Optional[Metadata] = self.metadata.get(delegator_name)
        if delegator is None:
            raise RuntimeError(f"Cannot load '{role_name}' role before '{delegator_name}' delegator.")
        if self.snapshot is None:
            raise RuntimeError(f"Cannot load '{role_name}' role before snapshot")

        log.debug("Updating %s delegated by %s", role_name, delegator_name)

        # Verify against the hashes in snapshot, if any
        meta = self.snapshot.signed.meta.get(f"{role_name}.json")
        if meta is None:
            raise RepositoryError(
                f"Snapshot does not contain information for '{role_name}'"
            )

        meta.verify_length_and_hashes(data)

        new_delegate = Metadata[Targets].from_bytes(data)

        if new_delegate.signed.type != Targets.type:
            raise RepositoryError(
                f"Expected 'targets', got '{new_delegate.signed.type}'"
            )

        delegator.verify_delegate(role_name, new_delegate)

        # The version in the role's file must match the one snapshot records.
        version = new_delegate.signed.version
        log.debug(f"Version in {role_name}.json is '{version}'.")
        log.debug(f"Expected version in snapshot.json for {role_name} is '{meta.version}'.")
        if version != meta.version:
            log.error(f"Metadata for the '{role_name}' role is not valid.")
            raise BadVersionNumberError(
                f"Expected {role_name} v{meta.version}, got v{version}."
            )

        if new_delegate.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError(f"{role_name}.json is expired")

        self.metadata[role_name] = new_delegate
        log.debug("Updated %s v%d", role_name, version)

        return new_delegate
Initialize Validator
Instance variables
prop bins : tuf.api.metadata.Metadata[tuf.api.metadata.Targets] | None
-
Expand source code
@property
def bins(self) -> Optional[Metadata[Targets]]:
    """Return the loaded 'bins' role ``Metadata``, or ``None`` if it has not been loaded."""
    loaded_bins = self.metadata.get('bins')
    return loaded_bins
Current bins `Metadata` or `None`
prop root : tuf.api.metadata.Metadata[tuf.api.metadata.Root]
-
Expand source code
@property
def root(self) -> Metadata[Root]:
    """Return the loaded root ``Metadata`` (``KeyError`` if root has not been loaded)."""
    trusted_root = self.metadata[Root.type]
    return trusted_root
Current root metadata
prop snapshot : tuf.api.metadata.Metadata[tuf.api.metadata.Snapshot] | None
-
Expand source code
@property
def snapshot(self) -> Optional[Metadata[Snapshot]]:
    """Return the loaded snapshot ``Metadata``, or ``None`` if it has not been loaded."""
    loaded_snapshot = self.metadata.get(Snapshot.type)
    return loaded_snapshot
Current snapshot `Metadata` or `None`
prop targets : tuf.api.metadata.Metadata[tuf.api.metadata.Targets] | None
-
Expand source code
@property
def targets(self) -> Optional[Metadata[Targets]]:
    """Return the loaded top-level targets ``Metadata``, or ``None`` if it has not been loaded."""
    loaded_targets = self.metadata.get(Targets.type)
    return loaded_targets
Current targets `Metadata` or `None`
prop timestamp : tuf.api.metadata.Metadata[tuf.api.metadata.Timestamp] | None
-
Expand source code
@property
def timestamp(self) -> Optional[Metadata[Timestamp]]:
    """Return the loaded timestamp ``Metadata``, or ``None`` if it has not been loaded."""
    loaded_timestamp = self.metadata.get(Timestamp.type)
    return loaded_timestamp
Current timestamp `Metadata` or `None`
Methods
def validate_hashed_bins(self)
-
Expand source code
def validate_hashed_bins(self):
    """
    Validate the hashed bin roles' metadata.

    Raises:
        RuggedMetadataError: The 'bins' metadata, or the metadata of one of
            the roles it delegates to, failed to verify.
    """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('bins')
    try:
        self._update_hashed_bins(metadata)
    except RepositoryError as e:
        log_exception(e)
        # Chain the original error so the underlying cause stays visible.
        raise RuggedMetadataError("Failed to validate hashed-bins roles' metadata.") from e
Validate the hashed bin roles' metadata.
def validate_root(self)
-
Expand source code
def validate_root(self):
    """
    Validate the 'root' role metadata.

    Raises:
        RuggedMetadataError: The root metadata failed to load or verify, or
            it is expired.
    """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    log.debug("Loading initial trusted root metadata.")
    try:
        metadata = self._load_role_metadata('root')
        self._load_trusted_root(metadata)
        # @TODO: iterate over subsequent root metadata.
        # For now, ensure trusted root isn't expired.
        if self.root.signed.is_expired(self.reference_time):
            raise ExpiredMetadataError(f"{self.root.signed.version}.root.json is expired")
    except (RepositoryError, UnsignedMetadataError) as e:
        log_exception(e)
        # Chain the original error so the underlying cause stays visible.
        raise RuggedMetadataError("Failed to validate 'root' role metadata.") from e
Validate the 'root' role metadata.
def validate_snapshot(self)
-
Expand source code
def validate_snapshot(self):
    """
    Validate the 'snapshot' role metadata.

    Raises:
        RuggedMetadataError: The snapshot metadata failed to verify, or one
            of its entries lacks a length or hashes.
    """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('snapshot')
    try:
        self._update_snapshot(metadata)
    except RepositoryError as e:
        log_exception(e)
        # Chain the original error so the underlying cause stays visible.
        raise RuggedMetadataError("Failed to validate 'snapshot' role metadata.") from e
    # §2.1.3. Snapshot role & §4.4. File formats: snapshot.json
    for filename, fileinfo in self.snapshot.signed.meta.items():
        error = ''
        if fileinfo.length is None:
            error = f"'snapshot' metadata must contain length of '{filename}'."
        elif fileinfo.hashes is None:
            error = f"'snapshot' metadata must contain hashes of '{filename}'."
        if error:
            log.error(error)
            raise RuggedMetadataError(error)
Validate the 'snapshot' role metadata.
def validate_targets(self)
-
Expand source code
def validate_targets(self):
    """
    Validate the 'targets' role metadata.

    Raises:
        RuggedMetadataError: The targets metadata failed to verify.
    """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    metadata = self._load_role_metadata('targets')
    try:
        self._update_targets(metadata)
    except RepositoryError as e:
        log_exception(e)
        # Chain the original error so the underlying cause stays visible.
        raise RuggedMetadataError("Failed to validate 'targets' role metadata.") from e
Validate the 'targets' role metadata.
def validate_timestamp(self)
-
Expand source code
def validate_timestamp(self):
    """
    Validate the 'timestamp' role metadata.

    Raises:
        RuggedMetadataError: The timestamp metadata failed to load or
            verify, or it lacks the hashes or length of 'snapshot.json'.
    """
    # @TODO: use `rugged.commands.lib.signatures.verify_signature_is_valid_for_key()`
    # to verify signatures are valid for this metadata.
    try:
        metadata = self._load_role_metadata('timestamp')
        self._update_timestamp(metadata)
    except (RepositoryError, UnsignedMetadataError) as e:
        log_exception(e)
        # Chain the original error so the underlying cause stays visible.
        raise RuggedMetadataError("Failed to update 'timestamp' role metadata.") from e
    # §2.1.4. Timestamp role & §4.6. File formats: timestamp.json
    snapshot_meta = self.timestamp.signed.snapshot_meta
    error = ''
    if snapshot_meta.hashes is None:
        error = "'timestamp' metadata must contain hashes of 'snapshot.json'."
    elif snapshot_meta.length is None:
        error = "'timestamp' metadata must contain length of 'snapshot.json'."
    if error:
        log.error(error)
        raise RuggedMetadataError(error)
Validate the 'timestamp' role metadata.