Module rugged.workers.monitor-worker

Classes

class MonitorWorker
Expand source code
class MonitorWorker(BaseWorker):
    """
    Rugged (Celery) worker for monitoring a directory and dispatching `add-targets` tasks.

    Besides the `find_new_targets_task` scan, the worker registers tasks to
    refresh expiring metadata, reset stale semaphores and run self-checks.
    The periodic ones are scheduled by `schedule_periodic_tasks` according
    to the `scheduler_*_period` config values.

    N.B. This worker should not have access to any keys.
    """

    # This is invoked before each task starts execution.
    # This is special-cased because scheduled tasks don't have a log_level
    # passed in, they use the configured scheduler_log_level.
    @staticmethod
    @task_prerun.connect
    def set_log_level(sender: Celery, **kwargs):
        """
        Set log level for regular and scheduled tasks.

        Uses the `log_level` keyword argument of the task about to run when
        there is one, and the configured `scheduler_log_level` otherwise.

        NOTE(review): Celery documents the `task_prerun` sender as the task
        being executed, so the `Celery` annotation may be inaccurate -- confirm.
        """
        try:
            # The signal's own `kwargs` entry holds the task's keyword arguments.
            log_level = kwargs['kwargs']['log_level']
        except KeyError:
            log_level = config["scheduler_log_level"].get()

        log.setLevel(log_level)

    # pyrefly: ignore[missing-attribute]
    @worker.on_after_configure.connect
    def schedule_periodic_tasks(sender: Celery, **kwargs):
        """
        Schedule periodic tasks for the monitor-worker.

        Each of the find-new-targets, refresh-expiry and reset-semaphores
        tasks is scheduled only when its configured period (in seconds) is
        greater than zero.
        """

        period = config["scheduler_scan_period"].get()
        if period > 0:
            log.info(f"Scheduling find-new-targets task to run every {period} seconds.")
            sender.add_periodic_task(period, MonitorWorker.find_new_targets_task)

        period = config["scheduler_refresh_period"].get()
        if period > 0:
            log.info(f"Scheduling refresh-expiry task to run every {period} seconds.")
            # Presumably clears a flag left behind by an earlier run, which
            # would otherwise make every refresh-expiry run skip -- confirm.
            delete_refreshing_flag()
            sender.add_periodic_task(period, MonitorWorker.refresh_expiry_task)

        period = config["scheduler_reset_period"].get()
        if period > 0:
            log.info(f"Scheduling reset-semaphores task to run every {period} seconds.")
            sender.add_periodic_task(period, MonitorWorker.reset_semaphores_task)

    @worker.task(name='refresh_expiry_task', queue=queue)
    def refresh_expiry_task(**context):
        """
        Task to refresh imminently expiring metadata.

        Does nothing if the monitor worker is paused or if a refreshing flag
        already exists. Otherwise it creates the refreshing flag, waits for
        any processing task to complete, fetches the expiring metadata, logs
        it per TUF worker and refreshes it.

        The refreshing flag is removed however the run ends, so that a
        failure cannot leave it behind and make later runs skip. A
        `RuggedMetadataError` is logged and swallowed; any other exception
        still propagates after the flag has been removed.
        """
        log.debug("Received refresh-expiry task.")

        if monitor_worker_is_paused():
            log.warning("The monitor worker is paused.")
            log.info("Skipping expiry refresh task.")
            return

        if monitor_worker_is_refreshing():
            flag_path = get_refreshing_flag_path()
            log.warning(f"A possibly stale refresh-expiry task semaphore was found at: {flag_path}")
            log.info("Skipping expiry refresh task.")
            return

        create_refreshing_flag()
        try:
            wait_for_processing_task_to_complete()

            try:
                log.info("Fetching expiring metadata.")
                metadata = _fetch_expiring_metadata()
                for tuf_worker, role_list in metadata.items():
                    log.info(f"Expiring metadata from {tuf_worker}:")
                    if not role_list:
                        log.info("  No expiring metadata.")
                    for role in role_list:
                        log.info(f"  {role}.json")
                log.info("Refreshing any imminently expiring metadata.")
                _refresh_expiring_metadata(metadata)
            except RuggedMetadataError as e:
                log_exception(e)
                # NOTE(review): this message is also emitted when the refresh
                # step (not the fetch) raises; the wording is kept as-is.
                log.error("Failed to fetch expiring metadata.")
        finally:
            # Always release the semaphore, including on unexpected errors.
            delete_refreshing_flag()

    @worker.task(name='find_new_targets_task', queue=queue)
    def find_new_targets_task(**context):
        """
        Task to check for new targets in post-to-tuf directory.

        Returns early when the worker is paused, when the directory is
        empty, when an expiry refresh is in progress, when no content
        matches as a ready target, or when targets are already being
        processed. Otherwise it takes the first ready target only, renames
        its directory to its processing name, stages and renames its files,
        dispatches the add-targets, snapshot and timestamp updates, and
        finally removes the processing directory.
        """
        log.debug("Received find-new-targets task.")

        if monitor_worker_is_paused():
            log.warning("The monitor worker is paused.")
            log.info("Skipping find-new-targets task.")
            return

        files = get_post_to_tuf_dir_contents()
        if not files:
            log.debug("No new content found.")
            return

        log.info("New content found in post-to-tuf directory.")

        log.info(f"Count of post-to-tuf content: {len(files)}")

        age = round(get_age_of_oldest_post_to_tuf_dir_content())
        log.info(f"Age of oldest post-to-tuf content: {age} seconds")
        log.debug("List of post-to-tuf content:")
        for file in files:
            log.debug(f"  {file}")

        if monitor_worker_is_refreshing():
            log.info("Monitor worker is refreshing expiry periods. Waiting for next scan to continue.")
            return

        ready_targets = _match_targets(files)
        if not ready_targets:
            log.info("No new targets found.")
            return

        if monitor_worker_is_processing():
            log.info("Monitor worker is already processing targets. Waiting for next scan to continue.")
            return

        log.info("New targets found in post-to-tuf directory:")
        for ready_target in ready_targets:
            log.info(f"  {ready_target}")

        # Only the first ready target is handled in this run; the others are
        # left untouched.
        ready_target = ready_targets[0]
        log.info(f"Processing: {ready_target}")

        ready_target_dir = _get_ready_target_dir(ready_target)
        processing_target = _get_processing_target(ready_target)
        processing_target_dir = _get_processing_target_dir(processing_target)
        log.debug(f"Renaming '{ready_target}' to '{processing_target}'.")
        os.rename(ready_target_dir, processing_target_dir)

        log.debug(f"Moving targets for '{processing_target}' from post-to-tuf directory to inbound directory.")
        _stage_target_dir(processing_target)
        log.debug(f"Renaming targets for '{processing_target}' within inbound directory.")
        _rename_target_files(processing_target)
        try:
            log.info(f"Dispatching 'add-targets' task for: {processing_target}")
            _dispatch_add_targets(processing_target)
            log.debug("Updating snapshot metadata")
            _dispatch_update_snapshot()
            log.debug("Updating timestamp metadata")
            _dispatch_update_timestamp()
        except RuggedMetadataError as e:
            log_exception(e)
            _reset_processing_target(processing_target)
            # NOTE(review): sys.exit raises SystemExit inside the task;
            # confirm that is the intended way to fail under Celery.
            sys.exit("Check the logs for more detailed error reporting.")

        try:
            log.debug(f"Removing directory '{processing_target_dir}'")
            shutil.rmtree(processing_target_dir)
        except OSError as e:
            log_exception(e)
            log.error(f"Error removing directory '{processing_target_dir}'")
            sys.exit("Check the logs for more detailed error reporting.")

    @worker.task(name='reset_semaphores_task', queue=queue)
    def reset_semaphores_task(**context):
        """
        Task to reset stale semaphores.

        Checks the processing directories and the expiry-refreshing flag in
        turn and resets whichever is reported as stale.
        """
        log.debug("Received reset-semaphores task.")

        log.debug("Checking for stale processing directories.")
        if processing_dir_is_stale():
            log.warning("Found stale processing directory. Resetting.")
            reset_stale_processing_dirs()

        log.debug("Checking for stale expiry-refreshing flag.")
        if refreshing_flag_is_stale():
            log.warning("Found stale expiry-refreshing flag. Resetting.")
            reset_stale_refreshing_dir()

    @worker.task(name='check_monitor_task', queue=queue)
    def check_monitor_task(**context):
        """
        Task to run the monitor worker's self-checks.

        Runs the filesystem-access, status and semaphore checks in turn,
        handing each the same results dict, and returns that dict. Its keys
        are `successes`, `warnings` and `failures`, each starting as an
        empty list.
        """
        log.info("Received check-monitor task.")

        results = {
            'successes': [],
            'warnings': [],
            'failures': [],
        }

        _check_monitor_filesystem_access(results)
        _check_monitor_status(results)
        _check_monitor_semaphores(results)

        return results

