Module rugged.workers.monitor-worker
Classes
class MonitorWorker
-
Expand source code
class MonitorWorker(BaseWorker):
    """
    Rugged (Celery) worker for monitoring a directory and dispatching
    `add-targets` tasks.

    N.B. This worker should not have access to any keys.
    """

    # This is invoked before each task starts execution.
    # This is special-cased because scheduled tasks don't have a log_level
    # passed in, they use the configured scheduler_log_level.
    @staticmethod
    @task_prerun.connect
    def set_log_level(sender: Celery, **kwargs):
        """
        Set log level for regular and scheduled tasks.

        Uses the `log_level` entry of the task's keyword arguments when it
        is present, and the configured `scheduler_log_level` otherwise.
        """
        task_kwargs = kwargs.get('kwargs', {})
        if 'log_level' in task_kwargs:
            log_level = task_kwargs['log_level']
        else:
            log_level = config["scheduler_log_level"].get()
        log.setLevel(log_level)

    # pyrefly: ignore[missing-attribute]
    @worker.on_after_configure.connect
    def schedule_periodic_tasks(sender: Celery, **kwargs):
        """
        Schedule periodic tasks for the monitor-worker.

        Each task is only scheduled when its configured period is greater
        than zero.
        """
        period = config["scheduler_scan_period"].get()
        if period > 0:
            log.info(f"Scheduling find-new-targets task to run every {period} seconds.")
            task = MonitorWorker.find_new_targets_task
            sender.add_periodic_task(period, task)

        period = config["scheduler_refresh_period"].get()
        if period > 0:
            log.info(f"Scheduling refresh-expiry task to run every {period} seconds.")
            # The refreshing flag is removed before the task is scheduled;
            # presumably to clear a flag left over from a previous run.
            delete_refreshing_flag()
            task = MonitorWorker.refresh_expiry_task
            sender.add_periodic_task(period, task)

        period = config["scheduler_reset_period"].get()
        if period > 0:
            log.info(f"Scheduling reset-semaphores task to run every {period} seconds.")
            task = MonitorWorker.reset_semaphores_task
            sender.add_periodic_task(period, task)

    @worker.task(name='refresh_expiry_task', queue=queue)
    def refresh_expiry_task(**context):
        """
        Task to refresh imminently expiring metadata.

        The run is skipped when the monitor worker is paused or when a
        refreshing flag already exists. Otherwise the refreshing flag is
        created, held while the metadata is fetched and refreshed, and
        removed again on every exit path.
        """
        log.debug("Received refresh-expiry task.")
        if monitor_worker_is_paused():
            log.warning("The monitor worker is paused.")
            log.info("Skipping expiry refresh task.")
            return
        if monitor_worker_is_refreshing():
            flag_path = get_refreshing_flag_path()
            log.warning(f"A possibly stale refresh-expiry task semaphore was found at: {flag_path}")
            log.info("Skipping expiry refresh task.")
            return

        create_refreshing_flag()
        try:
            wait_for_processing_task_to_complete()
            try:
                log.info("Fetching expiring metadata.")
                metadata = _fetch_expiring_metadata()
                for tuf_worker, role_list in metadata.items():
                    log.info(f"Expiring metadata from {tuf_worker}:")
                    if not role_list:
                        log.info(" No expiring metadata.")
                    for role in role_list:
                        log.info(f" {role}.json")
                log.info("Refreshing any imminently expiring metadata.")
                _refresh_expiring_metadata(metadata)
            except RuggedMetadataError as e:
                log_exception(e)
                log.error("Failed to fetch expiring metadata.")
        finally:
            # Always release the semaphore, including on unexpected
            # exceptions, so a failed run does not leave the flag behind.
            delete_refreshing_flag()

    @worker.task(name='find_new_targets_task', queue=queue)
    def find_new_targets_task(**context):
        """
        Task to check for new targets in post-to-tuf directory.

        Only the first ready target is handled per run: it is renamed to
        its processing name, staged, and then the add-targets, snapshot and
        timestamp updates are dispatched for it.
        """
        log.debug("Received find-new-targets task.")
        if monitor_worker_is_paused():
            log.warning("The monitor worker is paused.")
            log.info("Skipping find-new-targets task.")
            return

        files = get_post_to_tuf_dir_contents()
        if not files:
            log.debug("No new content found.")
            return

        log.info("New content found in post-to-tuf directory.")
        log.info(f"Count of post-to-tuf content: {len(files)}")
        oldest_age = get_age_of_oldest_post_to_tuf_dir_content()
        age = round(oldest_age)
        log.info(f"Age of oldest post-to-tuf content: {age} seconds")
        log.debug("List of post-to-tuf content:")
        for file in files:
            log.debug(f" {file}")

        if monitor_worker_is_refreshing():
            log.info("Monitor worker is refreshing expiry periods. Waiting for next scan to continue.")
            return

        ready_targets = _match_targets(files)
        if not ready_targets:
            log.info("No new targets found.")
            return

        if monitor_worker_is_processing():
            log.info("Monitor worker is already processing targets. Waiting for next scan to continue.")
            return

        log.info("New targets found in post-to-tuf directory:")
        for ready_target in ready_targets:
            log.info(f" {ready_target}")

        # Only the first ready target is processed in this run.
        ready_target = ready_targets[0]
        log.info(f"Processing: {ready_target}")
        ready_target_dir = _get_ready_target_dir(ready_target)
        processing_target = _get_processing_target(ready_target)
        processing_target_dir = _get_processing_target_dir(processing_target)

        log.debug(f"Renaming '{ready_target}' to '{processing_target}'.")
        os.rename(ready_target_dir, processing_target_dir)
        log.debug(f"Moving targets for '{processing_target}' from post-to-tuf directory to inbound directory.")
        _stage_target_dir(processing_target)
        log.debug(f"Renaming targets for '{processing_target}' within inbound directory.")
        _rename_target_files(processing_target)

        try:
            log.info(f"Dispatching 'add-targets' task for: {processing_target}")
            _dispatch_add_targets(processing_target)
            log.debug("Updating snapshot metadata")
            _dispatch_update_snapshot()
            log.debug("Updating timestamp metadata")
            _dispatch_update_timestamp()
        except RuggedMetadataError as e:
            log_exception(e)
            _reset_processing_target(processing_target)
            # NOTE(review): sys.exit() raises SystemExit inside the task;
            # confirm this is the intended way to abort a worker task.
            sys.exit("Check the logs for more detailed error reporting.")

        try:
            log.debug(f"Removing directory '{processing_target_dir}'")
            shutil.rmtree(processing_target_dir)
        except OSError as e:
            log_exception(e)
            log.error(f"Error removing directory '{processing_target_dir}'")
            sys.exit("Check the logs for more detailed error reporting.")

    @worker.task(name='reset_semaphores_task', queue=queue)
    def reset_semaphores_task(**context):
        """
        Task to reset stale semaphores.

        Checks the processing directories first, then the
        expiry-refreshing flag, and resets whichever is reported stale.
        """
        log.debug("Received reset-semaphores task.")

        log.debug("Checking for stale processing directories.")
        if processing_dir_is_stale():
            log.warning("Found stale processing directory. Resetting.")
            reset_stale_processing_dirs()

        log.debug("Checking for stale expiry-refreshing flag.")
        if refreshing_flag_is_stale():
            log.warning("Found stale expiry-refreshing flag. Resetting.")
            reset_stale_refreshing_dir()

    @worker.task(name='check_monitor_task', queue=queue)
    def check_monitor_task(**context):
        """
        Task to check that the monitor worker has proper filesystem permissions.

        Returns a dict with 'successes', 'warnings' and 'failures' lists,
        which is passed to each of the check helpers in turn.
        """
        log.info("Received check-monitor task.")
        results = {
            'successes': [],
            'warnings': [],
            'failures': [],
        }
        _check_monitor_filesystem_access(results)
        _check_monitor_status(results)
        _check_monitor_semaphores(results)
        return results
