Module rugged.workers.targets-worker

Classes

class TargetsWorker
Expand source code
class TargetsWorker(BaseWorker):
    """ Rugged (Celery) worker that fulfills the TUF 'targets' role.

    Every task below is registered through `@worker.task` with `queue=queue`,
    is defined without `self`, and accepts extra keyword arguments as
    `context`, which none of the task bodies read.
    """

    @worker.task(name='add_targets_task', queue=queue)
    def add_targets_task(**context):
        """ Task to add targets to a TUF repository.

        Returns whatever `add_targets()` returns, unchanged.
        """
        log.info("Received add-targets task.")
        return add_targets()

    @worker.task(name='remove_targets_task', queue=queue)
    def remove_targets_task(targets: List[str], **context):
        """ Task to remove targets from a TUF repository.

        `targets` is handed to `remove_targets()` as-is, and that call's
        return value is returned unchanged.
        """
        log.info("Received remove-targets task.")
        return remove_targets(targets)

    @worker.task(name='update_targets_task', queue=queue)
    def update_targets_task(**context) -> tuple[bool, str]:
        """ Task to update targets metadata for a TUF repository.

        When hashed bins are enabled, the 'bins' role and every role it
        delegates to are updated and written before 'targets' itself.

        Returns a `(result, message)` tuple. `result` is true only when every
        metadata write (bins, each delegated bin, and targets) reported
        success; a failed bin write is no longer masked by a successful
        'targets' write.
        """
        log.info("Received update-targets task.")
        repo = RuggedRepository()
        repo.load()
        # Outcome of every write_metadata() call. The rest of this file treats
        # a falsy return from write_metadata() as a failed write.
        write_results = []
        if hashed_bins_is_enabled():
            repo.update_hashed_bin('bins')
            write_results.append(repo.write_metadata('bins'))
            for bin_name in repo.roles['bins'].signed.delegations.roles.keys():
                repo.update_hashed_bin(bin_name)
                write_results.append(repo.write_metadata(bin_name))
        repo.update_targets()
        write_results.append(repo.write_metadata('targets'))
        # Remaining roles are still processed after a failed write (as
        # before); only the reported outcome changes.
        result = all(write_results)
        if result:
            message = "Updated targets metadata."
            log.info(message)
        else:
            message = "Failed to refresh targets metadata."
            log.error(message)
        return (result, message)

    @worker.task(name='get_expiring_metadata_task', queue=queue)
    def get_expiring_metadata_task(**context) -> tuple[bool, list]:
        """ Task to return a list of imminently expiring metadata.

        Checks 'targets' and, when hashed bins are enabled, 'bins' plus every
        role that 'bins' delegates to.

        Returns `(True, names)` on success. If any exception is raised, it is
        logged and `(False, names)` is returned, where `names` holds whatever
        was collected before the failure.
        """
        log.info("Received get-expiring-metadata task.")
        expiring_metadata = []
        try:
            repo = RuggedRepository()
            repo.load_metadata('targets')
            if expiry_is_imminent(repo.roles['targets'].signed.expires):
                log.info("'targets.json' metadata expiry is imminent.")
                expiring_metadata.append('targets')
            if hashed_bins_is_enabled():
                repo.load_metadata('bins')
                if expiry_is_imminent(repo.roles['bins'].signed.expires):
                    log.info("'bins.json' metadata expiry is imminent.")
                    expiring_metadata.append('bins')
                for bin_n_name in repo.roles["bins"].signed.delegations.roles.keys():
                    repo.load_metadata(bin_n_name)
                    if expiry_is_imminent(repo.roles[bin_n_name].signed.expires):
                        log.info(f"'{bin_n_name}.json' metadata expiry is imminent.")
                        expiring_metadata.append(bin_n_name)
                    # Each bin is dropped again right after its check;
                    # presumably to keep peak memory low -- TODO confirm.
                    repo.delete_metadata(bin_n_name)
            return (True, expiring_metadata)
        except Exception as e:
            log_exception(e)
            return (False, expiring_metadata)

    @worker.task(name='get_all_targets_metadata_task', queue=queue)
    def get_all_targets_metadata_task(**context) -> tuple[bool, list]:
        """ Task to return a list of all targets metadata.

        The list always starts with 'targets'. When hashed bins are enabled,
        'bins' and the names of the roles it delegates to are added.

        Returns `(True, names)` on success. If any exception is raised, it is
        logged and `(False, names)` is returned with the names gathered so far.
        """
        log.info("Received get-all-targets-metadata task.")
        metadata = ['targets']
        try:
            if hashed_bins_is_enabled():
                metadata.append('bins')
                repo = RuggedRepository()
                # To keep peak memory low, we only load 'bins' metadata.
                repo.load_metadata('bins')
                metadata.extend(repo.roles["bins"].signed.delegations.roles.keys())
            return (True, metadata)
        except Exception as e:
            log_exception(e)
            return (False, metadata)

    @worker.task(name='refresh_expiry_task', queue=queue)
    def refresh_expiry_task(expiring_metadata: list[str], **context) -> tuple[bool, str]:
        """ Task to refresh targets metadata expiry period.

        For each name in `expiring_metadata`, the expiry is updated, existing
        signatures are cleared, the metadata is re-signed and then written.
        'targets' is signed with `sign_metadata()`; every other name is
        treated as bin metadata and signed with `sign_bin_metadata()`.

        Processing stops at the first failure and returns `(False, message)`
        naming the role that failed. Otherwise returns `(True, messages)` with
        one newline-terminated line per refreshed role (an empty string when
        `expiring_metadata` is empty).
        """
        log.info("Received refresh-expiry task.")
        repo = RuggedRepository()
        repo.load()
        results = {}
        for metadata in expiring_metadata:
            try:
                if metadata == 'targets':
                    repo.update_metadata_expiry(metadata)
                    repo.roles[metadata].signatures.clear()
                    repo.sign_metadata(metadata)
                else:
                    repo.update_metadata_expiry(metadata, 'targets')
                    repo.roles[metadata].signatures.clear()
                    repo.sign_bin_metadata(metadata)
                results[metadata] = repo.write_metadata(metadata)
            except Exception as e:
                log_exception(e)
                log.warning(f"Failure during attempt to refresh {metadata} expiry period.")
                results[metadata] = False
            # Earlier failures already returned, so only the current role can
            # be the failing one here.
            if False in results.values():
                message = f"Failed to refresh {metadata} metadata expiry period."
                log.error(message)
                return (False, message)
        lines = []
        for role in results:
            message = f"Refreshed {role} metadata expiry period."
            log.info(message)
            lines.append(f"{message}\n")
        return (True, ''.join(lines))

