(coverage_dir: str, dataset: str)
| 74 | |
| 75 | |
def collect_coverage_info(coverage_dir: str, dataset: str) -> Dict[str, Dict[str, Any]]:
    """Collect per-test branch coverage of the ground-truth solutions.

    For every task in ``dataset`` the ground-truth program (the problem's
    ``prompt`` followed by its ``canonical_solution``) is executed once per
    ``plus_input`` test, and the covered branches are recorded.  Results are
    cached per task as ``<coverage_dir>/<to_path(task_id)>.pkl`` so that
    later runs can skip tasks that were already measured.

    Args:
        coverage_dir: Directory holding the per-task pickle caches; created
            if it does not exist.
        dataset: Dataset name forwarded to ``get_problems`` and
            ``get_task_ids``.

    Returns:
        A mapping ``task_id -> {"plus_<i>": [(branch, "gt"), ...]}`` for
        freshly measured tasks.  For tasks with an existing cache file the
        value is whatever object was unpickled from that file.
    """
    os.makedirs(coverage_dir, exist_ok=True)
    problems = get_problems(dataset)
    task_ids = get_task_ids(dataset)
    coverage_info = {task_id: {} for task_id in task_ids}

    for task_id in track(task_ids, description="Testing gt coverage..."):
        task_path = to_path(task_id)
        coverage_cache_path = os.path.join(coverage_dir, f"{task_path}.pkl")

        if os.path.isfile(coverage_cache_path):
            # NOTE: pickle can execute arbitrary code while loading; the
            # cache directory must only contain files written by this tool.
            with open(coverage_cache_path, "rb") as f:
                coverage_info[task_id] = pickle.load(f)
            continue

        problem = problems[task_id]
        groundtruth_code = problem["prompt"] + problem["canonical_solution"]
        entry_point = problem["entry_point"]
        task_coverage = coverage_info[task_id]

        for i, plus_test in enumerate(problem["plus_input"]):
            # Only the covered branches are kept; the first two returned
            # values are not used here.
            _, _, branch_covered = test_code_coverage(
                task_path, groundtruth_code, [plus_test], entry_point
            )
            task_coverage[f"plus_{i}"] = [(br, "gt") for br in branch_covered]

        # Write to a temporary file and rename it into place so that an
        # interrupted run cannot leave a truncated pickle behind, which
        # would otherwise be picked up as a valid cache on the next run.
        tmp_cache_path = f"{coverage_cache_path}.tmp"
        with open(tmp_cache_path, "wb") as f:
            pickle.dump(task_coverage, f)
        os.replace(tmp_cache_path, coverage_cache_path)

    return coverage_info
| 104 | |
| 105 | |
| 106 | if __name__ == "__main__": |
no test coverage detected