Thread-safe manifest writer for batch extraction runs. Records batch outcomes incrementally and flushes to disk after each update.
| 50 | |
| 51 | |
| 52 | class FileBatchManifestWriter: |
| 53 | """Thread-safe manifest writer for batch extraction runs. |
| 54 | |
| 55 | Records batch outcomes incrementally and flushes to disk after each update. |
| 56 | """ |
| 57 | |
def __init__(self, path: str, model: str, batch_size: int) -> None:
    """Create an empty manifest and make sure its parent directory exists.

    Args:
        path: Filesystem path kept on the writer as ``_path``; its parent
            directory is created here if it is missing.
        model: Model identifier stored in the manifest.
        batch_size: Batch size stored in the manifest.
    """
    self._path = path
    # Start with no recorded batches; the total is unknown until
    # set_total_batches() is called.
    self._data = BatchManifest(
        version=1,
        model=model,
        batch_size=batch_size,
        total_batches=None,
        batches=[],
    )
    # Held by record_batch() and to_dict() while they touch self._data.
    self._lock = threading.Lock()
    # A bare filename has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError; in that case there is no directory to create.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
| 69 | |
def set_total_batches(self, n: int) -> None:
    """Set the total number of batches (called once after grouping).

    The assignment is made while holding the writer's lock, the same lock
    ``record_batch`` and ``to_dict`` hold while they access the manifest
    data, so the update cannot interleave with them on another thread.
    This method does not call ``_flush`` itself.

    Args:
        n: Total number of batches in the run.
    """
    with self._lock:
        self._data.total_batches = n
| 73 | |
def record_batch(
    self,
    batch_idx: int,
    batch_id: str | None,
    status: Literal["submitted", "completed", "failed"],
    files: list[str],
    error: str | None = None,
) -> None:
    """Store the outcome of one batch, then flush the manifest.

    At most one entry is kept per ``batch_idx``: when an entry with that
    index is already present (for instance an earlier "submitted" record),
    the new entry takes its position; otherwise the new entry is appended.

    Args:
        batch_idx: Index identifying the batch within the run.
        batch_id: Identifier of the batch, or ``None``.
        status: One of "submitted", "completed" or "failed".
        files: Files belonging to the batch.
        error: Optional error text stored with the entry.
    """
    # Build the entry before taking the lock so the critical section only
    # covers the list update and the flush.
    new_entry = BatchManifestEntry(
        batch_idx=batch_idx,
        batch_id=batch_id,
        status=status,
        error=error,
        files=files,
    )
    with self._lock:
        recorded = self._data.batches
        # Position of the first entry with the same batch_idx, if any.
        slot = next(
            (pos for pos, item in enumerate(recorded) if item.batch_idx == batch_idx),
            None,
        )
        if slot is None:
            recorded.append(new_entry)
        else:
            recorded[slot] = new_entry
        self._flush()
| 103 | |
def to_dict(self) -> dict:
    """Return the manifest data as a plain dict for embedding in reports.

    The dump is taken while holding the writer's lock.
    """
    with self._lock:
        snapshot = self._data.model_dump()
    return snapshot
| 108 | |
| 109 | def _flush(self) -> None: |
no outgoing calls