Module rugged.workers.snapshot-worker
Classes
class SnapshotWorker
-
Expand source code
class SnapshotWorker(BaseWorker): """ Rugged (Celery) worker that fulfills the TUF 'snapshot' role. """ @worker.task(name='update_snapshot_task', queue=queue) def update_snapshot_task(**context): """ Task to update snapshot metadata for a TUF repository. """ log.info("Received update-snapshot task.") repo = RuggedRepository() # If we are using hashed bins, we avoid loading ALL bin_n # metadata to conserve memory. Instead, we load and flush each bin_n # in repo.update_hashed_bin_versions_in_snapshot(), called below # in repo.update_snapshot(). if hashed_bins_is_enabled(): repo.load_metadata('bins') repo.load_metadata('targets') repo.load_metadata('snapshot') else: repo.load() repo.update_snapshot() result = repo.write_metadata('snapshot') if result: message = "Updated snapshot metadata." log.info(message) else: message = "Failed to refresh snapshot metadata." log.error(message) return (result, message) @worker.task(name='get_expiring_metadata_task', queue=queue) def get_expiring_metadata_task(**context) -> tuple[bool, list]: """ Task to return a list of imminently expiring metadata. """ log.info("Received get-expiring-metadata task.") return get_expiring_metadata('snapshot') @worker.task(name='refresh_expiry_task', queue=queue) def refresh_expiry_task(**context) -> tuple[bool, str]: """ Task to refresh snapshot metadata expiry period. """ log.info("Received refresh-expiry task.") return refresh_expiry('snapshot')
Rugged (Celery) worker that fulfills the TUF 'snapshot' role.
Ancestors
Methods
def get_expiring_metadata_task(**context) ‑> tuple[bool, list]
-
Expand source code
@worker.task(name='get_expiring_metadata_task', queue=queue) def get_expiring_metadata_task(**context) -> tuple[bool, list]: """ Task to return a list of imminently expiring metadata. """ log.info("Received get-expiring-metadata task.") return get_expiring_metadata('snapshot')
Task to return a list of imminently expiring metadata.
def refresh_expiry_task(**context) ‑> tuple[bool, str]
-
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue) def refresh_expiry_task(**context) -> tuple[bool, str]: """ Task to refresh snapshot metadata expiry period. """ log.info("Received refresh-expiry task.") return refresh_expiry('snapshot')
Task to refresh snapshot metadata expiry period.
def update_snapshot_task(**context)
-
Expand source code
@worker.task(name='update_snapshot_task', queue=queue) def update_snapshot_task(**context): """ Task to update snapshot metadata for a TUF repository. """ log.info("Received update-snapshot task.") repo = RuggedRepository() # If we are using hashed bins, we avoid loading ALL bin_n # metadata to conserve memory. Instead, we load and flush each bin_n # in repo.update_hashed_bin_versions_in_snapshot(), called below # in repo.update_snapshot(). if hashed_bins_is_enabled(): repo.load_metadata('bins') repo.load_metadata('targets') repo.load_metadata('snapshot') else: repo.load() repo.update_snapshot() result = repo.write_metadata('snapshot') if result: message = "Updated snapshot metadata." log.info(message) else: message = "Failed to refresh snapshot metadata." log.error(message) return (result, message)
Task to update snapshot metadata for a TUF repository.
Inherited members