Source code for mass_driver.activity_run

"""Main activities of Mass-Driver: For each repo, clone it, then scan/migrate it

Variants for sequential or parallel
"""

import logging
from concurrent import futures
from copy import deepcopy

from mass_driver.git import (
    get_cache_folder,
)
from mass_driver.models.activity import (
    ActivityLoaded,
    ActivityOutcome,
    IndexedPatchResult,
    IndexedScanResult,
    ScanResult,
)
from mass_driver.models.patchdriver import PatchOutcome, PatchResult
from mass_driver.models.repository import (
    IndexedClonedRepos,
    IndexedRepos,
)
from mass_driver.process_repo import clone_repo, migrate_repo, scan_repo

LOGGER_PREFIX = "run"


def sequential_run(
    activity: ActivityLoaded,
    repos: IndexedRepos,
    cache: bool,
) -> ActivityOutcome:
    """Run the main activity SEQUENTIALLY: over N repos, clone, then scan/patch

    Repos are processed one at a time, in the iteration order of ``repos``.

    Args:
        activity: The loaded activity. Its ``scan`` and ``migration`` may each
            be None, in which case that step is skipped for every repo.
        repos: The repos to process, keyed by repo ID.
        cache: Forwarded to ``get_cache_folder`` to pick the clone folder.

    Returns:
        An ActivityOutcome holding every sourced repo, the repos that cloned
        successfully, and the per-repo scan / migration results. A result
        mapping is None when the activity does not include that step.

    A repo that fails to clone is logged and skipped. It appears in
    ``repos_sourced`` but in none of the other mappings.
    """
    logger = logging.getLogger(LOGGER_PREFIX)
    repo_count = len(repos)
    migration = activity.migration
    scan = activity.scan
    cache_folder = get_cache_folder(cache, logger=logger)
    cloned_repos: IndexedClonedRepos = {}
    # Each result mapping stays None unless its step is part of the activity
    scanner_results: IndexedScanResult | None = None
    patch_results: IndexedPatchResult | None = None
    what_array = ["clone"]
    if scan is not None:
        what_array.append(f"{len(scan.scanners)} scanners")
        scanner_results = {}
    if migration is not None:
        what_array.append(f"{migration.driver=}")
        patch_results = {}
    logger.info(f"Processing {repo_count} with {' and '.join(what_array)}")
    for repo_index, (repo_id, repo) in enumerate(repos.items(), start=1):
        # Dots in the repo ID would add extra levels to the logger hierarchy
        repo_logger_name = f"{logger.name}.repo.{repo_id.replace('.','_')}"
        repo_logger = logging.getLogger(repo_logger_name)
        try:
            logger.info(f"[{repo_index:03d}/{repo_count:03d}] Processing {repo_id}...")
            cloned_repo, repo_gitobj = clone_repo(
                repo, cache_folder, logger=repo_logger
            )
            cloned_repos[repo_id] = cloned_repo
        except Exception as e:
            # A failed clone aborts this repo. Log at ERROR (like scan/migration
            # failures below) so it stays visible when INFO is filtered out.
            repo_logger.error(f"Error cloning repo '{repo_id}'\nError was: {e}")
            # FIXME: Clone failure lacks cloned_repo entry, dropping visibility of fail
            continue
        if scan and scanner_results is not None:
            try:
                scan_result = scan_repo(scan, cloned_repo)
                scanner_results[repo_id] = scan_result
            except Exception as e:
                repo_logger.error(f"Error scanning repo '{repo_id}'")
                repo_logger.error(f"Error was: {e}")
                # Reaching here should be impossible (catch-all in scan)
        if migration and patch_results is not None:
            try:
                # Ensure no driver persistence between repos
                migration_copy = deepcopy(migration)
                result, _excep = migrate_repo(
                    cloned_repo, repo_gitobj, migration_copy, logger=repo_logger
                )
                patch_results[repo_id] = result
            except Exception as e:
                repo_logger.error(f"Error migrating repo '{repo_id}'")
                repo_logger.error(f"Error was: {e}")
                # Record the failure so the repo still shows up in the results
                patch_results[repo_id] = PatchResult(
                    outcome=PatchOutcome.PATCH_ERROR,
                    details=f"Unhandled exception caught during patching. Error was: {e}",
                )
    logger.info("Action completed: exiting")
    return ActivityOutcome(
        repos_sourced=repos,
        repos_cloned=cloned_repos,
        scan_result=scanner_results,
        migration_result=patch_results,
    )
