Source code for mass_driver.commands

"""The different main commands of the mass-driver tool"""

import logging
import sys
from argparse import Namespace
from typing import Callable, Optional

from pydantic import ValidationError

from mass_driver.activity_run import sequential_run, thread_run
from mass_driver.discovery import (
    discover_drivers,
    discover_forges,
    discover_scanners,
    discover_sources,
    get_driver_entrypoint,
    get_forge,
    get_forge_entrypoint,
    get_source_entrypoint,
)
from mass_driver.forge_run import main as forge_main
from mass_driver.forge_run import pause_until_ok
from mass_driver.models.activity import ActivityLoaded, ActivityOutcome
from mass_driver.models.repository import IndexedRepos, SourcedRepo
from mass_driver.review_run import review
from mass_driver.summarize import summarize_forge, summarize_migration, summarize_source


[docs] def drivers_command(args: Namespace): """Process the CLI for 'Drivers' subcommand""" return plugins_command(args, "driver", get_driver_entrypoint, discover_drivers)
[docs] def forges_command(args: Namespace): """Process the CLI for 'Forges' subcommand""" return plugins_command(args, "forge", get_forge_entrypoint, discover_forges)
[docs] def sources_command(args: Namespace): """Process the CLI for 'Sources' subcommand""" return plugins_command(args, "source", get_source_entrypoint, discover_sources)
[docs] def plugins_command( args: Namespace, plugin: str, entrypoint: Callable, discover: Callable ): """Process the CLI for a generic plugin subcommand""" if args.info: target_plugin = args.info try: plugin_obj = entrypoint(target_plugin) logging.info( f"Plugin name: {plugin_obj.name}; Import path: " f"{plugin_obj.module}; Class: {plugin_obj.attr}" ) logging.info(plugin_obj.load().__doc__) return except ImportError as e: logging.error("Error importing plugin", exc_info=e) logging.error(f"Try `mass driver {plugin}s --list`") return # if args.list: # Implicit plugins = discover() logging.info(f"Available {plugin}s:") for plugin_obj in plugins: logging.info(plugin_obj.name) return True
[docs] def run_command(args: Namespace) -> ActivityOutcome: """Process the CLI for 'run'""" logging.basicConfig(stream=sys.stdout, level=logging.INFO) logger = logging.getLogger("run") logger.info("Run mode!") activity_str = args.activity_file.read() try: activity = ActivityLoaded.from_config(activity_str) except ValidationError as e: config_error_exit(e) except ImportError as e: logger.exception(e) raise e # Source discovery to know what repos to patch/forge/scan source_config = activity.source repos_sourced = source_repolist_args(args) sum_logger = logging.getLogger("summarize") if repos_sourced is None: # No repo-list from CLI flags: call Source repos_sourced = source_config.source.discover() summarize_source(repos_sourced, sum_logger) if needs_run(activity): run_variant = thread_run if args.parallel else sequential_run run_result = run_variant( activity, repos_sourced, not args.no_cache, ) if activity.migration is not None and run_result.migration_result is not None: summarize_migration(run_result.migration_result, sum_logger) else: logger.info("No clone needed: skipping") run_result = ActivityOutcome(repos_sourced=repos_sourced) logger.info("Main phase complete!") if activity.forge is None: # Nothing else to do, just print completion and exit logger.info("No Forge: end") maybe_save_outcome(args, run_result) return run_result # Now guaranteed to have a Forge: pause + forge if not args.no_pause: logger.info("Review the commits now.") pause_until_ok("Type y/yes/continue to run the Forge\n") result = forge_main(activity.forge, run_result) maybe_save_outcome(args, result) if result.forge_result is not None: summarize_forge(result.forge_result, sum_logger) return result
[docs] def scanners_command(args: Namespace): """Process the CLI for 'scan'""" logging.info("Available scanners:") scanners = discover_scanners() for scanner in scanners: logging.info(scanner.name) return True
[docs] def review_pr_command(args: Namespace): """Review a list of Pull Requests""" logging.info("Pull request review mode!") # FIXME: ALl this can crash, you know! forge_class = get_forge(args.forge) forge = forge_class() # Credentials via env pr_list = args.pr if args.pr_filelist: pr_list = args.pr_filelist.read().strip().split("\n") review(pr_list, forge) return 0
[docs] def config_error_exit(e: ValidationError): """Exit in case of bad config models""" model_class = e.model.__base__ try: # Assume the class failing validation has env prefix env_prefix = model_class.Config.env_prefix except Exception: logging.error("Missing config", exc_info=e) raise # We have a valid env_prefix now, use it to show missing envvar model_class_name = model_class.__name__ for error in e.errors(): if error["type"] == "value_error.missing": envvars = [env_prefix + var.upper() for var in error["loc"]] logging.error( f"Missing {model_class_name} config: Set envvar(s) {', '.join(envvars)}" ) else: logging.error(f"{model_class_name} config validation error: {error}") raise e # exit code = Simulate the argparse behaviour of exiting on bad args
[docs] def source_repolist_args(args) -> Optional[IndexedRepos]: """Read the repo from args, if any""" repos = read_repolist(args) if repos is not None: return ( {url: SourcedRepo(repo_id=url, clone_url=url) for url in repos} if repos else None ) return None
[docs] def read_repolist(args) -> Optional[list[str]]: """Read the repo-list or repo-path arg, if any""" if args.repo_path: return args.repo_path if args.repo_filelist: return args.repo_filelist.read().strip().split("\n") return None
[docs] def maybe_save_outcome(args: Namespace, outcome: ActivityOutcome): """Consider saving the outcome""" if not args.json_outfile: return save_outcome(outcome, args.json_outfile) logging.info("Saved outcome to given JSON file")
[docs] def save_outcome(outcome: ActivityOutcome, out_file): """Save the output to given JSON file handle""" out_file.write(outcome.json(indent=2)) out_file.write("\n")
[docs] def needs_run(activity: ActivityLoaded) -> bool: """Check if we need to call the activity_run command = Clone/Mig/Scan step We usually do if: migration OR scan OR forge with git_push_first=True Last one because git_push requires resolving ssh clone url to local repo path which is what clone step does in activity_run """ got_mig = activity.migration is not None got_scan = activity.scan is not None got_forge_clone = activity.forge is not None and activity.forge.git_push_first # activity-running is only needed if we need some clone: return got_mig or got_scan or got_forge_clone