Module rugged.workers.base_worker

Expand source code
import logging
from rugged.lib.config import get_local_configs
from rugged.lib.logger import (
    get_rugged_logger,
    get_log_entries,
    truncate_log,
    set_log_level_from_context,
)
from rugged.lib.task_queue import TaskQueue
from rugged.tuf.repo import RuggedRepository

# Task queue object; its `task` decorator is applied to every worker task
# defined in this module.
worker = TaskQueue().get_task_queue()
# Queue name passed as `queue=` to each task decorator in this module.
queue = 'base-worker'
# Logger shared by all of the tasks in this module.
log = get_rugged_logger()


class BaseWorker():
    """ Base class that defines common tasks for Rugged workers.

    Every task is a static method decorated with `worker.task`, given an
    explicit task name and the module-level `queue`. Each task accepts
    arbitrary keyword arguments as `context` and hands them to
    `set_log_level_from_context()` before doing its work.
    """

    @staticmethod
    @worker.task(name='echo', queue=queue)
    def echo(worker_name, message, **context):
        """ Dummy task to test connections to the queue.

        Logs the received message, then returns a "PONG" reply string built
        from `worker_name` and `message`.
        """
        set_log_level_from_context(context)
        log.info(f"{worker_name} received echo task: {message}")
        # Ask the logger whether DEBUG is enabled instead of comparing
        # `log.level` for equality: `level` only holds a value set directly
        # on this logger, so an exact match misses a DEBUG level inherited
        # from a parent logger, or any more verbose level.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(f"{worker_name} received debug flag.")
        return f"{worker_name} PONG: {message}"

    @staticmethod
    @worker.task(name='logs', queue=queue)
    def logs(**context):
        """ Return this worker's logs, as produced by `get_log_entries()`. """
        set_log_level_from_context(context)
        return get_log_entries(log)

    @staticmethod
    @worker.task(name='truncate_logs', queue=queue)
    def truncate_logs(**context):
        """ Truncate this worker's logs.

        Returns whatever `truncate_log()` returns for the module logger.
        """
        set_log_level_from_context(context)
        return truncate_log(log)

    @staticmethod
    @worker.task(name='get_configs', queue=queue)
    def get_configs(**context):
        """ Return this worker's configs, as read by `get_local_configs()`. """
        set_log_level_from_context(context)
        return get_local_configs()

    @staticmethod
    @worker.task(name='status', queue=queue)
    def status(**context):
        """ Return the status of the repo from the worker's perspective.

        A fresh `RuggedRepository` is created and loaded on every call, and
        the result of its `status()` method is returned.
        """
        set_log_level_from_context(context)
        repo = RuggedRepository()
        repo.load()
        return repo.status()

Classes

class BaseWorker

Base class that defines common tasks for Rugged workers.

Expand source code
class BaseWorker():
    """ Base class that defines common tasks for Rugged workers.

    Every task is a static method decorated with `worker.task`, given an
    explicit task name and the module-level `queue`. Each task accepts
    arbitrary keyword arguments as `context` and hands them to
    `set_log_level_from_context()` before doing its work.
    """

    @staticmethod
    @worker.task(name='echo', queue=queue)
    def echo(worker_name, message, **context):
        """ Dummy task to test connections to the queue.

        Logs the received message, then returns a "PONG" reply string built
        from `worker_name` and `message`.
        """
        set_log_level_from_context(context)
        log.info(f"{worker_name} received echo task: {message}")
        # Ask the logger whether DEBUG is enabled instead of comparing
        # `log.level` for equality: `level` only holds a value set directly
        # on this logger, so an exact match misses a DEBUG level inherited
        # from a parent logger, or any more verbose level.
        if log.isEnabledFor(logging.DEBUG):
            log.debug(f"{worker_name} received debug flag.")
        return f"{worker_name} PONG: {message}"

    @staticmethod
    @worker.task(name='logs', queue=queue)
    def logs(**context):
        """ Return this worker's logs, as produced by `get_log_entries()`. """
        set_log_level_from_context(context)
        return get_log_entries(log)

    @staticmethod
    @worker.task(name='truncate_logs', queue=queue)
    def truncate_logs(**context):
        """ Truncate this worker's logs.

        Returns whatever `truncate_log()` returns for the module logger.
        """
        set_log_level_from_context(context)
        return truncate_log(log)

    @staticmethod
    @worker.task(name='get_configs', queue=queue)
    def get_configs(**context):
        """ Return this worker's configs, as read by `get_local_configs()`. """
        set_log_level_from_context(context)
        return get_local_configs()

    @staticmethod
    @worker.task(name='status', queue=queue)
    def status(**context):
        """ Return the status of the repo from the worker's perspective.

        A fresh `RuggedRepository` is created and loaded on every call, and
        the result of its `status()` method is returned.
        """
        set_log_level_from_context(context)
        repo = RuggedRepository()
        repo.load()
        return repo.status()

Subclasses

Static methods

def echo(worker_name, message, **context)

Dummy task to test connections to the queue.

Expand source code
@staticmethod
@worker.task(name='echo', queue=queue)
def echo(worker_name, message, **context):
    """ Dummy task to test connections to the queue.

    Logs the received message, then returns a "PONG" reply string built
    from `worker_name` and `message`.
    """
    set_log_level_from_context(context)
    log.info(f"{worker_name} received echo task: {message}")
    # Ask the logger whether DEBUG is enabled instead of comparing
    # `log.level` for equality: `level` only holds a value set directly on
    # this logger, so an exact match misses a DEBUG level inherited from a
    # parent logger, or any more verbose level.
    if log.isEnabledFor(logging.DEBUG):
        log.debug(f"{worker_name} received debug flag.")
    return f"{worker_name} PONG: {message}"
def get_configs(**context)

Return this worker's configs.

Expand source code
@staticmethod
@worker.task(name='get_configs', queue=queue)
def get_configs(**context):
    """ Fetch the configs as seen by this worker.

    Keyword arguments are collected into `context` and passed to
    `set_log_level_from_context()` before the configs are read with
    `get_local_configs()`.
    """
    set_log_level_from_context(context)
    local_configs = get_local_configs()
    return local_configs
def logs(**context)

Return this worker's logs.

Expand source code
@staticmethod
@worker.task(name='logs', queue=queue)
def logs(**context):
    """ Fetch the log entries recorded by this worker.

    Keyword arguments are collected into `context` and passed to
    `set_log_level_from_context()` before the entries are read from the
    module logger with `get_log_entries()`.
    """
    set_log_level_from_context(context)
    entries = get_log_entries(log)
    return entries
def status(**context)

Return the status of the repo from the worker's perspective.

Expand source code
@staticmethod
@worker.task(name='status', queue=queue)
def status(**context):
    """ Report the repo status from the worker's perspective.

    Keyword arguments are collected into `context` and passed to
    `set_log_level_from_context()`. A fresh `RuggedRepository` is then
    created and loaded, and the result of its `status()` is returned.
    """
    set_log_level_from_context(context)
    repository = RuggedRepository()
    repository.load()
    repo_status = repository.status()
    return repo_status
def truncate_logs(**context)

Truncate this worker's logs.

Expand source code
@staticmethod
@worker.task(name='truncate_logs', queue=queue)
def truncate_logs(**context):
    """ Truncate the logs kept by this worker.

    Keyword arguments are collected into `context` and passed to
    `set_log_level_from_context()`. The result of calling `truncate_log()`
    on the module logger is returned.
    """
    set_log_level_from_context(context)
    outcome = truncate_log(log)
    return outcome