Rugged (Celery) worker for monitoring a directory and dispatching add-targets tasks.

N.B. This worker should not have access to any keys.

Ancestors

Static methods

def set_log_level(sender: celery.app.base.Celery, **kwargs)
Expand source code
@staticmethod
@task_prerun.connect
def set_log_level(sender: Celery, **kwargs):
    """
    Apply the log level for the task that is about to run.

    Connected to the `task_prerun` signal. A task dispatched with a
    `log_level` keyword argument runs at that level; a task without one
    falls back to the configured `scheduler_log_level`.

    NOTE(review): Celery documents the `task_prerun` sender as the task
    being executed, so the `Celery` annotation may be inaccurate -- confirm.
    """
    try:
        # The signal's own `kwargs` entry holds the task's keyword arguments.
        task_kwargs = kwargs['kwargs']
        level = task_kwargs['log_level']
    except KeyError:
        level = config["scheduler_log_level"].get()

    log.setLevel(level)

Set log level for regular and scheduled tasks.

Methods

def check_monitor_task(**context)
Expand source code
@worker.task(name='check_monitor_task', queue=queue)
def check_monitor_task(**context):
    """
    Task to run the monitor worker's self-checks.

    Runs the filesystem-access, status and semaphore checks in turn,
    handing each the same report dict, and returns that dict. Its keys are
    `successes`, `warnings` and `failures`, each starting as an empty list.
    """
    log.info("Received check-monitor task.")

    # One independent list per outcome bucket, in reporting order.
    report = {bucket: [] for bucket in ('successes', 'warnings', 'failures')}

    _check_monitor_filesystem_access(report)
    _check_monitor_status(report)
    _check_monitor_semaphores(report)

    return report

