Module rugged.lib.task_queue

Expand source code
import os
import sys
from amqp.exceptions import AccessRefused
from celery import Celery
from celery.exceptions import TimeoutError
from kombu.exceptions import OperationalError
from rugged.lib.config import get_config
from rugged.exceptions.access_refused import RuggedAccessRefused
from rugged.exceptions.hostname_not_found import RuggedHostnameNotFound
from rugged.exceptions.timeout_error import RuggedTimeoutError
from rugged.lib.logger import get_logger

# Module-level logger and configuration; TaskQueue stores references to both
# on each instance, and run_task() logs through `log` directly.
log = get_logger()
config = get_config()


class TaskQueue:
    """ Wrapper around a Celery app pointed at the RabbitMQ broker.

    Connection details passed to the constructor take precedence; any left
    as None are read from the module-level configuration.
    """

    def __init__(self, username=None, password=None, host=None):
        """ Create and configure the Celery app used to dispatch tasks.

        Args:
            username: Broker username, or None to use config['username'].
            password: Broker password, or None to use config['password'].
            host: Broker host, or None to use config['host'].

        Raises:
            RuggedAccessRefused: If creating the app raises AccessRefused.
        """
        self.log = log
        self.config = config
        username = self.get_rabbitmq_username(username)
        password = self.get_rabbitmq_password(password)
        host = self.get_rabbitmq_host(host)
        try:
            self.log.debug("Initializing connection to RabbitMQ.")
            # NOTE(review): credentials are interpolated into the URL as-is;
            # values containing '@', ':' or '/' presumably need
            # percent-encoding by the caller -- confirm.
            self.celery = Celery(
                '',
                broker=f'pyamqp://{ username }:{ password }@{ host }//',
                backend='rpc://',
            )
        except AccessRefused as exc:
            self.log.error("Failed to authenticate to RabbitMQ.")
            # Chain explicitly so the broker's own error stays visible.
            raise RuggedAccessRefused() from exc
        self.celery.conf.update(
            task_serializer='json',
            result_serializer='json',
            # NOTE(review): presumably keeps broker failures from being
            # retried at length -- confirm against the transport docs.
            broker_transport_options={'max_retries': 1}
        )

    def get_rabbitmq_username(self, username):
        """ Return `username`, or the configured one when it is None. """
        if username is not None:
            return username
        return self.config['username'].get()

    def get_rabbitmq_password(self, password):
        """ Return `password`, or the configured one when it is None. """
        if password is not None:
            return password
        return self.config['password'].get()

    def get_rabbitmq_host(self, host):
        """ Return `host`, or the configured one when it is None. """
        if host is not None:
            return host
        return self.config['host'].get()

    def get_task_queue(self):
        """ Return the underlying Celery app. """
        return self.celery

    def send_task(self, task=None, worker=None, args=None, timeout=10):
        """ Send a task to a worker queue and wait for its result.

        Args:
            task: Name of the task to send.
            worker: Name of the queue the task is routed to.
            args: Positional arguments for the task; None means no arguments.
            timeout: Value passed as `timeout` when waiting for the result.

        Returns:
            Whatever the task result's `get()` returns.

        Raises:
            RuggedTimeoutError: If a TimeoutError is raised.
            RuggedAccessRefused: If an AccessRefused error is raised.
            RuggedHostnameNotFound: If an OperationalError is raised.
        """
        # A fresh list per call; a `[]` default would be shared across calls.
        task_args = [] if args is None else args
        # Pass context as keyword arguments.
        context = {
            "log_level": self.log.level
        }
        try:
            queue = self.get_task_queue()
            result = queue.send_task(
                task,
                queue=worker,
                args=task_args,
                kwargs=context,
            )
            return result.get(timeout=timeout)
        except TimeoutError as exc:
            raise RuggedTimeoutError from exc
        except AccessRefused as exc:
            raise RuggedAccessRefused from exc
        except OperationalError as exc:
            raise RuggedHostnameNotFound from exc