Rugged (Celery) worker for monitoring a directory and dispatching
add-targets
tasks. N.B. This worker should not have access to any keys.
Ancestors
Static methods
def set_log_level(sender: celery.app.base.Celery, **kwargs)
-
Expand source code
@staticmethod
@task_prerun.connect
def set_log_level(sender: Celery, **kwargs):
    """
    Set log level for regular and scheduled tasks.

    Uses the `log_level` entry of the task's keyword arguments when it is
    present, and the configured `scheduler_log_level` otherwise.
    """
    task_kwargs = kwargs.get('kwargs', {})
    if 'log_level' in task_kwargs:
        log_level = task_kwargs['log_level']
    else:
        # No log_level was passed in: fall back to the scheduler setting.
        log_level = config["scheduler_log_level"].get()
    log.setLevel(log_level)
Set log level for regular and scheduled tasks.
Methods
def check_monitor_task(**context)
-
Expand source code
@worker.task(name='check_monitor_task', queue=queue)
def check_monitor_task(**context):
    """
    Task to check that the monitor worker has proper filesystem permissions.

    Returns a dict with 'successes', 'warnings' and 'failures' lists, which
    is passed to each of the check helpers in turn.
    """
    log.info("Received check-monitor task.")

    # One fresh list per outcome category, in the same key order as before.
    results = {outcome: [] for outcome in ('successes', 'warnings', 'failures')}

    for check in (
        _check_monitor_filesystem_access,
        _check_monitor_status,
        _check_monitor_semaphores,
    ):
        check(results)

    return results
