Module rugged.tuf.targets

Functions

def add_targets() ‑> Tuple[bool, Dict[str, Any]]
Expand source code
def add_targets() -> Tuple[bool, Dict[str, Any]]:
    """ Add every inbound target to the TUF repository.

    Metadata is only written when all inbound targets were added. Empty
    inbound target directories are cleaned up afterwards in either case.

    Returns a `(success, data)` tuple. On success, `data` holds the added
    targets under 'added_targets'; otherwise it lists the inbound targets
    that were not added under 'failed_targets'.
    """
    inbound_targets = get_inbound_targets()
    repo = RuggedRepository()

    # With hashed bins, only the metadata relevant to these targets is loaded.
    if hashed_bins_is_enabled():
        repo.load_metadata_for_hashed_bins_targets(inbound_targets)
    else:
        repo.load()

    added_targets = repo.add_targets()
    # Anything inbound that the repository did not report as added has failed.
    not_added = set(inbound_targets).difference(set(added_targets))
    succeeded = not not_added

    if succeeded:
        if hashed_bins_is_enabled():
            # Write the bin that each added target hashes into.
            for target in added_targets:
                repo.write_metadata(find_hash_bin(target))
        else:
            repo.write_metadata('targets')
        outcome = {'added_targets': added_targets}
    else:
        outcome = {'failed_targets': list(not_added)}

    delete_empty_inbound_target_dirs()

    return (succeeded, outcome)

Add targets to a TUF repository.

def delete_empty_inbound_target_dirs() ‑> None
Expand source code
def delete_empty_inbound_target_dirs() -> None:
    """ Prune empty sub-directories from the inbound targets directory.

    The configured inbound targets directory itself is never removed. A
    directory that cannot be removed is logged as a warning instead of
    raising.
    """
    inbound_targets_dir = config['inbound_targets_path'].get()
    log.debug(f"Scanning for empty directories in inbound targets: '{inbound_targets_dir}'")
    # Bottom-up traversal visits children before their parents, so the leaves
    # are pruned before the directories that contain them are examined.
    for current_dir, subdirs, filenames in walk(inbound_targets_dir, topdown=False):
        # Leave the root directory alone, along with anything holding files.
        if current_dir == inbound_targets_dir or filenames:
            continue

        # The list of sub-directories comes from the initial walk and may be
        # stale, since earlier iterations should have removed them. Look at
        # the directory again before deciding it still has contents.
        if subdirs and listdir(current_dir):
            log.warning(f"Found unexpected files or directories in '{current_dir}'.")
            continue

        try:
            rmdir(current_dir)
            log.debug(f"Cleaned up empty directory '{current_dir}'.")
        except Exception as error:
            log_exception(error)
            log.warning(f"Failed to clean up empty directory '{current_dir}'.")

Delete empty directories from inbound targets directory.

def remove_targets(targets: List[str]) ‑> Tuple[bool, Dict[str, Any]]
Expand source code
def remove_targets(targets: List[str]) -> Tuple[bool, Dict[str, Any]]:
    """ Remove the given targets from the TUF repository.

    Metadata is only written when every requested target was removed.

    Returns a `(success, data)` tuple. On success, `data` holds the removed
    targets under 'removed_targets'; otherwise it lists the requested
    targets that were not removed under 'failed_targets'.
    """
    repo = RuggedRepository()

    # With hashed bins, only the metadata relevant to these targets is loaded.
    if hashed_bins_is_enabled():
        repo.load_metadata_for_hashed_bins_targets(targets)
    else:
        repo.load()

    removed_targets = repo.remove_targets(targets)
    # Anything requested that the repository did not report as removed has failed.
    not_removed = set(targets).difference(set(removed_targets))
    if not_removed:
        return (False, {'failed_targets': list(not_removed)})

    if hashed_bins_is_enabled():
        # Write the bin that each removed target hashes into.
        for target in removed_targets:
            repo.write_metadata(find_hash_bin(target))
    else:
        repo.write_metadata('targets')

    return (True, {'removed_targets': removed_targets})

Remove targets from a TUF repository.