Module rugged.lib.semaphores.processing

Functions

def get_processing_dir_path()
Expand source code
def get_processing_dir_path():
    """ Return the first path reported by get_processing_dirs(), or '' when none is reported. """
    detected = get_processing_dirs()
    return detected[0] if detected else ''

Return the path to the directory of currently processing target files.

def get_processing_dirs()
Expand source code
def get_processing_dirs():
    """ Build the list of full paths for entries whose name starts with the processing prefix. """
    found = []
    for entry in get_post_to_tuf_dir_contents():
        # Entries without the processing prefix are not processing directories.
        if not entry.startswith(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):
            continue
        full_path = os.path.join(get_post_to_tuf_path(), entry)
        log.debug(f"Detected processing directory at: {full_path}")
        found.append(full_path)
    return found

Return paths to any processing directories.

def get_stale_processing_dirs()
Expand source code
def get_stale_processing_dirs():
    """ Return paths to any stale processing directories.

    A processing directory is considered stale when the time elapsed since its
    last modification exceeds the threshold stored under the processing prefix
    in the 'stale_semaphore_age_thresholds' configuration entry.

    A directory that no longer exists by the time it is inspected is skipped
    rather than aborting the scan.

    Returns:
        A list of paths to the stale processing directories (possibly empty).
    """
    thresholds = config["stale_semaphore_age_thresholds"].get()
    max_age = thresholds[RUGGED_MONITOR_TUF_PROCESSING_PREFIX]
    now = time.time()
    stale_dirs = []
    for processing_dir in get_processing_dirs():
        try:
            changed = os.path.getmtime(processing_dir)
        except FileNotFoundError:
            # The directory was renamed or removed between being listed and
            # being inspected, so it is no longer a processing directory and
            # cannot be stale.
            continue
        if now - changed > max_age:
            stale_dirs.append(processing_dir)
    return stale_dirs

Return paths to any stale processing directories.

def monitor_worker_is_processing()
Expand source code
def monitor_worker_is_processing():
    """ Report whether get_processing_dirs() currently finds any processing directory. """
    active_dirs = get_processing_dirs()
    return True if active_dirs else False

Determine whether new targets are currently being processed.

def processing_dir_is_stale() -> bool
Expand source code
def processing_dir_is_stale() -> bool:
    """ Report whether get_stale_processing_dirs() finds at least one stale processing directory. """
    stale = get_stale_processing_dirs()
    return True if stale else False

Determine whether the current processing targets directory is stale.

def reset_processing_dir(processing_dir: str) -> None
Expand source code
def reset_processing_dir(processing_dir: str) -> None:
    """ Move a processing directory back to ready, so that it can be retried.

    Only the processing prefix at the start of the directory's own name (the
    final path component) is swapped for the ready prefix. Parent directories
    are left untouched, even if their names happen to contain the same text.

    Args:
        processing_dir: Path to the processing directory to rename.

    If the rename fails with an OSError, the error is logged and the program
    exits via sys.exit().
    """
    parent, name = os.path.split(processing_dir)
    if name.startswith(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):
        # Swap just the leading prefix of the final component; a blanket
        # str.replace() would also rewrite matching text in parent directories.
        ready_name = RUGGED_MONITOR_TUF_READY_PREFIX + name[len(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):]
        ready_path = os.path.join(parent, ready_name)
    else:
        # The final component does not carry the prefix (e.g. trailing slash);
        # keep the previous whole-path substitution for backward compatibility.
        ready_path = processing_dir.replace(RUGGED_MONITOR_TUF_PROCESSING_PREFIX, RUGGED_MONITOR_TUF_READY_PREFIX)
    log.warning(f"Resetting processing directory ('{processing_dir}') to ready state.")
    try:
        log.debug(f"Renaming '{processing_dir}' to '{ready_path}'.")
        os.rename(processing_dir, ready_path)
    except OSError as e:
        log_exception(e)
        log.error(f"Error renaming '{processing_dir}' to '{ready_path}'.")
        sys.exit("Check the logs for more detailed error reporting.")
    log.info(f"Renamed '{processing_dir}' to '{ready_path}'.")

Move a processing directory back to ready, so that it can be retried.

def reset_stale_processing_dirs(force: bool = False) -> None
Expand source code
def reset_stale_processing_dirs(force: bool = False) -> None:
    """ Reset processing directories to ready: every one when forced, otherwise only the stale ones. """
    if force:
        candidates = get_processing_dirs()
    else:
        candidates = get_stale_processing_dirs()

    if candidates:
        for candidate in candidates:
            reset_processing_dir(candidate)
        return

    # Nothing to reset: report the location and prefix that were searched.
    searched = os.path.join(get_post_to_tuf_path(), RUGGED_MONITOR_TUF_PROCESSING_PREFIX)
    log.info(f"No stale processing directories were found at: {searched}")

Move any stale processing directories back to ready, so that they can be retried.

def wait_for_processing_task_to_complete(timeout: int = 0)
Expand source code
def wait_for_processing_task_to_complete(timeout: int = 0):
    """ Block while an 'add-targets' task is processing, polling once per second until it finishes.

    A falsy timeout (the default, 0) disables the timeout check. Otherwise a
    RuggedTimeoutError is raised once the number of one-second waits reaches
    the timeout.
    """
    if not monitor_worker_is_processing():
        return

    log.info("Waiting for currently processing 'add-targets' task to complete.")
    elapsed = 0
    while monitor_worker_is_processing():
        sleep(1)
        elapsed += 1
        # Emit a progress message on every fifth wait.
        if not elapsed % 5:
            log.info(f"Waited {elapsed} seconds for currently processing 'add-targets' task to complete.")
        if timeout and elapsed >= timeout:
            message = ("Timeout expired waiting for currently processing 'add-targets' task to "
                       "complete.")
            raise RuggedTimeoutError(message)
    log.info("The 'add-targets' task is complete.")

Detect if there's a scheduled 'add-targets' task processing, and wait for it to complete.