Task to check that the monitor worker has proper filesystem permissions.
def find_new_targets_task(**context)
-
Expand source code
@worker.task(name='find_new_targets_task', queue=queue)
def find_new_targets_task(**context):
    """
    Task to check for new targets in post-to-tuf directory.

    Only the first ready target is handled per run: it is renamed to its
    processing name, staged, and then the add-targets, snapshot and
    timestamp updates are dispatched for it.
    """
    log.debug("Received find-new-targets task.")
    if monitor_worker_is_paused():
        log.warning("The monitor worker is paused.")
        log.info("Skipping find-new-targets task.")
        return

    files = get_post_to_tuf_dir_contents()
    if not files:
        log.debug("No new content found.")
        return

    # Report what was found before deciding whether to act on it.
    log.info("New content found in post-to-tuf directory.")
    log.info(f"Count of post-to-tuf content: {len(files)}")
    oldest_age = get_age_of_oldest_post_to_tuf_dir_content()
    age = round(oldest_age)
    log.info(f"Age of oldest post-to-tuf content: {age} seconds")
    log.debug("List of post-to-tuf content:")
    for file in files:
        log.debug(f" {file}")

    if monitor_worker_is_refreshing():
        log.info("Monitor worker is refreshing expiry periods. Waiting for next scan to continue.")
        return

    ready_targets = _match_targets(files)
    if not ready_targets:
        log.info("No new targets found.")
        return

    if monitor_worker_is_processing():
        log.info("Monitor worker is already processing targets. Waiting for next scan to continue.")
        return

    log.info("New targets found in post-to-tuf directory:")
    for ready_target in ready_targets:
        log.info(f" {ready_target}")

    # Only the first ready target is processed in this run.
    ready_target = ready_targets[0]
    log.info(f"Processing: {ready_target}")
    ready_target_dir = _get_ready_target_dir(ready_target)
    processing_target = _get_processing_target(ready_target)
    processing_target_dir = _get_processing_target_dir(processing_target)

    log.debug(f"Renaming '{ready_target}' to '{processing_target}'.")
    os.rename(ready_target_dir, processing_target_dir)
    log.debug(f"Moving targets for '{processing_target}' from post-to-tuf directory to inbound directory.")
    _stage_target_dir(processing_target)
    log.debug(f"Renaming targets for '{processing_target}' within inbound directory.")
    _rename_target_files(processing_target)

    try:
        log.info(f"Dispatching 'add-targets' task for: {processing_target}")
        _dispatch_add_targets(processing_target)
        log.debug("Updating snapshot metadata")
        _dispatch_update_snapshot()
        log.debug("Updating timestamp metadata")
        _dispatch_update_timestamp()
    except RuggedMetadataError as e:
        log_exception(e)
        _reset_processing_target(processing_target)
        # NOTE(review): sys.exit() raises SystemExit inside the task;
        # confirm this is the intended way to abort a worker task.
        sys.exit("Check the logs for more detailed error reporting.")

    try:
        log.debug(f"Removing directory '{processing_target_dir}'")
        shutil.rmtree(processing_target_dir)
    except OSError as e:
        log_exception(e)
        log.error(f"Error removing directory '{processing_target_dir}'")
        sys.exit("Check the logs for more detailed error reporting.")
