Module rugged.lib.semaphores.pause
Functions
def create_pause_processing_flag()
-
Expand source code
def create_pause_processing_flag(): """ Create 'pause-processing' semaphore directory. """ flag_path = get_pause_processing_flag_path() log.debug(f"Creating 'pause-processing' semaphore directory: {flag_path}") if not monitor_worker_is_paused(): os.mkdir(flag_path) else: log.warning(f"Detected an existing pause-processing flag at: {flag_path}")
Create 'pause-processing' semaphore directory.
def delete_pause_processing_flag()
-
Expand source code
def delete_pause_processing_flag(): """ Delete 'pause-processing' semaphore directory. """ flag_path = get_pause_processing_flag_path() log.debug(f"Deleting 'pause-processing' semaphore directory: {flag_path}") if monitor_worker_is_paused(): os.rmdir(flag_path) else: log.warning(f"No pause-processing flag was found at: {flag_path}")
Delete 'pause-processing' semaphore directory.
def get_pause_processing_flag_path()
-
Expand source code
def get_pause_processing_flag_path(): """ Return the path to the 'pause-processing' task semaphore directory. """ return os.path.join(get_post_to_tuf_path(), RUGGED_MONITOR_TUF_PAUSED_FLAG)
Return the path to the 'pause-processing' task semaphore directory.
def monitor_worker_is_paused()
-
Expand source code
def monitor_worker_is_paused(): """ Determine whether the 'pause-processing' semaphore exists. """ flag_path = get_pause_processing_flag_path() if os.path.exists(flag_path): log.debug(f"Detected pause-processing flag at: {flag_path}") return True return False
Determine whether the 'pause-processing' semaphore exists.
def pause_processing_flag_is_stale() ‑> bool
-
Expand source code
def pause_processing_flag_is_stale() -> bool: """ Determine whether the 'pause-processing' task semaphore is stale. """ # If the flag doesn't exist, then it also isn't stale. if not monitor_worker_is_paused(): return False thresholds = config["stale_semaphore_age_thresholds"].get() now = time.time() changed = os.path.getmtime(get_pause_processing_flag_path()) age = now - changed return age > thresholds[RUGGED_MONITOR_TUF_PAUSED_FLAG]
Determine whether the 'pause-processing' task semaphore is stale.