Rugged (Celery) worker that fulfills the TUF 'targets' role.

Ancestors

Methods

def add_targets_task(**context)
Expand source code
@worker.task(name='add_targets_task', queue=queue)
def add_targets_task(**context):
    """ Worker task that adds targets to a TUF repository.

    Extra keyword arguments are collected in `context` and not read here.
    The value produced by `add_targets()` is passed back unchanged.
    """
    log.info("Received add-targets task.")
    outcome = add_targets()
    return outcome

Task to add targets to a TUF repository.

def get_all_targets_metadata_task(**context) ‑> tuple[bool, list]
Expand source code
@worker.task(name='get_all_targets_metadata_task', queue=queue)
def get_all_targets_metadata_task(**context) -> tuple[bool, list]:
    """ Worker task that lists the names of all targets metadata.

    The list always begins with 'targets'. With hashed bins enabled, 'bins'
    and the names of the roles it delegates to follow.

    Returns `(True, names)` on success; on any exception the error is logged
    and `(False, names)` is returned with the names gathered so far.
    """
    log.info("Received get-all-targets-metadata task.")
    role_names = ['targets']
    try:
        if not hashed_bins_is_enabled():
            return (True, role_names)
        role_names.append('bins')
        repo = RuggedRepository()
        # Only 'bins' is loaded, which keeps peak memory low.
        repo.load_metadata('bins')
        delegated_roles = repo.roles["bins"].signed.delegations.roles
        role_names.extend(delegated_roles.keys())
    except Exception as e:
        log_exception(e)
        return (False, role_names)
    return (True, role_names)

Task to return a list of all targets metadata.