Task to check for new targets in post-to-tuf directory.
def refresh_expiry_task(**context)
-
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue)
def refresh_expiry_task(**context):
    """
    Task to refresh imminently expiring metadata.

    The run is skipped when the monitor worker is paused or when a
    refreshing flag already exists. Otherwise the refreshing flag is
    created, held while the metadata is fetched and refreshed, and removed
    again on every exit path.
    """
    log.debug("Received refresh-expiry task.")
    if monitor_worker_is_paused():
        log.warning("The monitor worker is paused.")
        log.info("Skipping expiry refresh task.")
        return
    if monitor_worker_is_refreshing():
        flag_path = get_refreshing_flag_path()
        log.warning(f"A possibly stale refresh-expiry task semaphore was found at: {flag_path}")
        log.info("Skipping expiry refresh task.")
        return

    create_refreshing_flag()
    try:
        wait_for_processing_task_to_complete()
        try:
            log.info("Fetching expiring metadata.")
            metadata = _fetch_expiring_metadata()
            for tuf_worker, role_list in metadata.items():
                log.info(f"Expiring metadata from {tuf_worker}:")
                if not role_list:
                    log.info(" No expiring metadata.")
                for role in role_list:
                    log.info(f" {role}.json")
            log.info("Refreshing any imminently expiring metadata.")
            _refresh_expiring_metadata(metadata)
        except RuggedMetadataError as e:
            log_exception(e)
            log.error("Failed to fetch expiring metadata.")
    finally:
        # Always release the semaphore, including on unexpected exceptions,
        # so a failed run does not leave the flag behind.
        delete_refreshing_flag()
Task to refresh imminently expiring metadata.
def reset_semaphores_task(**context)
-
Expand source code
@worker.task(name='reset_semaphores_task', queue=queue)
def reset_semaphores_task(**context):
    """
    Task to reset stale semaphores.

    Checks the processing directories first, then the expiry-refreshing
    flag, and resets whichever is reported stale.
    """
    log.debug("Received reset-semaphores task.")

    # Processing directories.
    log.debug("Checking for stale processing directories.")
    stale_processing = processing_dir_is_stale()
    if stale_processing:
        log.warning("Found stale processing directory. Resetting.")
        reset_stale_processing_dirs()

    # Expiry-refreshing flag.
    log.debug("Checking for stale expiry-refreshing flag.")
    stale_refreshing = refreshing_flag_is_stale()
    if stale_refreshing:
        log.warning("Found stale expiry-refreshing flag. Resetting.")
        reset_stale_refreshing_dir()
Task to reset stale semaphores.
def schedule_periodic_tasks(sender: celery.app.base.Celery, **kwargs)
-
Expand source code
@worker.on_after_configure.connect
def schedule_periodic_tasks(sender: Celery, **kwargs):
    """
    Schedule periodic tasks for the monitor-worker.

    Each task is only scheduled when its configured period is greater than
    zero.
    """
    # find-new-targets
    period = config["scheduler_scan_period"].get()
    if period > 0:
        log.info(f"Scheduling find-new-targets task to run every {period} seconds.")
        task = MonitorWorker.find_new_targets_task
        sender.add_periodic_task(period, task)

    # refresh-expiry
    period = config["scheduler_refresh_period"].get()
    if period > 0:
        log.info(f"Scheduling refresh-expiry task to run every {period} seconds.")
        # The refreshing flag is removed before the task is scheduled;
        # presumably to clear a flag left over from a previous run.
        delete_refreshing_flag()
        task = MonitorWorker.refresh_expiry_task
        sender.add_periodic_task(period, task)

    # reset-semaphores
    period = config["scheduler_reset_period"].get()
    if period > 0:
        log.info(f"Scheduling reset-semaphores task to run every {period} seconds.")
        task = MonitorWorker.reset_semaphores_task
        sender.add_periodic_task(period, task)
Schedule periodic tasks for the monitor-worker.
Inherited members