Module rugged.workers.test-worker

Classes

class TestWorker
class TestWorker(BaseWorker):
    """
    Rugged (Celery) worker for testing of base/common functionality.

    N.B. This worker has access to the targets signing key.

    This worker should not be active in production environments. It is only
    intended for development and testing.
    """

    @staticmethod
    @worker.task(name='sleep', queue=queue)
    def echo(interval, **context):
        """ Dummy task to test task timeouts. """
        log.info(f"Test-worker received 'sleep' task with interval of '{interval}'.")
        sleep(interval)
        return f"Test worker went to sleep for '{interval}' seconds."

Rugged (Celery) worker for testing of base/common functionality.

N.B. This worker has access to the targets signing key.

This worker should not be active in production environments. It is only intended for development and testing.

Ancestors

BaseWorker

Static methods

def echo(interval, **context)
@staticmethod
@worker.task(name='sleep', queue=queue)
def echo(interval, **context):
    """ Dummy task to test task timeouts. """
    log.info(f"Test-worker received 'sleep' task with interval of '{interval}'.")
    sleep(interval)
    return f"Test worker went to sleep for '{interval}' seconds."

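Dummy task to test task timeouts. It sleeps for `interval` seconds, then returns a confirmation message. Note that the method is named `echo`, but it is registered as the Celery task `sleep`.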
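The timeout behaviour this task is meant to exercise can be sketched without a broker. The example below is a minimal, standard-library-only illustration, not Rugged's actual test harness: a thread pool stands in for the Celery worker, and `sleep_task` and `call_with_timeout` are hypothetical names introduced here.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from time import sleep


def sleep_task(interval):
    """Hypothetical stand-in for the 'sleep' task: block, then report."""
    sleep(interval)
    return f"Test worker went to sleep for '{interval}' seconds."


def call_with_timeout(interval, timeout):
    """Run sleep_task in a worker thread and wait at most `timeout` seconds.

    Returns the task's message, or None if the wait timed out.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        future = pool.submit(sleep_task, interval)
        try:
            return future.result(timeout=timeout)
        except FutureTimeout:
            # The caller gave up waiting; the task itself keeps sleeping.
            return None
    finally:
        # Do not block on the still-sleeping thread when we time out.
        pool.shutdown(wait=False)


if __name__ == "__main__":
    print(call_with_timeout(0.01, 1.0))   # finishes within the timeout
    print(call_with_timeout(0.3, 0.05))   # caller times out first
```

As with a real task queue, the timeout here only bounds how long the caller waits; it does not stop the sleeping task. With Celery itself, the comparable caller-side check would likely go through `AsyncResult.get(timeout=...)`, which raises `celery.exceptions.TimeoutError` when the result does not arrive in time.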

Inherited members