Module rugged.workers.base_worker
Expand source code
import logging
from rugged.lib.config import get_local_configs
from rugged.lib.logger import (
get_rugged_logger,
get_log_entries,
truncate_log,
set_log_level_from_context,
)
from rugged.lib.task_queue import TaskQueue
from rugged.tuf.repo import RuggedRepository
worker = TaskQueue().get_task_queue()
queue = 'base-worker'
log = get_rugged_logger()
class BaseWorker():
""" Base class that defines common tasks for Rugged workers. """
@staticmethod
@worker.task(name='echo', queue=queue)
def echo(worker_name, message, **context):
""" Dummy task to test connections to the queue. """
set_log_level_from_context(context)
log.info(f"{worker_name} received echo task: {message}")
if log.level == logging.DEBUG:
log.debug(f"{worker_name} received debug flag.")
return f"{worker_name} PONG: {message}"
@staticmethod
@worker.task(name='logs', queue=queue)
def logs(**context):
""" Return this worker's logs. """
set_log_level_from_context(context)
return get_log_entries(log)
@staticmethod
@worker.task(name='truncate_logs', queue=queue)
def truncate_logs(**context):
""" Truncate this worker's logs. """
set_log_level_from_context(context)
return truncate_log(log)
@staticmethod
@worker.task(name='get_configs', queue=queue)
def get_configs(**context):
""" Return this worker's configs. """
set_log_level_from_context(context)
return get_local_configs()
@staticmethod
@worker.task(name='status', queue=queue)
def status(**context):
""" Return the status of the repo from the worker's perpective. """
set_log_level_from_context(context)
repo = RuggedRepository()
repo.load()
return repo.status()
Classes
class BaseWorker
-
Base class that defines common tasks for Rugged workers.
Expand source code
class BaseWorker(): """ Base class that defines common tasks for Rugged workers. """ @staticmethod @worker.task(name='echo', queue=queue) def echo(worker_name, message, **context): """ Dummy task to test connections to the queue. """ set_log_level_from_context(context) log.info(f"{worker_name} received echo task: {message}") if log.level == logging.DEBUG: log.debug(f"{worker_name} received debug flag.") return f"{worker_name} PONG: {message}" @staticmethod @worker.task(name='logs', queue=queue) def logs(**context): """ Return this worker's logs. """ set_log_level_from_context(context) return get_log_entries(log) @staticmethod @worker.task(name='truncate_logs', queue=queue) def truncate_logs(**context): """ Truncate this worker's logs. """ set_log_level_from_context(context) return truncate_log(log) @staticmethod @worker.task(name='get_configs', queue=queue) def get_configs(**context): """ Return this worker's configs. """ set_log_level_from_context(context) return get_local_configs() @staticmethod @worker.task(name='status', queue=queue) def status(**context): """ Return the status of the repo from the worker's perpective. """ set_log_level_from_context(context) repo = RuggedRepository() repo.load() return repo.status()
Subclasses
Static methods
def echo(worker_name, message, **context)
-
Dummy task to test connections to the queue.
Expand source code
@staticmethod @worker.task(name='echo', queue=queue) def echo(worker_name, message, **context): """ Dummy task to test connections to the queue. """ set_log_level_from_context(context) log.info(f"{worker_name} received echo task: {message}") if log.level == logging.DEBUG: log.debug(f"{worker_name} received debug flag.") return f"{worker_name} PONG: {message}"
def get_configs(**context)
-
Return this worker's configs.
Expand source code
@staticmethod @worker.task(name='get_configs', queue=queue) def get_configs(**context): """ Return this worker's configs. """ set_log_level_from_context(context) return get_local_configs()
def logs(**context)
-
Return this worker's logs.
Expand source code
@staticmethod @worker.task(name='logs', queue=queue) def logs(**context): """ Return this worker's logs. """ set_log_level_from_context(context) return get_log_entries(log)
def status(**context)
-
Return the status of the repo from the worker's perpective.
Expand source code
@staticmethod @worker.task(name='status', queue=queue) def status(**context): """ Return the status of the repo from the worker's perpective. """ set_log_level_from_context(context) repo = RuggedRepository() repo.load() return repo.status()
def truncate_logs(**context)
-
Truncate this worker's logs.
Expand source code
@staticmethod @worker.task(name='truncate_logs', queue=queue) def truncate_logs(**context): """ Truncate this worker's logs. """ set_log_level_from_context(context) return truncate_log(log)