Module rugged.workers.test-worker
Classes
class TestWorker
-
Expand source code
class TestWorker(BaseWorker): """ Rugged (Celery) worker for testing of base/common functionality. N.B. This worker has access to the targets signing key. This worker should not be active in production environments. It is only intended for development and testing. """ @staticmethod @worker.task(name='sleep', queue=queue) def echo(interval, **context): """ Dummy task to test task timeouts. """ log.info(f"Test-worker received 'sleep' task with interval of '{interval}'.") sleep(interval) return f"Test worker went to sleep for '{interval}' seconds."
Rugged (Celery) worker for testing of base/common functionality.
N.B. This worker has access to the targets signing key.
This worker should not be active in production environments. It is only intended for development and testing.
Ancestors
Static methods
def echo(interval, **context)
-
Expand source code
@staticmethod @worker.task(name='sleep', queue=queue) def echo(interval, **context): """ Dummy task to test task timeouts. """ log.info(f"Test-worker received 'sleep' task with interval of '{interval}'.") sleep(interval) return f"Test worker went to sleep for '{interval}' seconds."
Dummy task to test task timeouts.
Inherited members