def run_task(
    worker,
    task,
    args=None,
    timeout=10,
    username=None,
    password=None,
    host=None,
):
    """ Run a task on a worker and return the response.

    Builds a TaskQueue from the given connection details and sends the task
    through it. Known failures are logged and end the process.

    Args:
        worker: Name of the worker queue to send the task to.
        task: Task name; it is formatted into a string before sending.
        args: Positional arguments for the task; None means no arguments.
        timeout: Timeout passed through to TaskQueue.send_task.
        username: Optional username passed to TaskQueue.
        password: Optional password passed to TaskQueue.
        host: Optional host passed to TaskQueue.

    Returns:
        The value returned by TaskQueue.send_task.

    Raises:
        SystemExit: With os.EX_TEMPFAIL on RuggedTimeoutError, os.EX_NOHOST
            on RuggedHostnameNotFound, os.EX_NOPERM on RuggedAccessRefused.
    """
    # A fresh list per call; a `[]` default would be shared across calls.
    task_args = [] if args is None else args
    try:
        queue = TaskQueue(username, password, host)
        return queue.send_task(
            task=f"{ task }",
            worker=worker,
            args=task_args,
            timeout=timeout,
            )
    except RuggedTimeoutError:
        error = "The operation timed out. "\
                f"Check status of { worker }."
        log.error(error)
        sys.exit(os.EX_TEMPFAIL)
    except RuggedHostnameNotFound:
        error = "Failed to resolve worker queue hostname. "\
                "Check configuration."
        log.error(error)
        sys.exit(os.EX_NOHOST)
    except RuggedAccessRefused:
        error = "Failed to authenticate to worker queue. "\
                "Check credentials."
        log.error(error)
        sys.exit(os.EX_NOPERM)

Functions

def run_task(worker, task, args=[], timeout=10, username=None, password=None, host=None)

Run a task on a worker and return the response.

Expand source code
def run_task(
    worker,
    task,
    args=None,
    timeout=10,
    username=None,
    password=None,
    host=None,
):
    """ Run a task on a worker and return the response.

    Builds a TaskQueue from the given connection details and sends the task
    through it. Known failures are logged and end the process.

    Args:
        worker: Name of the worker queue to send the task to.
        task: Task name; it is formatted into a string before sending.
        args: Positional arguments for the task; None means no arguments.
        timeout: Timeout passed through to TaskQueue.send_task.
        username: Optional username passed to TaskQueue.
        password: Optional password passed to TaskQueue.
        host: Optional host passed to TaskQueue.

    Returns:
        The value returned by TaskQueue.send_task.

    Raises:
        SystemExit: With os.EX_TEMPFAIL on RuggedTimeoutError, os.EX_NOHOST
            on RuggedHostnameNotFound, os.EX_NOPERM on RuggedAccessRefused.
    """
    # A fresh list per call; a `[]` default would be shared across calls.
    task_args = [] if args is None else args
    try:
        queue = TaskQueue(username, password, host)
        return queue.send_task(
            task=f"{ task }",
            worker=worker,
            args=task_args,
            timeout=timeout,
            )
    except RuggedTimeoutError:
        error = "The operation timed out. "\
                f"Check status of { worker }."
        log.error(error)
        sys.exit(os.EX_TEMPFAIL)
    except RuggedHostnameNotFound:
        error = "Failed to resolve worker queue hostname. "\
                "Check configuration."
        log.error(error)
        sys.exit(os.EX_NOHOST)
    except RuggedAccessRefused:
        error = "Failed to authenticate to worker queue. "\
                "Check credentials."
        log.error(error)
        sys.exit(os.EX_NOPERM)

Classes

class TaskQueue (username=None, password=None, host=None)

Return a fully initialized task queue.

