Process a single provider batch: submit -> poll -> collect -> finalize. Normal batch/file failures never raise; they are returned as FAILED entries instead. Only interpreter-level exit signals (``KeyboardInterrupt``, ``SystemExit``) propagate.
(
extractor: BatchExtractor,
manifest: BatchManifestWriter,
batch_idx: int,
total_batches: int,
batch_items: list[WorkItem],
inflight: _InflightBatches,
)
| 286 | |
| 287 | |
| 288 | def _process_one_batch( |
| 289 | extractor: BatchExtractor, |
| 290 | manifest: BatchManifestWriter, |
| 291 | batch_idx: int, |
| 292 | total_batches: int, |
| 293 | batch_items: list[WorkItem], |
| 294 | inflight: _InflightBatches, |
| 295 | ) -> _BatchOutput: |
| 296 | """Process a single provider batch: submit -> poll -> collect -> finalize. |
| 297 | |
| 298 | Never raises for normal batch/file failures — returns FAILED entries |
| 299 | instead. Only truly unexpected errors (``KeyboardInterrupt``, |
| 300 | ``SystemExit``) propagate. |
| 301 | """ |
| 302 | bp = extractor.batch_provider |
| 303 | entries: list[ExtractionResult] = [] |
| 304 | finalized: set[str] = set() |
| 305 | usage = TokenUsage() |
| 306 | job_id: str | None = None |
| 307 | batch_error: str | None = None |
| 308 | |
| 309 | def _add(entry: ExtractionResult) -> None: |
| 310 | finalized.add(entry.gz_path) |
| 311 | entries.append(entry) |
| 312 | |
| 313 | try: |
| 314 | # Build batch requests. |
| 315 | requests: list[BatchEntry] = [] |
| 316 | for item_idx, (gz_path, prepared) in enumerate(batch_items): |
| 317 | for chunk_idx, user_content in enumerate(prepared.requests): |
| 318 | requests.append(BatchEntry(f"{item_idx}:{chunk_idx}", user_content)) |
| 319 | |
| 320 | total_chars = sum(len(req.user_content) for req in requests) |
| 321 | logger.info( |
| 322 | "submitting batch %d/%d (%d requests, %s chars)...", |
| 323 | batch_idx, |
| 324 | total_batches, |
| 325 | len(requests), |
| 326 | f"{total_chars:,}", |
| 327 | ) |
| 328 | |
| 329 | client = bp.make_poll_client() |
| 330 | job_id = bp.submit_batch(requests) |
| 331 | logger.info("batch %d/%d submitted: %s", batch_idx, total_batches, job_id) |
| 332 | inflight.register(bp, client, job_id) |
| 333 | |
| 334 | # Persist the batch ID immediately so it survives crashes/interrupts. |
| 335 | manifest.record_batch( |
| 336 | batch_idx=batch_idx, |
| 337 | batch_id=job_id, |
| 338 | status="submitted", |
| 339 | files=[item.gz_path for item in batch_items], |
| 340 | ) |
| 341 | |
| 342 | try: |
| 343 | completed_job = bp.poll_batch( |
| 344 | client, job_id, poll_interval=30, stop_event=inflight.stop_event |
| 345 | ) |
no test coverage detected