def get_expiring_metadata_task(**context) ‑> tuple[bool, list]
Expand source code
@worker.task(name='get_expiring_metadata_task', queue=queue)
def get_expiring_metadata_task(**context) -> tuple[bool, list]:
    """ Worker task that lists metadata whose expiry is imminent.

    'targets' is always checked. With hashed bins enabled, 'bins' and each
    role it delegates to are checked as well.

    Returns `(True, names)` on success; on any exception the error is logged
    and `(False, names)` is returned with the names collected so far.
    """
    log.info("Received get-expiring-metadata task.")
    imminent = []

    def note_if_expiring(repo, role_name):
        # Load one role and record it when its expiry is imminent.
        repo.load_metadata(role_name)
        if expiry_is_imminent(repo.roles[role_name].signed.expires):
            log.info(f"'{role_name}.json' metadata expiry is imminent.")
            imminent.append(role_name)

    try:
        repo = RuggedRepository()
        note_if_expiring(repo, 'targets')
        if hashed_bins_is_enabled():
            note_if_expiring(repo, 'bins')
            for bin_role in repo.roles["bins"].signed.delegations.roles.keys():
                note_if_expiring(repo, bin_role)
                # Each bin is dropped again as soon as it has been checked.
                repo.delete_metadata(bin_role)
    except Exception as e:
        log_exception(e)
        return (False, imminent)
    return (True, imminent)

Task to return a list of imminently expiring metadata.

def refresh_expiry_task(expiring_metadata: list[str], **context) ‑> tuple[bool, str]
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue)
def refresh_expiry_task(expiring_metadata: list[str], **context) -> tuple[bool, str]:
    """ Worker task that renews the expiry period of targets metadata.

    Each name in `expiring_metadata` has its expiry updated, its signatures
    cleared, and is then re-signed and written. 'targets' goes through
    `sign_metadata()`; any other name goes through `sign_bin_metadata()`.

    Stops at the first failure and returns `(False, message)` for that role.
    Otherwise returns `(True, messages)` with one newline-terminated line per
    refreshed role.
    """
    log.info("Received refresh-expiry task.")
    repo = RuggedRepository()
    repo.load()
    outcomes = {}
    for role_name in expiring_metadata:
        try:
            is_targets = role_name == 'targets'
            expiry_args = (role_name,) if is_targets else (role_name, 'targets')
            repo.update_metadata_expiry(*expiry_args)
            repo.roles[role_name].signatures.clear()
            signer = repo.sign_metadata if is_targets else repo.sign_bin_metadata
            signer(role_name)
            outcomes[role_name] = repo.write_metadata(role_name)
        except Exception as e:
            log_exception(e)
            log.warning(f"Failure during attempt to refresh {role_name} expiry period.")
            outcomes[role_name] = False
        if False in outcomes.values():
            failure = f"Failed to refresh {role_name} metadata expiry period."
            log.error(failure)
            return (False, failure)
    lines = []
    for refreshed in outcomes:
        line = f"Refreshed {refreshed} metadata expiry period."
        log.info(line)
        lines.append(f"{line}\n")
    return (True, ''.join(lines))

Task to refresh targets metadata expiry period.

def remove_targets_task(targets: List[str], **context)
Expand source code
@worker.task(name='remove_targets_task', queue=queue)
def remove_targets_task(targets: List[str], **context):
    """ Worker task that removes targets from a TUF repository.

    `targets` is forwarded to `remove_targets()` untouched, and that call's
    return value is passed back unchanged. `context` is not read here.
    """
    log.info("Received remove-targets task.")
    outcome = remove_targets(targets)
    return outcome

Task to remove targets from a TUF repository.

def update_targets_task(**context)
Expand source code
@worker.task(name='update_targets_task', queue=queue)
def update_targets_task(**context) -> tuple[bool, str]:
    """ Task to update targets metadata for a TUF repository.

    When hashed bins are enabled, the 'bins' role and every role it delegates
    to are updated and written before 'targets' itself.

    Returns a `(result, message)` tuple. `result` is true only when every
    metadata write (bins, each delegated bin, and targets) reported success;
    a failed bin write is no longer masked by a successful 'targets' write.
    """
    log.info("Received update-targets task.")
    repo = RuggedRepository()
    repo.load()
    # Outcome of every write_metadata() call. This function already treats a
    # falsy return from write_metadata() as a failed write.
    write_results = []
    if hashed_bins_is_enabled():
        repo.update_hashed_bin('bins')
        write_results.append(repo.write_metadata('bins'))
        for bin_name in repo.roles['bins'].signed.delegations.roles.keys():
            repo.update_hashed_bin(bin_name)
            write_results.append(repo.write_metadata(bin_name))
    repo.update_targets()
    write_results.append(repo.write_metadata('targets'))
    # Remaining roles are still processed after a failed write (as before);
    # only the reported outcome changes.
    result = all(write_results)
    if result:
        message = "Updated targets metadata."
        log.info(message)
    else:
        message = "Failed to refresh targets metadata."
        log.error(message)
    return (result, message)

Task to update targets metadata for a TUF repository.

Inherited members