Module rugged.workers.timestamp-worker

Classes

class TimestampWorker
Expand source code
class TimestampWorker(BaseWorker):
    """ Celery-backed Rugged worker in charge of the TUF 'timestamp' role. """

    @staticmethod
    @worker.task(name='update_timestamp_task', queue=queue)
    def update_timestamp_task(**context):
        """ Refresh the repository's timestamp metadata and write it out.

        Keyword arguments are accepted as `context` but are not read here.

        Returns a `(result, message)` tuple: `result` is whatever
        `write_metadata('timestamp')` returned, and `message` is the
        status text that was also sent to the log.
        """
        log.info("Received update-timestamp task.")
        repository = RuggedRepository()
        repository.load()
        repository.update_timestamp()
        written = repository.write_metadata('timestamp')
        # Handle the failure path first so the success path reads straight through.
        if not written:
            failure = "Failed to timestamp snapshot metadata."
            log.error(failure)
            return (written, failure)
        success = "Updated timestamp metadata."
        log.info(success)
        return (written, success)

    @worker.task(name='get_expiring_metadata_task', queue=queue)
    def get_expiring_metadata_task(**context) -> tuple[bool, list]:
        """ Look up timestamp metadata that is about to expire.

        Logs receipt of the task, then hands back the result of
        `get_expiring_metadata('timestamp')` unchanged. The `context`
        keyword arguments are not read here.
        """
        log.info("Received get-expiring-metadata task.")
        expiring = get_expiring_metadata('timestamp')
        return expiring

    @worker.task(name='refresh_expiry_task', queue=queue)
    def refresh_expiry_task(**context) -> tuple[bool, str]:
        """ Renew the expiry period of the timestamp metadata.

        Logs receipt of the task, then hands back the result of
        `refresh_expiry('timestamp')` unchanged. The `context`
        keyword arguments are not read here.
        """
        log.info("Received refresh-expiry task.")
        outcome = refresh_expiry('timestamp')
        return outcome

Rugged (Celery) worker that fulfills the TUF 'timestamp' role.

Ancestors

Static methods

def update_timestamp_task(**context)
Expand source code
@staticmethod
@worker.task(name='update_timestamp_task', queue=queue)
def update_timestamp_task(**context):
    """ Refresh the repository's timestamp metadata and write it out.

    Keyword arguments are accepted as `context` but are not read here.

    Returns a `(result, message)` tuple: `result` is whatever
    `write_metadata('timestamp')` returned, and `message` is the
    status text that was also sent to the log.
    """
    log.info("Received update-timestamp task.")
    repository = RuggedRepository()
    repository.load()
    repository.update_timestamp()
    written = repository.write_metadata('timestamp')
    # Handle the failure path first so the success path reads straight through.
    if not written:
        failure = "Failed to timestamp snapshot metadata."
        log.error(failure)
        return (written, failure)
    success = "Updated timestamp metadata."
    log.info(success)
    return (written, success)

Task to update timestamp metadata for a TUF repository.

Methods

def get_expiring_metadata_task(**context) ‑> tuple[bool, list]
Expand source code
@worker.task(name='get_expiring_metadata_task', queue=queue)
def get_expiring_metadata_task(**context) -> tuple[bool, list]:
    """ Look up timestamp metadata that is about to expire.

    Logs receipt of the task, then hands back the result of
    `get_expiring_metadata('timestamp')` unchanged. The `context`
    keyword arguments are not read here.
    """
    log.info("Received get-expiring-metadata task.")
    expiring = get_expiring_metadata('timestamp')
    return expiring

Task to return a list of imminently expiring metadata.

def refresh_expiry_task(**context) ‑> tuple[bool, str]
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue)
def refresh_expiry_task(**context) -> tuple[bool, str]:
    """ Renew the expiry period of the timestamp metadata.

    Logs receipt of the task, then hands back the result of
    `refresh_expiry('timestamp')` unchanged. The `context`
    keyword arguments are not read here.
    """
    log.info("Received refresh-expiry task.")
    outcome = refresh_expiry('timestamp')
    return outcome

Task to refresh timestamp metadata expiry period.

Inherited members