Task to check the monitor worker's filesystem access, status, and semaphores.

def find_new_targets_task(**context)
Expand source code
@worker.task(name='find_new_targets_task', queue=queue)
def find_new_targets_task(**context):
    """
    Task to check for new targets in post-to-tuf directory.

    Returns early when the worker is paused, when the directory is empty,
    when an expiry refresh is in progress, when no content matches as a
    ready target, or when targets are already being processed. Otherwise it
    takes the first ready target only, renames its directory to its
    processing name, stages and renames its files, dispatches the
    add-targets, snapshot and timestamp updates, and finally removes the
    processing directory.
    """
    log.debug("Received find-new-targets task.")

    if monitor_worker_is_paused():
        log.warning("The monitor worker is paused.")
        log.info("Skipping find-new-targets task.")
        return

    files = get_post_to_tuf_dir_contents()
    if not files:
        log.debug("No new content found.")
        return

    log.info("New content found in post-to-tuf directory.")

    log.info(f"Count of post-to-tuf content: {len(files)}")

    age = round(get_age_of_oldest_post_to_tuf_dir_content())
    log.info(f"Age of oldest post-to-tuf content: {age} seconds")
    log.debug("List of post-to-tuf content:")
    for file in files:
        log.debug(f"  {file}")

    # Content is reported above even when nothing can be processed yet; the
    # checks below decide whether this run may actually pick up a target.
    if monitor_worker_is_refreshing():
        log.info("Monitor worker is refreshing expiry periods. Waiting for next scan to continue.")
        return

    ready_targets = _match_targets(files)
    if not ready_targets:
        log.info("No new targets found.")
        return

    if monitor_worker_is_processing():
        log.info("Monitor worker is already processing targets. Waiting for next scan to continue.")
        return

    log.info("New targets found in post-to-tuf directory:")
    for ready_target in ready_targets:
        log.info(f"  {ready_target}")

    # Only the first ready target is handled in this run; the others are
    # left untouched.
    ready_target = ready_targets[0]
    log.info(f"Processing: {ready_target}")

    ready_target_dir = _get_ready_target_dir(ready_target)
    processing_target = _get_processing_target(ready_target)
    processing_target_dir = _get_processing_target_dir(processing_target)
    log.debug(f"Renaming '{ready_target}' to '{processing_target}'.")
    os.rename(ready_target_dir, processing_target_dir)

    # NOTE(review): staging and renaming run outside the try block below, so
    # an error here does not go through _reset_processing_target -- confirm
    # that the reset-semaphores task is what recovers from that.
    log.debug(f"Moving targets for '{processing_target}' from post-to-tuf directory to inbound directory.")
    _stage_target_dir(processing_target)
    log.debug(f"Renaming targets for '{processing_target}' within inbound directory.")
    _rename_target_files(processing_target)
    try:
        log.info(f"Dispatching 'add-targets' task for: {processing_target}")
        _dispatch_add_targets(processing_target)
        log.debug("Updating snapshot metadata")
        _dispatch_update_snapshot()
        log.debug("Updating timestamp metadata")
        _dispatch_update_timestamp()
    except RuggedMetadataError as e:
        log_exception(e)
        _reset_processing_target(processing_target)
        # NOTE(review): sys.exit raises SystemExit inside the task; confirm
        # that is the intended way to fail under Celery.
        sys.exit("Check the logs for more detailed error reporting.")

    # Reached only after all three dispatches returned without a
    # RuggedMetadataError.
    try:
        log.debug(f"Removing directory '{processing_target_dir}'")
        shutil.rmtree(processing_target_dir)
    except OSError as e:
        log_exception(e)
        log.error(f"Error removing directory '{processing_target_dir}'")
        sys.exit("Check the logs for more detailed error reporting.")

