(
ext,
files,
batch_size=None,
jobs=1,
on_start=None,
on_result=None,
manifest=None,
)
# Collects the store's "manpages" count sampled just before each on_result call.
writes_at_callback: list[int] = []


def _fake_run(
    ext,
    files,
    batch_size=None,
    jobs=1,
    on_start=None,
    on_result=None,
    manifest=None,
):
    """Stand-in used as ``mock_run.side_effect``.

    For every path in ``files`` this calls ``on_start`` (when given), builds
    a successful ``ExtractionResult`` and bumps ``n_succeeded`` on the
    returned ``BatchResult``. When ``on_result`` is given, the store's
    current ``"manpages"`` count is appended to ``writes_at_callback``
    before the callback is invoked with the path and the result.

    ``ext``, ``batch_size``, ``jobs`` and ``manifest`` are accepted but not
    used by this fake. ``writes_at_callback``, ``db_path`` and
    ``mock_source`` are defined outside this function.
    """
    outcome_batch = BatchResult()

    for path in files:
        if on_start:
            on_start(path)

        manpage = _make_manpage_from_source(mock_source(path))
        raw_record = _make_raw(sha256=path)
        result = ExtractionResult(
            gz_path=path,
            outcome=ExtractionOutcome.SUCCESS,
            mp=manpage,
            raw=raw_record,
            stats=ExtractionStats(),
        )
        outcome_batch.n_succeeded += 1

        if not on_result:
            continue

        # Sample the count first, so the recorded value reflects the store
        # as it was when the callback was about to run. The Store object is
        # kept as a temporary so no reference to it is held during the
        # callback.
        manpage_count = Store(db_path, read_only=True).counts()["manpages"]
        writes_at_callback.append(manpage_count)
        on_result(path, result)

    return outcome_batch


mock_run.side_effect = _fake_run
Nothing calls `_fake_run` directly; it is installed as `mock_run.side_effect` and is only invoked through the mock.
no test coverage detected