Module rugged.workers.base_worker
Classes
class BaseWorker
-
Expand source code
class BaseWorker():
    """ Base class that defines common tasks for Rugged workers.

    It bundles three Celery signal handlers (log-level setup plus memory
    tracing around each task) and a handful of small tasks registered on
    this worker's queue (echo, logs, truncate_logs, get_configs, status).
    """

    # This is invoked before each task starts execution.
    @staticmethod
    @task_prerun.connect
    def set_log_level(sender: Celery, **kwargs):
        """ Set log level based on what was passed in for this task.

        The signal's 'kwargs' entry holds the keyword arguments of the task
        about to run; when it carries no 'log_level' the logger is left
        untouched.
        """
        if 'log_level' not in kwargs['kwargs']:
            return
        log.setLevel(kwargs['kwargs']['log_level'])

    # This is invoked before each task starts execution. The memory trace is
    # reported by the post-run handler (below).
    @staticmethod
    @task_prerun.connect
    def start_tracing_memory_usage(sender: Celery, **kwargs):
        """ Start tracing memory usage.

        Tracing only begins when the task was explicitly called with
        log_level == logging.DEBUG.
        """
        if 'log_level' not in kwargs['kwargs']:
            return
        if kwargs['kwargs']['log_level'] == logging.DEBUG:
            log.debug('Starting trace of memory allocation.')
            tracemalloc.start()

    # This is invoked after each task completes execution. The memory trace is
    # started by the pre-run handler (above).
    @staticmethod
    @task_postrun.connect
    def print_memory_usage(sender: Celery, **kwargs):
        """ Print memory usage.

        Logs the peak and final traced allocation for the finished task and
        stops the trace. Nothing is reported unless the logger is at DEBUG
        and a trace is actually running.
        """
        if log.level != logging.DEBUG:
            return
        # The logger can be at DEBUG without a trace having been started:
        # start_tracing_memory_usage only starts one when the task's own
        # 'log_level' kwarg is DEBUG. Without this guard we would log
        # figures that do not describe this task.
        if not tracemalloc.is_tracing():
            return
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        # NOTE(review): 'sender' is annotated as Celery but '.name' is read
        # from it, hence the type-checker suppression below.
        # pyrefly: ignore[missing-attribute]
        log.debug(f"== Memory allocation for: {sender.name} ==")
        log.debug(f"Peak memory allocation: {peak / 1024:,.2f}KiB")
        log.debug(f"Final memory allocation: {current / 1024:,.2f}KiB")

    @staticmethod
    @worker.task(name='echo', queue=queue)
    def echo(worker_name, message, **context):
        """ Dummy task to test connections to the queue.

        Logs the received message and returns "<worker_name> PONG: <message>".
        """
        log.info(f"{worker_name} received echo task: {message}")
        if log.level == logging.DEBUG:
            log.debug(f"{worker_name} received debug flag.")
        return f"{worker_name} PONG: {message}"

    @staticmethod
    @worker.task(name='logs', queue=queue)
    def logs(**context):
        """ Return this worker's logs. """
        return get_log_entries(log)

    @staticmethod
    @worker.task(name='truncate_logs', queue=queue)
    def truncate_logs(**context):
        """ Truncate this worker's logs. """
        return truncate_log(log)

    @staticmethod
    @worker.task(name='get_configs', queue=queue)
    def get_configs(**context):
        """ Return this worker's configs. """
        return get_local_configs()

    @staticmethod
    @worker.task(name='status', queue=queue)
    def status(**context):
        """ Return the status of the repo from the worker's perspective. """
        repo = RuggedRepository()
        repo.load()
        return repo.status()
Base class that defines common tasks for Rugged workers.
Subclasses
Static methods
def echo(worker_name, message, **context)
-
Expand source code
@staticmethod
@worker.task(name='echo', queue=queue)
def echo(worker_name, message, **context):
    """ Connectivity check: log the incoming message and answer with a PONG. """
    log.info(f"{worker_name} received echo task: {message}")

    # Only acknowledge the debug flag when the logger sits exactly at DEBUG.
    debug_enabled = log.level == logging.DEBUG
    if debug_enabled:
        log.debug(f"{worker_name} received debug flag.")

    reply = f"{worker_name} PONG: {message}"
    return reply
Dummy task to test connections to the queue.
def get_configs(**context)
-
Expand source code
@staticmethod
@worker.task(name='get_configs', queue=queue)
def get_configs(**context):
    """ Hand back the configs local to this worker. """
    local_configs = get_local_configs()
    return local_configs
Return this worker's configs.
def logs(**context)
-
Expand source code
@staticmethod
@worker.task(name='logs', queue=queue)
def logs(**context):
    """ Hand back the log entries of this worker. """
    entries = get_log_entries(log)
    return entries
Return this worker's logs.
def print_memory_usage(sender: celery.app.base.Celery, **kwargs)
-
Expand source code
@staticmethod
@task_postrun.connect
def print_memory_usage(sender: Celery, **kwargs):
    """ Print memory usage.

    Logs the peak and final traced allocation for the finished task and
    stops the trace. Nothing is reported unless the logger is at DEBUG
    and a trace is actually running.
    """
    if log.level != logging.DEBUG:
        return
    # The logger can be at DEBUG without a trace having been started:
    # start_tracing_memory_usage only starts one when the task's own
    # 'log_level' kwarg is DEBUG. Without this guard we would log
    # figures that do not describe this task.
    if not tracemalloc.is_tracing():
        return
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    # NOTE(review): 'sender' is annotated as Celery but '.name' is read
    # from it, hence the type-checker suppression below.
    # pyrefly: ignore[missing-attribute]
    log.debug(f"== Memory allocation for: {sender.name} ==")
    log.debug(f"Peak memory allocation: {peak / 1024:,.2f}KiB")
    log.debug(f"Final memory allocation: {current / 1024:,.2f}KiB")
Print memory usage.
def set_log_level(sender: celery.app.base.Celery, **kwargs)
-
Expand source code
@staticmethod
@task_prerun.connect
def set_log_level(sender: Celery, **kwargs):
    """ Apply the log level supplied with the task's keyword arguments, if any. """
    task_kwargs = kwargs['kwargs']
    # Leave the logger untouched when the task did not ask for a level.
    if 'log_level' in task_kwargs:
        log.setLevel(task_kwargs['log_level'])
Set log level based on what was passed in for this task.
def start_tracing_memory_usage(sender: celery.app.base.Celery, **kwargs)
-
Expand source code
@staticmethod
@task_prerun.connect
def start_tracing_memory_usage(sender: Celery, **kwargs):
    """ Begin a memory-allocation trace when the task asked for DEBUG logging. """
    task_kwargs = kwargs['kwargs']
    # Trace only when the task explicitly carries log_level == DEBUG.
    if 'log_level' in task_kwargs and task_kwargs['log_level'] == logging.DEBUG:
        log.debug('Starting trace of memory allocation.')
        tracemalloc.start()
Start tracing memory usage.
def status(**context)
-
Expand source code
@staticmethod
@worker.task(name='status', queue=queue)
def status(**context):
    """ Load the repository and report its status as seen by this worker. """
    repository = RuggedRepository()
    # The repository is loaded before its status is queried.
    repository.load()
    repo_status = repository.status()
    return repo_status
Return the status of the repo from the worker's perspective.
def truncate_logs(**context)
-
Expand source code
@staticmethod
@worker.task(name='truncate_logs', queue=queue)
def truncate_logs(**context):
    """ Truncate the logs of this worker and hand back the helper's result. """
    outcome = truncate_log(log)
    return outcome
Truncate this worker's logs.