(self, docs_to_fetch: int, uri: str)
| 289 | return record_data, minhash_signatures, doc_id, doc_id_int |
| 290 | |
| 291 | def __process_uri(self, docs_to_fetch: int, uri: str): |
| 292 | num_docs = 0 |
| 293 | docs_added = 0 |
| 294 | snapshot_id = self.snapsh_re.search(uri).group(0) |
| 295 | uri_id = self.uri_id_re.search(uri).group(0) |
| 296 | |
| 297 | # signal writer |
| 298 | signal_uri = os.path.join( |
| 299 | self._output_base_uri, |
| 300 | self.shard_pattern_signals.format(shard_id=uri_id.split(".")[0]), |
| 301 | ) |
| 302 | signal_writer = Writer(uri=signal_uri, schema=RP_SIGNAL_SCHEMA) |
| 303 | self._logger.info(f"Initialized jsonl writer to {signal_uri}") |
| 304 | |
| 305 | # init minhash writer |
| 306 | minhash_uri = os.path.join( |
| 307 | self._output_base_uri, |
| 308 | self.shard_pattern_minhash.format(shard_id=uri_id.split(".")[0]), |
| 309 | ) |
| 310 | minhash_writer = ParquetBatchWriter( |
| 311 | output_fp=minhash_uri, schema=self._minhash_schema |
| 312 | ) |
| 313 | self._logger.info(f"Initialized parquet writer to {minhash_uri}") |
| 314 | |
| 315 | for idx, record in self._reader.read( |
| 316 | uri=uri, max_samples=docs_to_fetch, return_idx=True |
| 317 | ): |
| 318 | # compute signals |
| 319 | ( |
| 320 | record_data, minhash_signatures, doc_id, doc_id_int |
| 321 | ) = self.__process_record( |
| 322 | idx=idx, record=record, uri_id=uri_id, snapshot_id=snapshot_id |
| 323 | ) |
| 324 | num_docs += 1 |
| 325 | docs_added += 1 |
| 326 | |
| 327 | # write quality signals |
| 328 | signal_writer.write(record_data) |
| 329 | |
| 330 | # record minhash signatures |
| 331 | minhash_writer.update_batch( |
| 332 | obj={"shard_id": uri_id, "id_int": doc_id_int, "id": doc_id, |
| 333 | **minhash_signatures} |
| 334 | ) |
| 335 | |
| 336 | # send to monitor |
| 337 | if num_docs % self._flush_interval == 0: |
| 338 | minhash_writer.write_batch() |
| 339 | signal_writer.flush() |
| 340 | self._monitor_queue.put({ |
| 341 | "lang": self._lang, "num_docs": docs_added |
| 342 | }) |
| 343 | docs_added = 0 |
| 344 | |
| 345 | if docs_added > 0: |
| 346 | self._monitor_queue.put({ |
| 347 | "lang": self._lang, "num_docs": docs_added |
| 348 | }) |
no test coverage detected