Expand source code
class TaskQueue:
    """ Wrapper around a Celery app pointed at the RabbitMQ broker.

    Connection details passed to the constructor take precedence; any left
    as None are read from the module-level configuration.
    """

    def __init__(self, username=None, password=None, host=None):
        """ Create and configure the Celery app used to dispatch tasks.

        Args:
            username: Broker username, or None to use config['username'].
            password: Broker password, or None to use config['password'].
            host: Broker host, or None to use config['host'].

        Raises:
            RuggedAccessRefused: If creating the app raises AccessRefused.
        """
        self.log = log
        self.config = config
        username = self.get_rabbitmq_username(username)
        password = self.get_rabbitmq_password(password)
        host = self.get_rabbitmq_host(host)
        try:
            self.log.debug("Initializing connection to RabbitMQ.")
            # NOTE(review): credentials are interpolated into the URL as-is;
            # values containing '@', ':' or '/' presumably need
            # percent-encoding by the caller -- confirm.
            self.celery = Celery(
                '',
                broker=f'pyamqp://{ username }:{ password }@{ host }//',
                backend='rpc://',
            )
        except AccessRefused as exc:
            self.log.error("Failed to authenticate to RabbitMQ.")
            # Chain explicitly so the broker's own error stays visible.
            raise RuggedAccessRefused() from exc
        self.celery.conf.update(
            task_serializer='json',
            result_serializer='json',
            # NOTE(review): presumably keeps broker failures from being
            # retried at length -- confirm against the transport docs.
            broker_transport_options={'max_retries': 1}
        )

    def get_rabbitmq_username(self, username):
        """ Return `username`, or the configured one when it is None. """
        if username is not None:
            return username
        return self.config['username'].get()

    def get_rabbitmq_password(self, password):
        """ Return `password`, or the configured one when it is None. """
        if password is not None:
            return password
        return self.config['password'].get()

    def get_rabbitmq_host(self, host):
        """ Return `host`, or the configured one when it is None. """
        if host is not None:
            return host
        return self.config['host'].get()

    def get_task_queue(self):
        """ Return the underlying Celery app. """
        return self.celery

    def send_task(self, task=None, worker=None, args=None, timeout=10):
        """ Send a task to a worker queue and wait for its result.

        Args:
            task: Name of the task to send.
            worker: Name of the queue the task is routed to.
            args: Positional arguments for the task; None means no arguments.
            timeout: Value passed as `timeout` when waiting for the result.

        Returns:
            Whatever the task result's `get()` returns.

        Raises:
            RuggedTimeoutError: If a TimeoutError is raised.
            RuggedAccessRefused: If an AccessRefused error is raised.
            RuggedHostnameNotFound: If an OperationalError is raised.
        """
        # A fresh list per call; a `[]` default would be shared across calls.
        task_args = [] if args is None else args
        # Pass context as keyword arguments.
        context = {
            "log_level": self.log.level
        }
        try:
            queue = self.get_task_queue()
            result = queue.send_task(
                task,
                queue=worker,
                args=task_args,
                kwargs=context,
            )
            return result.get(timeout=timeout)
        except TimeoutError as exc:
            raise RuggedTimeoutError from exc
        except AccessRefused as exc:
            raise RuggedAccessRefused from exc
        except OperationalError as exc:
            raise RuggedHostnameNotFound from exc

Methods

def get_rabbitmq_host(self, host)
Expand source code
def get_rabbitmq_host(self, host):
    """ Return `host`, or the configured host when it is None. """
    if host is None:
        # No explicit value given: read it from the configuration.
        return self.config['host'].get()
    return host
def get_rabbitmq_password(self, password)
Expand source code
def get_rabbitmq_password(self, password):
    """ Return `password`, or the configured password when it is None. """
    if password is None:
        # No explicit value given: read it from the configuration.
        return self.config['password'].get()
    return password
def get_rabbitmq_username(self, username)
Expand source code
def get_rabbitmq_username(self, username):
    """ Return `username`, or the configured username when it is None. """
    if username is None:
        # No explicit value given: read it from the configuration.
        return self.config['username'].get()
    return username
def get_task_queue(self)
Expand source code
def get_task_queue(self):
    """ Return the Celery app stored on this instance. """
    app = self.celery
    return app
def send_task(self, task=None, worker=None, args=[], timeout=10)
Expand source code
def send_task(self, task=None, worker=None, args=None, timeout=10):
    """ Send a task to a worker queue and wait for its result.

    Args:
        task: Name of the task to send.
        worker: Name of the queue the task is routed to.
        args: Positional arguments for the task; None means no arguments.
        timeout: Value passed as `timeout` when waiting for the result.

    Returns:
        Whatever the task result's `get()` returns.

    Raises:
        RuggedTimeoutError: If a TimeoutError is raised.
        RuggedAccessRefused: If an AccessRefused error is raised.
        RuggedHostnameNotFound: If an OperationalError is raised.
    """
    # A fresh list per call; a `[]` default would be shared across calls.
    task_args = [] if args is None else args
    # Pass context as keyword arguments.
    context = {
        "log_level": self.log.level
    }
    try:
        queue = self.get_task_queue()
        result = queue.send_task(
            task,
            queue=worker,
            args=task_args,
            kwargs=context,
        )
        return result.get(timeout=timeout)
    except TimeoutError as exc:
        raise RuggedTimeoutError from exc
    except AccessRefused as exc:
        raise RuggedAccessRefused from exc
    except OperationalError as exc:
        raise RuggedHostnameNotFound from exc