Module rugged.lib.task_queue

Functions

def run_task(worker: str,
             task: str,
             args: List[Any] = [],
             timeout: int = 10,
             broker_connection_string: str = '') -> Any
def run_task(
    worker: str,
    task: str,
    args: List[Any] = [],
    timeout: int = task_timeout,
    broker_connection_string: str = '',
) -> Any:
    """ Run a task on a worker and return the response. """
    try:
        queue = TaskQueue(broker_connection_string)
        return queue.send_task(
            task=f"{task}",
            worker=worker,
            args=args,
            timeout=timeout,
        )
    except RuggedTimeoutError:
        warning = f"The operation timed out. Check status of {worker}."
        log.warning(warning)
        raise RuggedTimeoutError(warning)
    except RuggedConnectionError:
        error = "Failed to connect to worker queue. "\
                "Check configuration."
        log.error(error)
        sys.exit(os.EX_NOHOST)
    except RuggedAccessRefused:
        error = "Failed to authenticate to worker queue. "\
                "Check credentials."
        log.error(error)
        sys.exit(os.EX_NOPERM)

Run a task on a worker and return the response.
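
A minimal usage sketch, assuming a worker queue named "rugged_worker" and a task name "rugged.tasks.test" (both are placeholders, not names defined by this module):

from rugged.lib.task_queue import run_task

# Dispatch the task to the named worker queue and wait up to 30 seconds
# for its result. Worker and task names here are illustrative only.
response = run_task(
    worker="rugged_worker",
    task="rugged.tasks.test",
    args=[],
    timeout=30,
)
print(response)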

Classes

class TaskQueue (broker_connection_string: str = '')
class TaskQueue:
    """ Return a fully initialized task queue. """

    def __init__(self, broker_connection_string: str = '') -> None:
        if not broker_connection_string:
            broker_connection_string = config['broker_connection_string'].get()
        try:
            log.debug("Initializing connection to RabbitMQ.")
            self.celery: Celery = Celery(
                '',  # @TODO: Document this argument.
                broker=broker_connection_string,
                backend='rpc://',
            )
        except AccessRefused as e:
            log_exception(e)
            log.error("Failed to authenticate to RabbitMQ.")
            raise RuggedAccessRefused

        worker_max_memory_per_child = config['celery_worker_max_memory_per_child'].get()

        # pyrefly: ignore[missing-attribute]
        self.celery.conf.update(
            task_serializer='json',
            result_persistent=True,
            result_serializer='json',
            broker_transport_options={'max_retries': 1},
            worker_max_memory_per_child=worker_max_memory_per_child
        )

    def get_task_queue(self) -> Celery:
        return self.celery

    def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = task_timeout) -> bool:
        # Pass context as keyword arguments.
        context = {
            "log_level": log.level
        }
        try:
            queue = self.get_task_queue()
            result = queue.send_task(
                task,
                queue=worker,
                args=args,
                kwargs=context,
            )
            # @TODO: Evaluate if there's a better way to handle sub-tasks.
            # See: https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
            return result.get(timeout=timeout, disable_sync_subtasks=False)
        except TimeoutError as e:
            log_exception(e)
            raise RuggedTimeoutError
        except AccessRefused as e:
            log_exception(e)
            raise RuggedAccessRefused
        except OperationalError as e:
            log_exception(e)
            raise RuggedConnectionError

Return a fully initialized task queue.
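
A minimal construction sketch, assuming a RabbitMQ broker reachable at the illustrative URL below; in practice the connection string is read from configuration when the argument is left empty:

from rugged.lib.task_queue import TaskQueue

# Illustrative AMQP connection string; real deployments read this from config.
queue = TaskQueue(broker_connection_string="amqp://guest:guest@localhost:5672//")
celery_app = queue.get_task_queue()  # The underlying Celery application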

Methods

def get_task_queue(self) -> celery.app.base.Celery
def get_task_queue(self) -> Celery:
    return self.celery
def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = 10) -> bool
def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = task_timeout) -> bool:
    # Pass context as keyword arguments.
    context = {
        "log_level": log.level
    }
    try:
        queue = self.get_task_queue()
        result = queue.send_task(
            task,
            queue=worker,
            args=args,
            kwargs=context,
        )
        # @TODO: Evaluate if there's a better way to handle sub-tasks.
        # See: https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks
        return result.get(timeout=timeout, disable_sync_subtasks=False)
    except TimeoutError as e:
        log_exception(e)
        raise RuggedTimeoutError
    except AccessRefused as e:
        log_exception(e)
        raise RuggedAccessRefused
    except OperationalError as e:
        log_exception(e)
        raise RuggedConnectionError
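
For example, send_task can be called directly on an instance, handling the timeout it may raise (worker and task names are placeholders, and the import path for RuggedTimeoutError is project-specific and omitted here):

queue = TaskQueue()
try:
    result = queue.send_task(
        task="rugged.tasks.test",
        worker="rugged_worker",
        args=[],
        timeout=30,
    )
except RuggedTimeoutError:
    # The worker did not respond within 30 seconds; check that it is running.
    result = None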