Module rugged.workers.timestamp-worker
Classes
class TimestampWorker
-
Expand source code
class TimestampWorker(BaseWorker): """ Rugged (Celery) worker that fulfills the TUF 'timestamp' role. """ @staticmethod @worker.task(name='update_timestamp_task', queue=queue) def update_timestamp_task(**context): """ Task to update timestamp metadata for a TUF repository. """ log.info("Received update-timestamp task.") repo = RuggedRepository() repo.load() repo.update_timestamp() result = repo.write_metadata('timestamp') if result: message = "Updated timestamp metadata." log.info(message) else: message = "Failed to timestamp snapshot metadata." log.error(message) return (result, message) @worker.task(name='get_expiring_metadata_task', queue=queue) def get_expiring_metadata_task(**context) -> tuple[bool, list]: """ Task to return a list of imminently expiring metadata. """ log.info("Received get-expiring-metadata task.") return get_expiring_metadata('timestamp') @worker.task(name='refresh_expiry_task', queue=queue) def refresh_expiry_task(**context) -> tuple[bool, str]: """ Task to refresh timestamp metadata expiry period. """ log.info("Received refresh-expiry task.") return refresh_expiry('timestamp')
Rugged (Celery) worker that fulfills the TUF 'timestamp' role.
Ancestors
Static methods
def update_timestamp_task(**context)
-
Expand source code
@staticmethod @worker.task(name='update_timestamp_task', queue=queue) def update_timestamp_task(**context): """ Task to update timestamp metadata for a TUF repository. """ log.info("Received update-timestamp task.") repo = RuggedRepository() repo.load() repo.update_timestamp() result = repo.write_metadata('timestamp') if result: message = "Updated timestamp metadata." log.info(message) else: message = "Failed to timestamp snapshot metadata." log.error(message) return (result, message)
Task to update timestamp metadata for a TUF repository.
Methods
def get_expiring_metadata_task(**context) ‑> tuple[bool, list]
-
Expand source code
@worker.task(name='get_expiring_metadata_task', queue=queue) def get_expiring_metadata_task(**context) -> tuple[bool, list]: """ Task to return a list of imminently expiring metadata. """ log.info("Received get-expiring-metadata task.") return get_expiring_metadata('timestamp')
Task to return a list of imminently expiring metadata.
def refresh_expiry_task(**context) ‑> tuple[bool, str]
-
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue) def refresh_expiry_task(**context) -> tuple[bool, str]: """ Task to refresh timestamp metadata expiry period. """ log.info("Received refresh-expiry task.") return refresh_expiry('timestamp')
Task to refresh timestamp metadata expiry period.
Inherited members