Task to check for new targets in post-to-tuf directory.

def refresh_expiry_task(**context)
Expand source code
@worker.task(name='refresh_expiry_task', queue=queue)
def refresh_expiry_task(**context):
    """
    Task to refresh imminently expiring metadata.

    Does nothing if the monitor worker is paused or if a refreshing flag
    already exists. Otherwise it creates the refreshing flag, waits for any
    processing task to complete, fetches the expiring metadata, logs it per
    TUF worker and refreshes it.

    The refreshing flag is removed however the run ends, so that a failure
    cannot leave it behind and make later runs skip. A `RuggedMetadataError`
    is logged and swallowed; any other exception still propagates after the
    flag has been removed.
    """
    log.debug("Received refresh-expiry task.")

    if monitor_worker_is_paused():
        log.warning("The monitor worker is paused.")
        log.info("Skipping expiry refresh task.")
        return

    if monitor_worker_is_refreshing():
        flag_path = get_refreshing_flag_path()
        log.warning(f"A possibly stale refresh-expiry task semaphore was found at: {flag_path}")
        log.info("Skipping expiry refresh task.")
        return

    create_refreshing_flag()
    try:
        wait_for_processing_task_to_complete()

        try:
            log.info("Fetching expiring metadata.")
            metadata = _fetch_expiring_metadata()
            for tuf_worker, role_list in metadata.items():
                log.info(f"Expiring metadata from {tuf_worker}:")
                if not role_list:
                    log.info("  No expiring metadata.")
                for role in role_list:
                    log.info(f"  {role}.json")
            log.info("Refreshing any imminently expiring metadata.")
            _refresh_expiring_metadata(metadata)
        except RuggedMetadataError as e:
            log_exception(e)
            # NOTE(review): this message is also emitted when the refresh
            # step (not the fetch) raises; the wording is kept as-is.
            log.error("Failed to fetch expiring metadata.")
    finally:
        # Always release the semaphore, including on unexpected errors.
        delete_refreshing_flag()

Task to refresh imminently expiring metadata.

def reset_semaphores_task(**context)
Expand source code
@worker.task(name='reset_semaphores_task', queue=queue)
def reset_semaphores_task(**context):
    """
    Task to reset stale semaphores.

    Checks the processing directories and then the expiry-refreshing flag,
    resetting whichever is reported as stale.
    """
    log.debug("Received reset-semaphores task.")

    # (debug message, staleness probe, warning message, reset action)
    semaphores = (
        (
            "Checking for stale processing directories.",
            processing_dir_is_stale,
            "Found stale processing directory. Resetting.",
            reset_stale_processing_dirs,
        ),
        (
            "Checking for stale expiry-refreshing flag.",
            refreshing_flag_is_stale,
            "Found stale expiry-refreshing flag. Resetting.",
            reset_stale_refreshing_dir,
        ),
    )

    for announcement, is_stale, warning, reset in semaphores:
        log.debug(announcement)
        if is_stale():
            log.warning(warning)
            reset()

Task to reset stale semaphores.

def schedule_periodic_tasks(sender: celery.app.base.Celery, **kwargs)
Expand source code
@worker.on_after_configure.connect
def schedule_periodic_tasks(sender: Celery, **kwargs):
    """
    Register the monitor-worker's periodic tasks with the scheduler.

    Each of the find-new-targets, refresh-expiry and reset-semaphores tasks
    is scheduled only when its configured period (in seconds) is greater
    than zero.
    """

    # Scan of the post-to-tuf directory.
    scan_period = config["scheduler_scan_period"].get()
    if scan_period > 0:
        log.info(f"Scheduling find-new-targets task to run every {scan_period} seconds.")
        sender.add_periodic_task(scan_period, MonitorWorker.find_new_targets_task)

    # Refresh of imminently expiring metadata.
    refresh_period = config["scheduler_refresh_period"].get()
    if refresh_period > 0:
        log.info(f"Scheduling refresh-expiry task to run every {refresh_period} seconds.")
        # Presumably clears a flag left behind by an earlier run, which
        # would otherwise make every refresh-expiry run skip -- confirm.
        delete_refreshing_flag()
        sender.add_periodic_task(refresh_period, MonitorWorker.refresh_expiry_task)

    # Reset of stale semaphores.
    reset_period = config["scheduler_reset_period"].get()
    if reset_period > 0:
        log.info(f"Scheduling reset-semaphores task to run every {reset_period} seconds.")
        sender.add_periodic_task(reset_period, MonitorWorker.reset_semaphores_task)

Schedule periodic tasks for the monitor-worker.

Inherited members