Module rugged.workers.root-worker
Classes
class RootWorker
    class RootWorker(BaseWorker):
        """
        Rugged (Celery) worker that fulfills the TUF 'root' role.

        N.B. This worker has access to the root signing key.

        This worker should not be active during normal operations of the
        Rugged system. It should only be brought online by administrators, to
        perform 'root' functions (e.g. key rotation). Otherwise, it should be
        completely removed from the system.
        """

        @worker.task(name='initialize_task', queue=queue)
        def initialize_task(**context):
            """ Initialize a new TUF repository. """
            log.debug("Received 'initialize' task.")
            result, message = init_repo()
            if result:
                log.info(message)
            else:
                log.error(message)
            return (result, message)

        @worker.task(name='generate_keypair_task', queue=queue)
        def generate_keypair_task(key: str, role: str, **context):
            """ Generate keypairs for use in a TUF repository. """
            log.debug("Received 'generate_keypair' task.")
            log.info(f"Generating '{key}' keypair for '{role}' role.")
            return KeyManager().generate_keypair(key, role)
Rugged (Celery) worker that fulfills the TUF 'root' role.
N.B. This worker has access to the root signing key.
This worker should not be active during normal operations of the Rugged system. It should only be brought online by administrators to perform 'root' functions (e.g. key rotation). Otherwise, it should be completely removed from the system.
Ancestors
Methods
def generate_keypair_task(key: str, role: str, **context)
    @worker.task(name='generate_keypair_task', queue=queue)
    def generate_keypair_task(key: str, role: str, **context):
        """ Generate keypairs for use in a TUF repository. """
        log.debug("Received 'generate_keypair' task.")
        log.info(f"Generating '{key}' keypair for '{role}' role.")
        return KeyManager().generate_keypair(key, role)
Generate keypairs for use in a TUF repository.
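Because the task is registered under the explicit name 'generate_keypair_task', a client could in principle dispatch it by name rather than importing this module. The following is only a sketch of that idea, not Rugged's own client code: `dispatch_generate_keypair`, `RecordingSender`, and the queue name `root-worker` are hypothetical, and `send_task` stands in for a Celery application's `send_task` method (which accepts a task name, `kwargs`, and a `queue` option). The recording stub lets the sketch run without a broker.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical queue name; the real value comes from the module-level
# `queue` variable, which is not shown in this page.
ROOT_QUEUE = "root-worker"


def dispatch_generate_keypair(
    send_task: Callable[..., Any],
    key: str,
    role: str,
    queue: str = ROOT_QUEUE,
) -> Any:
    """Dispatch 'generate_keypair_task' by name with its two arguments."""
    return send_task(
        "generate_keypair_task",
        kwargs={"key": key, "role": role},
        queue=queue,
    )


class RecordingSender:
    """Stand-in for `celery_app.send_task` that records what it was given."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Optional[Dict[str, str]], Optional[str]]] = []

    def __call__(self, name, kwargs=None, queue=None):
        self.calls.append((name, kwargs, queue))
        return "dispatched"


if __name__ == "__main__":
    sender = RecordingSender()
    dispatch_generate_keypair(sender, key="root", role="root")
    print(sender.calls)
```

In a real deployment the first argument would be the `send_task` method of a configured Celery application, and the call would only succeed while the root worker is online to consume from its queue.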
def initialize_task(**context)
    @worker.task(name='initialize_task', queue=queue)
    def initialize_task(**context):
        """ Initialize a new TUF repository. """
        log.debug("Received 'initialize' task.")
        result, message = init_repo()
        if result:
            log.info(message)
        else:
            log.error(message)
        return (result, message)
Initialize a new TUF repository.
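The task returns a `(result, message)` pair and logs the message at INFO or ERROR level depending on the outcome. The sketch below reproduces that pattern in isolation so it can be run without Celery or Rugged. `run_initialize`, `fake_init_repo_ok`, and `fake_init_repo_fail` are hypothetical names, the messages are invented, and the stubs merely stand in for the real `init_repo()`, whose behaviour is not shown here.

```python
import logging
from typing import Callable, Tuple

log = logging.getLogger("root_worker_sketch")


def run_initialize(init_repo: Callable[[], Tuple[bool, str]]) -> Tuple[bool, str]:
    """Call init_repo(), log the message by outcome, and return the pair."""
    result, message = init_repo()
    if result:
        log.info(message)
    else:
        log.error(message)
    return (result, message)


def fake_init_repo_ok() -> Tuple[bool, str]:
    """Hypothetical stub for a successful initialization."""
    return (True, "Initialized repository.")


def fake_init_repo_fail() -> Tuple[bool, str]:
    """Hypothetical stub for a failed initialization."""
    return (False, "Could not initialize repository.")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    for stub in (fake_init_repo_ok, fake_init_repo_fail):
        ok, msg = run_initialize(stub)
        print(ok, msg)
```

A caller would typically check the boolean first and surface the message to the operator. Note that with a JSON result serializer the pair may arrive at the caller as a two-element list rather than a tuple.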
Inherited members