Module rugged.workers.targets-worker

Expand source code
from rugged.lib.logger import (
    get_rugged_logger,
    set_log_level_from_context,
)
from rugged.lib.task_queue import TaskQueue
from rugged.tuf.repo import RuggedRepository
from rugged.workers.base_worker import BaseWorker

log = get_rugged_logger()
queue = 'targets-worker'
worker = TaskQueue().get_task_queue()


class TargetsWorker(BaseWorker):
    """ Rugged (Celery) worker that fulfills the TUF 'targets' role. """

    @worker.task(name='add_targets_task', queue=queue)
    def add_targets_task(**context):
        """ Task to add targets to a TUF repository. """
        set_log_level_from_context(context)
        log.info("Received add-targets task.")
        repo = RuggedRepository()
        repo.load()
        inbound_targets = repo.get_inbound_targets()
        added_targets = repo.add_targets()
        failed_targets = set(inbound_targets).difference(set(added_targets))
        if failed_targets:
            result = False
            data = {'failed_targets': list(failed_targets)}
        else:
            result = repo.write_metadata('targets')
            data = {'added_targets': added_targets}
        return (result, data)

    @worker.task(name='remove_targets_task', queue=queue)
    def remove_targets_task(targets, **context):
        """ Task to add targets to a TUF repository. """
        set_log_level_from_context(context)
        log.info("Received remove-targets task.")
        repo = RuggedRepository()
        repo.load()
        removed_targets = repo.remove_targets(targets)
        failed_targets = set(targets).difference(set(removed_targets))
        if failed_targets:
            result = False
            data = {'failed_targets': list(failed_targets)}
        else:
            result = repo.write_metadata('targets')
            data = {'removed_targets': removed_targets}
        return (result, data)

Classes

class TargetsWorker

Rugged (Celery) worker that fulfills the TUF 'targets' role.

Expand source code
class TargetsWorker(BaseWorker):
    """ Rugged (Celery) worker that fulfills the TUF 'targets' role. """

    @worker.task(name='add_targets_task', queue=queue)
    def add_targets_task(**context):
        """ Task to add targets to a TUF repository. """
        set_log_level_from_context(context)
        log.info("Received add-targets task.")
        repo = RuggedRepository()
        repo.load()
        inbound_targets = repo.get_inbound_targets()
        added_targets = repo.add_targets()
        failed_targets = set(inbound_targets).difference(set(added_targets))
        if failed_targets:
            result = False
            data = {'failed_targets': list(failed_targets)}
        else:
            result = repo.write_metadata('targets')
            data = {'added_targets': added_targets}
        return (result, data)

    @worker.task(name='remove_targets_task', queue=queue)
    def remove_targets_task(targets, **context):
        """ Task to add targets to a TUF repository. """
        set_log_level_from_context(context)
        log.info("Received remove-targets task.")
        repo = RuggedRepository()
        repo.load()
        removed_targets = repo.remove_targets(targets)
        failed_targets = set(targets).difference(set(removed_targets))
        if failed_targets:
            result = False
            data = {'failed_targets': list(failed_targets)}
        else:
            result = repo.write_metadata('targets')
            data = {'removed_targets': removed_targets}
        return (result, data)

Ancestors

Methods

def add_targets_task(**context)

Task to add targets to a TUF repository.

Expand source code
@worker.task(name='add_targets_task', queue=queue)
def add_targets_task(**context):
    """ Task to add targets to a TUF repository. """
    set_log_level_from_context(context)
    log.info("Received add-targets task.")
    repo = RuggedRepository()
    repo.load()
    inbound_targets = repo.get_inbound_targets()
    added_targets = repo.add_targets()
    failed_targets = set(inbound_targets).difference(set(added_targets))
    if failed_targets:
        result = False
        data = {'failed_targets': list(failed_targets)}
    else:
        result = repo.write_metadata('targets')
        data = {'added_targets': added_targets}
    return (result, data)
def remove_targets_task(targets, **context)

Task to add targets to a TUF repository.

Expand source code
@worker.task(name='remove_targets_task', queue=queue)
def remove_targets_task(targets, **context):
    """ Task to add targets to a TUF repository. """
    set_log_level_from_context(context)
    log.info("Received remove-targets task.")
    repo = RuggedRepository()
    repo.load()
    removed_targets = repo.remove_targets(targets)
    failed_targets = set(targets).difference(set(removed_targets))
    if failed_targets:
        result = False
        data = {'failed_targets': list(failed_targets)}
    else:
        result = repo.write_metadata('targets')
        data = {'removed_targets': removed_targets}
    return (result, data)

Inherited members