Module rugged.workers.snapshot-worker

Classes

class SnapshotWorker
Expand source code
class SnapshotWorker(BaseWorker):
    """ Rugged (Celery) worker responsible for the TUF 'snapshot' role.

    Every task below is registered through `worker.task` on `queue` and
    accepts arbitrary keyword arguments (`**context`); none of the task
    bodies read them.
    """

    @worker.task(name='update_snapshot_task', queue=queue)
    def update_snapshot_task(**context):
        """ Regenerate the repository's snapshot metadata and write it out.

        Returns a `(result, message)` tuple: `result` is the value returned
        by `write_metadata('snapshot')`, and `message` is the text that was
        logged for that outcome.
        """
        log.info("Received update-snapshot task.")
        repository = RuggedRepository()

        if not hashed_bins_is_enabled():
            repository.load()
        else:
            # With hashed bins enabled, only these three roles are loaded
            # up front. Loading ALL bin_n metadata is avoided to conserve
            # memory; each bin_n is instead loaded and flushed in
            # repo.update_hashed_bin_versions_in_snapshot(), which is
            # called from repo.update_snapshot() below.
            for role in ('bins', 'targets', 'snapshot'):
                repository.load_metadata(role)

        repository.update_snapshot()
        written = repository.write_metadata('snapshot')

        # Failure path first: log at error level and report back.
        if not written:
            message = "Failed to refresh snapshot metadata."
            log.error(message)
            return (written, message)

        message = "Updated snapshot metadata."
        log.info(message)
        return (written, message)

    @worker.task(name='get_expiring_metadata_task', queue=queue)
    def get_expiring_metadata_task(**context) -> tuple[bool, list]:
        """ Report 'snapshot' metadata that is about to expire.

        Logs receipt of the task, then hands back the result of
        `get_expiring_metadata('snapshot')` unchanged.
        """
        log.info("Received get-expiring-metadata task.")
        expiring = get_expiring_metadata('snapshot')
        return expiring

    @worker.task(name='refresh_expiry_task', queue=queue)
    def refresh_expiry_task(**context) -> tuple[bool, str]:
        """ Refresh the expiry period of the 'snapshot' metadata.

        Logs receipt of the task, then hands back the result of
        `refresh_expiry('snapshot')` unchanged.
        """
        log.info("Received refresh-expiry task.")
        outcome = refresh_expiry('snapshot')
        return outcome

Rugged (Celery) worker that fulfills the TUF 'snapshot' role.

Ancestors

Methods

def get_expiring_metadata_task(**context) ‑> tuple[bool, list]
Expand source code
@worker.task(name='get_expiring_metadata_task', queue=queue)
def get_expiring_metadata_task(**context) -> tuple[bool, list]:
    """ Report 'snapshot' metadata that is about to expire.

    Logs receipt of the task, then hands back the result of
    `get_expiring_metadata('snapshot')` unchanged. The `**context`
    keyword arguments are accepted but not read here.
    """
    log.info("Received get-expiring-metadata task.")
    expiring = get_expiring_metadata('snapshot')
    return expiring

Task to return a list of imminently expiring metadata.

def refresh_expiry_task(**context) ‑> tuple[bool, str]
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue)
def refresh_expiry_task(**context) -> tuple[bool, str]:
    """ Refresh the expiry period of the 'snapshot' metadata.

    Logs receipt of the task, then hands back the result of
    `refresh_expiry('snapshot')` unchanged. The `**context` keyword
    arguments are accepted but not read here.
    """
    log.info("Received refresh-expiry task.")
    outcome = refresh_expiry('snapshot')
    return outcome

Task to refresh snapshot metadata expiry period.

def update_snapshot_task(**context)
Expand source code
@worker.task(name='update_snapshot_task', queue=queue)
def update_snapshot_task(**context):
    """ Regenerate the repository's snapshot metadata and write it out.

    Returns a `(result, message)` tuple: `result` is the value returned
    by `write_metadata('snapshot')`, and `message` is the text that was
    logged for that outcome. The `**context` keyword arguments are
    accepted but not read here.
    """
    log.info("Received update-snapshot task.")
    repository = RuggedRepository()

    if not hashed_bins_is_enabled():
        repository.load()
    else:
        # With hashed bins enabled, only these three roles are loaded up
        # front. Loading ALL bin_n metadata is avoided to conserve memory;
        # each bin_n is instead loaded and flushed in
        # repo.update_hashed_bin_versions_in_snapshot(), which is called
        # from repo.update_snapshot() below.
        for role in ('bins', 'targets', 'snapshot'):
            repository.load_metadata(role)

    repository.update_snapshot()
    written = repository.write_metadata('snapshot')

    # Failure path first: log at error level and report back.
    if not written:
        message = "Failed to refresh snapshot metadata."
        log.error(message)
        return (written, message)

    message = "Updated snapshot metadata."
    log.info(message)
    return (written, message)

Task to update snapshot metadata for a TUF repository.

Inherited members