Module rugged.lib.task_queue
Expand source code
import os
import sys
from amqp.exceptions import AccessRefused
from celery import Celery
from celery.exceptions import TimeoutError
from kombu.exceptions import OperationalError
from rugged.lib.config import get_config
from rugged.exceptions.access_refused import RuggedAccessRefused
from rugged.exceptions.hostname_not_found import RuggedHostnameNotFound
from rugged.exceptions.timeout_error import RuggedTimeoutError
from rugged.lib.logger import get_logger
log = get_logger()
config = get_config()
class TaskQueue:
""" Return a fully initialized task queue. """
def __init__(self, username=None, password=None, host=None):
self.log = log
self.config = config
username = self.get_rabbitmq_username(username)
password = self.get_rabbitmq_password(password)
host = self.get_rabbitmq_host(host)
try:
self.log.debug("Initializing connection to RabbitMQ.")
self.celery = Celery(
'',
broker=f'pyamqp://{ username }:{ password }@{ host }//',
backend='rpc://',
)
except AccessRefused:
self.log.error("Failed to authenticate to RabbitMQ.")
raise RuggedAccessRefused()
self.celery.conf.update(
task_serializer='json',
result_serializer='json',
broker_transport_options={'max_retries': 1}
)
def get_rabbitmq_username(self, username):
if username is not None:
return username
return self.config['username'].get()
def get_rabbitmq_password(self, password):
if password is not None:
return password
return self.config['password'].get()
def get_rabbitmq_host(self, host):
if host is not None:
return host
return self.config['host'].get()
def get_task_queue(self):
return self.celery
def send_task(self, task=None, worker=None, args=[], timeout=10):
# Pass context as keyword arguments.
context = {
"log_level": self.log.level
}
try:
queue = self.get_task_queue()
result = queue.send_task(
task,
queue=worker,
args=args,
kwargs=context,
)
return result.get(timeout=timeout)
except TimeoutError:
raise RuggedTimeoutError
except AccessRefused:
raise RuggedAccessRefused
except OperationalError:
raise RuggedHostnameNotFound
def run_task(
worker,
task,
args=[],
timeout=10,
username=None,
password=None,
host=None,
):
""" Run a task on a worker and return the response. """
try:
queue = TaskQueue(username, password, host)
return queue.send_task(
task=f"{ task }",
worker=worker,
args=args,
timeout=timeout,
)
except RuggedTimeoutError:
error = "The operation timed out. "\
f"Check status of { worker }."
log.error(error)
sys.exit(os.EX_TEMPFAIL)
except RuggedHostnameNotFound:
error = "Failed to resolve worker queue hostname. "\
"Check configuration."
log.error(error)
sys.exit(os.EX_NOHOST)
except RuggedAccessRefused:
error = "Failed to authenticate to worker queue. "\
"Check credentials."
log.error(error)
sys.exit(os.EX_NOPERM)
Functions
def run_task(worker, task, args=[], timeout=10, username=None, password=None, host=None)
-
Run a task on a worker and return the response.
Expand source code
def run_task( worker, task, args=[], timeout=10, username=None, password=None, host=None, ): """ Run a task on a worker and return the response. """ try: queue = TaskQueue(username, password, host) return queue.send_task( task=f"{ task }", worker=worker, args=args, timeout=timeout, ) except RuggedTimeoutError: error = "The operation timed out. "\ f"Check status of { worker }." log.error(error) sys.exit(os.EX_TEMPFAIL) except RuggedHostnameNotFound: error = "Failed to resolve worker queue hostname. "\ "Check configuration." log.error(error) sys.exit(os.EX_NOHOST) except RuggedAccessRefused: error = "Failed to authenticate to worker queue. "\ "Check credentials." log.error(error) sys.exit(os.EX_NOPERM)
Classes
class TaskQueue (username=None, password=None, host=None)
-
Return a fully initialized task queue.
Expand source code
class TaskQueue: """ Return a fully initialized task queue. """ def __init__(self, username=None, password=None, host=None): self.log = log self.config = config username = self.get_rabbitmq_username(username) password = self.get_rabbitmq_password(password) host = self.get_rabbitmq_host(host) try: self.log.debug("Initializing connection to RabbitMQ.") self.celery = Celery( '', broker=f'pyamqp://{ username }:{ password }@{ host }//', backend='rpc://', ) except AccessRefused: self.log.error("Failed to authenticate to RabbitMQ.") raise RuggedAccessRefused() self.celery.conf.update( task_serializer='json', result_serializer='json', broker_transport_options={'max_retries': 1} ) def get_rabbitmq_username(self, username): if username is not None: return username return self.config['username'].get() def get_rabbitmq_password(self, password): if password is not None: return password return self.config['password'].get() def get_rabbitmq_host(self, host): if host is not None: return host return self.config['host'].get() def get_task_queue(self): return self.celery def send_task(self, task=None, worker=None, args=[], timeout=10): # Pass context as keyword arguments. context = { "log_level": self.log.level } try: queue = self.get_task_queue() result = queue.send_task( task, queue=worker, args=args, kwargs=context, ) return result.get(timeout=timeout) except TimeoutError: raise RuggedTimeoutError except AccessRefused: raise RuggedAccessRefused except OperationalError: raise RuggedHostnameNotFound
Methods
def get_rabbitmq_host(self, host)
-
Expand source code
def get_rabbitmq_host(self, host): if host is not None: return host return self.config['host'].get()
def get_rabbitmq_password(self, password)
-
Expand source code
def get_rabbitmq_password(self, password): if password is not None: return password return self.config['password'].get()
def get_rabbitmq_username(self, username)
-
Expand source code
def get_rabbitmq_username(self, username): if username is not None: return username return self.config['username'].get()
def get_task_queue(self)
-
Expand source code
def get_task_queue(self): return self.celery
def send_task(self, task=None, worker=None, args=[], timeout=10)
-
Expand source code
def send_task(self, task=None, worker=None, args=[], timeout=10): # Pass context as keyword arguments. context = { "log_level": self.log.level } try: queue = self.get_task_queue() result = queue.send_task( task, queue=worker, args=args, kwargs=context, ) return result.get(timeout=timeout) except TimeoutError: raise RuggedTimeoutError except AccessRefused: raise RuggedAccessRefused except OperationalError: raise RuggedHostnameNotFound