Module rugged.workers.base_worker

Classes

class BaseWorker
Expand source code
class BaseWorker():
    """ Base class that defines common tasks for Rugged workers.

    Every member is a static method that is registered when the class body
    is evaluated: either as a handler connected to ``task_prerun`` /
    ``task_postrun``, or as a task created via ``worker.task`` on ``queue``.
    """

    # This is invoked before each task starts execution.
    @staticmethod
    @task_prerun.connect
    def set_log_level(sender: Celery, **kwargs):
        """ Set log level based on what was passed in for this task.

        ``kwargs['kwargs']`` holds the keyword arguments of the task that is
        about to run. If it has no 'log_level' entry the logger is left
        untouched.
        """
        if 'log_level' not in kwargs['kwargs']:
            return
        log.setLevel(kwargs['kwargs']['log_level'])

    # This is invoked before each task starts execution. The memory trace is
    # logged in the next task (below).
    @staticmethod
    @task_prerun.connect
    def start_tracing_memory_usage(sender: Celery, **kwargs):
        """ Start tracing memory usage.

        A trace is only started when the task's 'log_level' keyword argument
        is present and equal to ``logging.DEBUG``.
        """
        if 'log_level' not in kwargs['kwargs']:
            return
        if kwargs['kwargs']['log_level'] == logging.DEBUG:
            log.debug('Starting trace of memory allocation.')
            tracemalloc.start()

    # This is invoked after each task completes execution. The memory trace is
    # started in the previous task (above).
    @staticmethod
    @task_postrun.connect
    def print_memory_usage(sender: Celery, **kwargs):
        """ Print memory usage.

        Logs the peak and final traced allocation for the task that just
        finished, then stops the trace. Nothing is logged unless the logger
        is at DEBUG *and* a trace is actually running.
        """
        # The logger can be at DEBUG without a trace having been started
        # (e.g. this task passed no 'log_level' keyword argument). In that
        # case tracemalloc has nothing to report, so skip the output rather
        # than log misleading zero values.
        if log.level != logging.DEBUG or not tracemalloc.is_tracing():
            return
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()
        # pyrefly: ignore[missing-attribute]
        log.debug(f"== Memory allocation for: {sender.name} ==")
        log.debug(f"Peak memory allocation: {peak / 1024:,.2f}KiB")
        log.debug(f"Final memory allocation: {current / 1024:,.2f}KiB")

    @staticmethod
    @worker.task(name='echo', queue=queue)
    def echo(worker_name, message, **context):
        """ Dummy task to test connections to the queue.

        Logs the received message and returns a "PONG" reply string that
        contains both the worker name and the message. Extra keyword
        arguments are accepted but not used here.
        """
        log.info(f"{worker_name} received echo task: {message}")
        if log.level == logging.DEBUG:
            log.debug(f"{worker_name} received debug flag.")
        return f"{worker_name} PONG: {message}"

    @staticmethod
    @worker.task(name='logs', queue=queue)
    def logs(**context):
        """ Return this worker's logs. """
        return get_log_entries(log)

    @staticmethod
    @worker.task(name='truncate_logs', queue=queue)
    def truncate_logs(**context):
        """ Truncate this worker's logs. """
        return truncate_log(log)

    @staticmethod
    @worker.task(name='get_configs', queue=queue)
    def get_configs(**context):
        """ Return this worker's configs. """
        return get_local_configs()

    @staticmethod
    @worker.task(name='status', queue=queue)
    def status(**context):
        """ Return the status of the repo from the worker's perspective. """
        repo = RuggedRepository()
        # The repository is loaded before its status is queried.
        repo.load()
        return repo.status()

Base class that defines common tasks for Rugged workers.

Subclasses

Static methods