def thread_run(
    activity: ActivityLoaded,
    repos: IndexedRepos,
    cache: bool,
    max_workers: int | None = 8,
) -> ActivityOutcome:
    """Run the main activity THREADED: over N repos, clone, then scan/patch

    Each repo is handed to ``per_repo_process`` on a thread pool. Results are
    collected as jobs complete, so progress logs follow completion order
    rather than input order.

    Args:
        activity: The loaded activity. Its ``scan`` and ``migration`` may each
            be None, in which case that step is skipped for every repo.
        repos: The repos to process, keyed by repo ID.
        cache: Forwarded to ``get_cache_folder`` to pick the clone folder.
        max_workers: Thread pool size, forwarded to ``ThreadPoolExecutor``.
            None uses the executor's own default.

    Returns:
        An ActivityOutcome holding every sourced repo, the repos that were
        processed successfully, and the per-repo scan / migration results.
        A result mapping is None when the activity does not include that step.

    A repo whose job raises (e.g. ``per_repo_process`` re-raising a clone
    failure) is logged and skipped, matching ``sequential_run``. It appears in
    ``repos_sourced`` but in none of the other mappings. Previously such an
    exception escaped and aborted the whole run, discarding all other results.
    """
    logger = logging.getLogger(LOGGER_PREFIX)
    repo_count = len(repos)
    migration = activity.migration
    scan = activity.scan
    cache_folder = get_cache_folder(cache, logger=logger)
    cloned_repos: IndexedClonedRepos = {}
    # Each result mapping stays None unless its step is part of the activity
    scanner_results: IndexedScanResult | None = None
    patch_results: IndexedPatchResult | None = None
    what_array = ["clone"]
    if scan is not None:
        what_array.append(f"{len(scan.scanners)} scanners")
        scanner_results = {}
    if migration is not None:
        what_array.append(f"{migration.driver=}")
        patch_results = {}
    logger.info(
        f"Processing {repo_count} with {' and '.join(what_array)}, via Threads"
    )
    futures_map: dict[futures.Future, str] = {}
    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for repo_id, repo in repos.items():
            # Dots in the repo ID would add extra levels to the logger hierarchy
            repo_logger = logging.getLogger(
                f"{logger.name}.repo.{repo_id.replace('.','_')}"
            )
            future_obj = executor.submit(
                per_repo_process,
                repo_id,
                repo,
                activity,
                repo_logger,
                cache_folder,
            )
            futures_map[future_obj] = repo_id
        # Submitted the jobs: iterate on completion
        for repo_index, future in enumerate(futures.as_completed(futures_map), start=1):
            repo_id = futures_map[future]
            try:
                cloned_repo, scan_result, patch_result = future.result()
            except Exception as e:
                # One failed repo must not abort the run: skip it, like the
                # sequential variant does (details were logged by the worker)
                logger.error(
                    f"[{repo_index:04d}/{repo_count:04d}] Failed to process "
                    f"{repo_id}, skipping it. Error was: {e}"
                )
                continue
            logger.info(f"[{repo_index:04d}/{repo_count:04d}] Processed {repo_id}")
            cloned_repos[repo_id] = cloned_repo
            # A None scan result means the scan raised. Omit it, like the
            # sequential variant does.
            if scanner_results is not None and scan_result is not None:
                scanner_results[repo_id] = scan_result
            if patch_results is not None:
                patch_results[repo_id] = patch_result
    logger.info("Action completed: exiting")
    return ActivityOutcome(
        repos_sourced=repos,
        repos_cloned=cloned_repos,
        scan_result=scanner_results,
        migration_result=patch_results,
    )
def per_repo_process(
    repo_id: str,
    repo,
    activity: ActivityLoaded,
    logger: logging.Logger,
    cache_folder,
):
    """Process a single repo, in-thread: clone it, then optionally scan/migrate it.

    Args:
        repo_id: ID of the repo, used in log messages.
        repo: The repo to clone, passed as-is to ``clone_repo``.
        activity: The loaded activity. The scan step runs only when
            ``activity.scan`` is not None; the migration step runs only when
            ``activity.migration`` is truthy.
        logger: Per-repo logger, also handed to ``clone_repo`` and
            ``migrate_repo``.
        cache_folder: Passed as-is to ``clone_repo``.

    Returns:
        Tuple ``(cloned_repo, scan_result, patch_result)``. ``scan_result`` is
        None when scanning is disabled or raised. ``patch_result`` is None when
        migration is disabled, and a PATCH_ERROR result when it raised.

    Raises:
        Exception: Whatever ``clone_repo`` raised, re-raised after logging,
            since nothing can be scanned or migrated without a clone.
    """
    try:
        logger.info(f"Processing {repo_id}...")
        cloned_repo, repo_gitobj = clone_repo(repo, cache_folder, logger=logger)
    except Exception as e:
        # Log at ERROR, like the scan/migration failures below
        logger.error(f"Error cloning repo '{repo_id}'\nError was: {e}")
        # FIXME: Use custom exception for capturing error here
        raise  # bare raise keeps the original traceback intact
    scan_result: ScanResult | None = None
    if activity.scan is not None:
        try:
            scan_result = scan_repo(activity.scan, cloned_repo)
        except Exception as e:
            logger.error(f"Error scanning repo '{repo_id}'")
            logger.error(f"Error was: {e}")
            # Reaching here should be impossible (catch-all in scan)
    patch_result: PatchResult | None = None
    if activity.migration:
        try:
            # Ensure no driver persistence between repos
            migration_copy = deepcopy(activity.migration)
            patch_result, _excep = migrate_repo(
                cloned_repo, repo_gitobj, migration_copy, logger=logger
            )
        except Exception as e:
            logger.error(f"Error migrating repo '{repo_id}'")
            logger.error(f"Error was: {e}")
            # Record the failure so the repo still shows up in the results
            patch_result = PatchResult(
                outcome=PatchOutcome.PATCH_ERROR,
                details=f"Unhandled exception caught during patching. Error was: {e}",
            )
            # FIXME: Catch custom-exception into the PatchResult object
    return (cloned_repo, scan_result, patch_result)