Module rugged.lib.task_queue
Functions
def run_task(worker: str,
task: str,
args: List[Any] = [],
timeout: int = 10,
broker_connection_string: str = '') ‑> Any-
Expand source code
def run_task( worker: str, task: str, args: List[Any] = [], timeout: int = task_timeout, broker_connection_string: str = '', ) -> Any: """ Run a task on a worker and return the response. """ try: queue = TaskQueue(broker_connection_string) return queue.send_task( task=f"{task}", worker=worker, args=args, timeout=timeout, ) except RuggedTimeoutError: warning = f"The operation timed out. Check status of {worker}." log.warning(warning) raise RuggedTimeoutError(warning) except RuggedConnectionError: error = "Failed to connect to worker queue. "\ "Check configuration." log.error(error) sys.exit(os.EX_NOHOST) except RuggedAccessRefused: error = "Failed to authenticate to worker queue. "\ "Check credentials." log.error(error) sys.exit(os.EX_NOPERM)
Run a task on a worker and return the response.
Classes
class TaskQueue (broker_connection_string: str = '')
-
Expand source code
class TaskQueue: """ Return a fully initialized task queue. """ def __init__(self, broker_connection_string: str = '') -> None: if not broker_connection_string: broker_connection_string = config['broker_connection_string'].get() try: log.debug("Initializing connection to RabbitMQ.") self.celery: Celery = Celery( '', # @TODO: Document this argument. broker=broker_connection_string, backend='rpc://', ) except AccessRefused as e: log_exception(e) log.error("Failed to authenticate to RabbitMQ.") raise RuggedAccessRefused worker_max_memory_per_child = config['celery_worker_max_memory_per_child'].get() # pyrefly: ignore[missing-attribute] self.celery.conf.update( task_serializer='json', result_persistent=True, result_serializer='json', broker_transport_options={'max_retries': 1}, worker_max_memory_per_child=worker_max_memory_per_child ) def get_task_queue(self) -> Celery: return self.celery def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = task_timeout) -> bool: # Pass context as keyword arguments. context = { "log_level": log.level } try: queue = self.get_task_queue() result = queue.send_task( task, queue=worker, args=args, kwargs=context, ) # @TODO: Evaluate if there's a better way to handle sub-tasks. # See: https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks return result.get(timeout=timeout, disable_sync_subtasks=False) except TimeoutError as e: log_exception(e) raise RuggedTimeoutError except AccessRefused as e: log_exception(e) raise RuggedAccessRefused except OperationalError as e: log_exception(e) raise RuggedConnectionError
Return a fully initialized task queue.
Methods
def get_task_queue(self) ‑> celery.app.base.Celery
-
Expand source code
def get_task_queue(self) -> Celery: return self.celery
def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = 10) ‑> bool
-
Expand source code
def send_task(self, task: str, worker: str, args: List[Any] = [], timeout: int = task_timeout) -> bool: # Pass context as keyword arguments. context = { "log_level": log.level } try: queue = self.get_task_queue() result = queue.send_task( task, queue=worker, args=args, kwargs=context, ) # @TODO: Evaluate if there's a better way to handle sub-tasks. # See: https://docs.celeryq.dev/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks return result.get(timeout=timeout, disable_sync_subtasks=False) except TimeoutError as e: log_exception(e) raise RuggedTimeoutError except AccessRefused as e: log_exception(e) raise RuggedAccessRefused except OperationalError as e: log_exception(e) raise RuggedConnectionError