def echo(worker_name, message, **context)
Expand source code
@staticmethod
@worker.task(name='echo', queue=queue)
def echo(worker_name, message, **context):
    """ Connectivity check: log the incoming message and answer with a PONG.

    Extra keyword arguments are accepted but not used.
    """
    debug_enabled = log.level == logging.DEBUG
    log.info(f"{worker_name} received echo task: {message}")
    if debug_enabled:
        log.debug(f"{worker_name} received debug flag.")
    reply = f"{worker_name} PONG: {message}"
    return reply

Dummy task to test connections to the queue.

def get_configs(**context)
Expand source code
@staticmethod
@worker.task(name='get_configs', queue=queue)
def get_configs(**context):
    """ Hand back whatever get_local_configs() reports for this worker. """
    local_configs = get_local_configs()
    return local_configs

Return this worker's configs.

def logs(**context)
Expand source code
@staticmethod
@worker.task(name='logs', queue=queue)
def logs(**context):
    """ Hand back the log entries collected for this worker's logger. """
    entries = get_log_entries(log)
    return entries

Return this worker's logs.

def print_memory_usage(sender: celery.app.base.Celery, **kwargs)
Expand source code
@staticmethod
@task_postrun.connect
def print_memory_usage(sender: Celery, **kwargs):
    """ Print memory usage.

    Logs the peak and final traced allocation for the task that just
    finished, then stops the trace. Nothing is logged unless the logger is
    at DEBUG *and* a trace is actually running.
    """
    # The logger can be at DEBUG without a trace having been started (e.g.
    # this task passed no 'log_level' keyword argument). In that case
    # tracemalloc has nothing to report, so skip the output rather than log
    # misleading zero values.
    if log.level != logging.DEBUG or not tracemalloc.is_tracing():
        return
    current, peak = tracemalloc.get_traced_memory()
    tracemalloc.stop()
    # pyrefly: ignore[missing-attribute]
    log.debug(f"== Memory allocation for: {sender.name} ==")
    log.debug(f"Peak memory allocation: {peak / 1024:,.2f}KiB")
    log.debug(f"Final memory allocation: {current / 1024:,.2f}KiB")

Print memory usage.

def set_log_level(sender: celery.app.base.Celery, **kwargs)
Expand source code
@staticmethod
@task_prerun.connect
def set_log_level(sender: Celery, **kwargs):
    """ Apply the task's 'log_level' keyword argument to the logger, if any.

    ``kwargs['kwargs']`` holds the keyword arguments of the task about to
    run; without a 'log_level' entry the logger is left as it is.
    """
    task_kwargs = kwargs['kwargs']
    if 'log_level' in task_kwargs:
        log.setLevel(task_kwargs['log_level'])

Set log level based on what was passed in for this task.

def start_tracing_memory_usage(sender: celery.app.base.Celery, **kwargs)
Expand source code
@staticmethod
@task_prerun.connect
def start_tracing_memory_usage(sender: Celery, **kwargs):
    """ Begin a tracemalloc trace when the task asks for DEBUG logging.

    The trace starts only if the task's keyword arguments contain a
    'log_level' entry equal to ``logging.DEBUG``.
    """
    task_kwargs = kwargs['kwargs']
    wants_debug = (
        'log_level' in task_kwargs
        and task_kwargs['log_level'] == logging.DEBUG
    )
    if wants_debug:
        log.debug('Starting trace of memory allocation.')
        tracemalloc.start()

Start tracing memory usage.

def status(**context)
Expand source code
@staticmethod
@worker.task(name='status', queue=queue)
def status(**context):
    """ Report the repo status as seen from this worker's perspective. """
    repository = RuggedRepository()
    # Load the repository first, then ask it for its status.
    repository.load()
    repo_status = repository.status()
    return repo_status

Return the status of the repo from the worker's perspective.

def truncate_logs(**context)
Expand source code
@staticmethod
@worker.task(name='truncate_logs', queue=queue)
def truncate_logs(**context):
    """ Truncate the log behind this worker's logger and return the result. """
    outcome = truncate_log(log)
    return outcome

Truncate this worker's logs.