Evaluate all provided samples in parallel.
(
self,
recorder: RecorderBase,
samples,
show_progress=True,
record_raw_sample=True,
**_kwargs: Any,
)
| 110 | await future |
| 111 | |
def eval_all_samples(
    self,
    recorder: RecorderBase,
    samples,
    show_progress=True,
    record_raw_sample=True,
    **_kwargs: Any,
):
    """
    Evaluate all provided samples and return the results in sample-index order.

    Samples are evaluated on a thread pool unless sequential mode is requested.
    Each sample gets its own ``random.Random`` seeded from the sample id and
    ``self.seed``, and is evaluated under ``recorder.as_default_recorder``.

    Environment variables read:
      * ``EVALS_THREADS``: size of the thread pool (default ``"10"``).
      * ``EVALS_SHOW_EVAL_PROGRESS``: when set, overrides ``show_progress``.
        An empty value or ``0`` / ``false`` / ``no`` / ``off`` (case-insensitive)
        disables the progress bar; any other value enables it.
      * ``EVALS_SEQUENTIAL``: ``1`` / ``true`` / ``yes`` evaluates the samples
        one at a time in the calling thread instead of on the pool.

    Args:
        recorder: Recorder whose ``as_default_recorder(sample_id)`` context
            wraps each sample's evaluation.
        samples: Samples to evaluate; passed to ``_index_samples``, which must
            yield ``(sample, idx)`` pairs.
        show_progress: Whether to display the progress bar when
            ``EVALS_SHOW_EVAL_PROGRESS`` is not set.
        record_raw_sample: Accepted for interface compatibility; not used here.
        **_kwargs: Ignored.

    Returns:
        A list of the values returned by ``self.eval_sample``, ordered by
        sample index.
    """
    work_items = _index_samples(samples)
    threads = int(os.environ.get("EVALS_THREADS", "10"))

    # bool() of any non-empty string is True, so the raw env value must be
    # parsed explicitly; otherwise "0" or "false" could never disable the bar.
    progress_override = os.environ.get("EVALS_SHOW_EVAL_PROGRESS")
    if progress_override is None:
        show_progress = bool(show_progress)
    else:
        show_progress = progress_override.strip().lower() not in {
            "",
            "0",
            "false",
            "no",
            "off",
        }

    def eval_sample(args):
        """
        Evaluate a single sample.
        """
        sample, idx = args
        base_name, split = self.name.split(".")[0:2]
        sample_id = f"{base_name}.{split}.{idx}"
        with recorder.as_default_recorder(sample_id):
            # Seed per sample so results do not depend on scheduling order.
            seed = f"{sample_id}:{self.seed}".encode("utf-8")
            rng = random.Random(seed)
            return idx, self.eval_sample(sample, rng)

    def collect(results_iter):
        """
        Drain the iterator behind a progress bar into a list.
        """
        return list(tqdm(results_iter, total=len(work_items), disable=not show_progress))

    if os.environ.get("EVALS_SEQUENTIAL", "0") in {"1", "true", "yes"}:
        # No pool is needed here, so do not spawn idle worker threads.
        logger.info("Running in sequential mode!")
        idx_and_result = collect(map(eval_sample, work_items))
    else:
        logger.info(f"Running in threaded mode with {threads} threads!")
        with ThreadPool(threads) as pool:
            idx_and_result = collect(pool.imap_unordered(eval_sample, work_items))

    # Sort on the index only, so result objects never need to be comparable.
    idx_and_result.sort(key=lambda pair: pair[0])
    return [result for _, result in idx_and_result]
| 148 | |
| 149 | def get_samples(self): |
| 150 | if self.samples_jsonl is None: |