Module rugged.lib.semaphores.processing
Functions
def get_processing_dir_path()
-
Expand source code
def get_processing_dir_path():
    """
    Look up the directory of target files that are currently being processed.

    Returns the first path reported by get_processing_dirs(), or an empty
    string when it reports none.
    """
    candidates = get_processing_dirs()
    return candidates[0] if candidates else ''
Return the path to the directory of currently processing target files.
def get_processing_dirs()
-
Expand source code
def get_processing_dirs():
    """
    Collect the paths of all processing directories.

    Each name yielded by get_post_to_tuf_dir_contents() that begins with
    RUGGED_MONITOR_TUF_PROCESSING_PREFIX is joined onto the path returned by
    get_post_to_tuf_path(), logged at debug level, and appended to the result
    in the order it was listed.
    """
    found = []
    for entry in get_post_to_tuf_dir_contents():
        # Anything without the processing prefix is not a processing directory.
        if not entry.startswith(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):
            continue
        full_path = os.path.join(get_post_to_tuf_path(), entry)
        log.debug(f"Detected processing directory at: {full_path}")
        found.append(full_path)
    return found
Return paths to any processing directories.
def get_stale_processing_dirs()
-
Expand source code
def get_stale_processing_dirs():
    """
    Return paths to any stale processing directories.

    A processing directory is stale when the time elapsed since its last
    modification (in seconds, per os.path.getmtime() and time.time()) exceeds
    the threshold configured under "stale_semaphore_age_thresholds" for
    RUGGED_MONITOR_TUF_PROCESSING_PREFIX.

    A directory that no longer exists by the time it is inspected is skipped:
    it has been renamed or removed since it was listed, so it is not stale.
    """
    thresholds = config["stale_semaphore_age_thresholds"].get()
    max_age = thresholds[RUGGED_MONITOR_TUF_PROCESSING_PREFIX]
    now = time.time()
    stale_dirs = []
    for processing_dir in get_processing_dirs():
        try:
            changed = os.path.getmtime(processing_dir)
        except FileNotFoundError:
            # The directory vanished between listing and stat (e.g. the task
            # finished); treat it as no longer processing rather than crash.
            log.debug(f"Processing directory no longer exists, skipping: {processing_dir}")
            continue
        if now - changed > max_age:
            stale_dirs.append(processing_dir)
    return stale_dirs
Return paths to any stale processing directories.
def monitor_worker_is_processing()
-
Expand source code
def monitor_worker_is_processing():
    """
    Report whether new targets are being processed right now.

    True when get_processing_dirs() reports at least one directory,
    False otherwise.
    """
    if get_processing_dirs():
        return True
    return False
Determine whether new targets are currently being processed.
def processing_dir_is_stale() ‑> bool
-
Expand source code
def processing_dir_is_stale() -> bool:
    """
    Report whether the current processing targets directory has gone stale.

    True when get_stale_processing_dirs() reports at least one directory,
    False otherwise.
    """
    stale_dirs = get_stale_processing_dirs()
    return True if stale_dirs else False
Determine whether the current processing targets directory is stale.
def reset_processing_dir(processing_dir: str) ‑> None
-
Expand source code
def reset_processing_dir(processing_dir: str) -> None:
    """
    Move a processing directory back to ready, so that it can be retried.

    The directory is renamed by swapping the leading
    RUGGED_MONITOR_TUF_PROCESSING_PREFIX of its final path component for
    RUGGED_MONITOR_TUF_READY_PREFIX. Parent directories are left untouched,
    even if their names happen to contain the processing prefix.

    If the rename raises OSError, the exception and an error message are
    logged and the process exits via sys.exit().
    """
    dir_name = os.path.basename(processing_dir)
    if dir_name.startswith(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):
        # Rewrite only the directory's own name; a plain str.replace() on the
        # whole path would also rewrite matching text in parent directories.
        parent = processing_dir[:len(processing_dir) - len(dir_name)]
        remainder = dir_name[len(RUGGED_MONITOR_TUF_PROCESSING_PREFIX):]
        ready_path = f"{parent}{RUGGED_MONITOR_TUF_READY_PREFIX}{remainder}"
    else:
        # Unexpected shape (e.g. trailing separator): keep the previous
        # whole-path substitution so existing callers behave as before.
        ready_path = processing_dir.replace(
            RUGGED_MONITOR_TUF_PROCESSING_PREFIX,
            RUGGED_MONITOR_TUF_READY_PREFIX,
        )

    log.warning(f"Resetting processing directory ('{processing_dir}') to ready state.")
    try:
        log.debug(f"Renaming '{processing_dir}' to '{ready_path}'.")
        os.rename(processing_dir, ready_path)
    except OSError as e:
        log_exception(e)
        log.error(f"Error renaming '{processing_dir}' to '{ready_path}'.")
        sys.exit("Check the logs for more detailed error reporting.")
    log.info(f"Renamed '{processing_dir}' to '{ready_path}'.")
Move a processing directory back to ready, so that it can be retried.
def reset_stale_processing_dirs(force: bool = False) ‑> None
-
Expand source code
def reset_stale_processing_dirs(force: bool = False) -> None:
    """
    Put stale processing directories back into the ready state for a retry.

    With ``force`` set, every directory from get_processing_dirs() is reset;
    otherwise only those from get_stale_processing_dirs(). When there is
    nothing to reset, an info message is logged instead.
    """
    if force:
        targets = get_processing_dirs()
    else:
        targets = get_stale_processing_dirs()

    if targets:
        for target in targets:
            reset_processing_dir(target)
        return

    searched = os.path.join(get_post_to_tuf_path(), RUGGED_MONITOR_TUF_PROCESSING_PREFIX)
    log.info(f"No stale processing directories were found at: {searched}")
Move any stale processing directories back to ready, so that they can be retried.
def wait_for_processing_task_to_complete(timeout: int = 0)
-
Expand source code
def wait_for_processing_task_to_complete(timeout: int = 0) -> None:
    """
    Detect if there's a scheduled 'add-targets' task processing, and wait for it to complete.

    While monitor_worker_is_processing() reports True, this calls sleep(1)
    and increments a counter once per iteration, logging progress each time
    the counter is a multiple of 5.

    :param timeout: Maximum counter value to wait for. A falsy value (the
        default, 0) disables the timeout check entirely.
    :raises RuggedTimeoutError: if ``timeout`` is set and the counter reaches it
        while processing is still detected.
    """
    # NOTE(review): the nesting below was reconstructed from a flattened
    # listing of this function -- confirm it against the original file.
    if monitor_worker_is_processing():
        log.info("Waiting for currently processing 'add-targets' task to complete.")
        # Counts loop iterations; presumably seconds, if sleep is time.sleep.
        seconds = 0
        while monitor_worker_is_processing():
            sleep(1)
            seconds += 1
            if seconds % 5 == 0:
                log.info(f"Waited {seconds} seconds for currently processing 'add-targets' task to complete.")
            # A timeout of 0 is falsy, so this check never fires by default.
            if timeout and seconds >= timeout:
                raise RuggedTimeoutError("Timeout expired waiting for currently processing 'add-targets' task to "
                                         "complete.")
        log.info("The 'add-targets' task is complete.")
Detect if there's a scheduled 'add-targets' task processing, and wait for it to complete.