Module rugged.commands.status

Expand source code
import click
import humanize
import sys
from rugged.lib.host_header import print_host_header
from rugged.lib.logger import get_logger, log_exception
from rugged.lib.task_queue import run_task
from rugged.workers import get_workers
from tabulate import tabulate
from rugged.tuf.repo import RuggedRepository

# Module-level logger; status_cmd uses it to report repository load failures.
log = get_logger()


@click.command("status")
@click.option(
    "--local",
    is_flag=True,
    default=False,
    help="Print the local repo status. Defaults to true, unless a worker is "
         "specified.",
)
@click.option(
    "--worker",
    default=[],
    multiple=True,
    help="The specific worker from which to retrieve repo status. Can be "
         "passed multiple times.",
)
def status_cmd(local, worker):
    """Print a status message for a TUF repository."""
    # NOTE: the docstring above doubles as the command's --help text, so it
    # is kept short and user-facing; implementation notes live in comments.

    # Load the repository up front; on any failure, log the details and exit
    # with a message pointing the user at the logs.
    try:
        repository = RuggedRepository()
        repository.load()
    except Exception as error:
        log_exception(error)
        log.error("Failed to instantiate repository.")
        sys.exit("Check the logs for more detailed error reporting.")

    workers_requested = bool(worker)
    show_local = local or not workers_requested
    local_only = local and not workers_requested

    if show_local:
        _print_status('local operations', repository.status())
    if local_only:
        return

    # Reached when workers were named explicitly, or when neither option was
    # given (in which case get_workers() receives an empty selection --
    # presumably meaning "all configured workers"; confirm in rugged.workers).
    for host in get_workers(worker):
        remote_status = run_task(host, 'status')
        _print_status(host, remote_status)


def _print_status(host, status):
    """ Print the header for a host, then its targets, roles and keys tables. """
    print_host_header("Repository status", host)
    # Sections are rendered in a fixed order: targets, roles, keys.
    section_printers = (
        _print_targets_info,
        _print_role_info,
        _print_key_info,
    )
    for print_section in section_printers:
        print_section(status)


def _print_targets_info(status):
    """ Print a one-row table: number of targets and their total size. """
    targets = status['targets']
    # The 'size' value is passed through humanize.naturalsize for display.
    summary_row = [targets['count'], humanize.naturalsize(targets['size'])]
    rendered = tabulate([summary_row], headers=["Targets", "Total Size"])
    print(rendered + "\n")


def _print_role_info(status):
    """ Print one table row per role found in status['roles']. """
    columns = [
        "Role",
        "Capability",
        "Signatures",
        "Version",
        "TUF Spec",
        "Expires",
    ]
    rows = []
    for name, info in status['roles'].items():
        # A role is reported as "Signing" as soon as any one of its keys
        # lists 'private' among its types; otherwise "Verification".
        has_private_key = any(
            'private' in key['types'] for key in info['keys'].values()
        )
        capability = "Signing" if has_private_key else "Verification"
        rows.append([
            name,
            capability,
            f"{info['signatures']} / {info['threshold']}",
            info['version'],
            info['tuf_spec'],
            info['expires'],
        ])
    print(tabulate(rows, headers=columns) + "\n")


def _print_key_info(status):
    """ Print one table row per (role, key) pair found in status['roles']. """
    columns = ["Key name", "Role", "Key type(s)", "Scheme", "Path"]
    # Rows follow role order first, then key order within each role.
    rows = [
        [key_name, role_name, key['types'], key['scheme'], key['key_path']]
        for role_name, role in status['roles'].items()
        for key_name, key in role['keys'].items()
    ]
    print(tabulate(rows, headers=columns) + "\n")