MCP · copy · Index your code
hub / github.com/togethercomputer/RedPajama-Data / __process_uri

Method __process_uri

app/src/core/worker.py:291–356  ·  view source on GitHub ↗
(self, docs_to_fetch: int, uri: str)

Source from the content-addressed store, hash-verified

289 return record_data, minhash_signatures, doc_id, doc_id_int
290
291 def __process_uri(self, docs_to_fetch: int, uri: str):
292 num_docs = 0
293 docs_added = 0
294 snapshot_id = self.snapsh_re.search(uri).group(0)
295 uri_id = self.uri_id_re.search(uri).group(0)
296
297 # signal writer
298 signal_uri = os.path.join(
299 self._output_base_uri,
300 self.shard_pattern_signals.format(shard_id=uri_id.split(".")[0]),
301 )
302 signal_writer = Writer(uri=signal_uri, schema=RP_SIGNAL_SCHEMA)
303 self._logger.info(f"Initialized jsonl writer to {signal_uri}")
304
305 # init minhash writer
306 minhash_uri = os.path.join(
307 self._output_base_uri,
308 self.shard_pattern_minhash.format(shard_id=uri_id.split(".")[0]),
309 )
310 minhash_writer = ParquetBatchWriter(
311 output_fp=minhash_uri, schema=self._minhash_schema
312 )
313 self._logger.info(f"Initialized parquet writer to {minhash_uri}")
314
315 for idx, record in self._reader.read(
316 uri=uri, max_samples=docs_to_fetch, return_idx=True
317 ):
318 # compute signals
319 (
320 record_data, minhash_signatures, doc_id, doc_id_int
321 ) = self.__process_record(
322 idx=idx, record=record, uri_id=uri_id, snapshot_id=snapshot_id
323 )
324 num_docs += 1
325 docs_added += 1
326
327 # write quality signals
328 signal_writer.write(record_data)
329
330 # record minhash signatures
331 minhash_writer.update_batch(
332 obj={"shard_id": uri_id, "id_int": doc_id_int, "id": doc_id,
333 **minhash_signatures}
334 )
335
336 # send to monitor
337 if num_docs % self._flush_interval == 0:
338 minhash_writer.write_batch()
339 signal_writer.flush()
340 self._monitor_queue.put({
341 "lang": self._lang, "num_docs": docs_added
342 })
343 docs_added = 0
344
345 if docs_added > 0:
346 self._monitor_queue.put({
347 "lang": self._lang, "num_docs": docs_added
348 })

Callers 1

run (Method) · 0.95

Calls 10

__process_record (Method) · 0.95
write (Method) · 0.95
update_batch (Method) · 0.95
write_batch (Method) · 0.95
flush (Method) · 0.95
close (Method) · 0.95
close (Method) · 0.95
Writer (Class) · 0.90
ParquetBatchWriter (Class) · 0.90
read (Method) · 0.80

Tested by

no test coverage detected