(
self,
gen_kwargs: dict,
fpath: str,
file_format: str,
max_shard_size: int,
split_info: SplitInfo,
job_id: int,
)
| 1489 | self.info.features = features |
| 1490 | |
| 1491 | def _prepare_split_single( |
| 1492 | self, |
| 1493 | gen_kwargs: dict, |
| 1494 | fpath: str, |
| 1495 | file_format: str, |
| 1496 | max_shard_size: int, |
| 1497 | split_info: SplitInfo, |
| 1498 | job_id: int, |
| 1499 | ) -> Iterator[tuple[int, bool, tuple[int, int, Features, int, int, int]]]: |
| 1500 | generator = self._generate_examples(**gen_kwargs) |
| 1501 | writer_class = ParquetWriter if file_format == "parquet" else ArrowWriter |
| 1502 | embed_local_files = file_format == "parquet" |
| 1503 | shard_lengths = [] |
| 1504 | original_shard_lengths = [] |
| 1505 | total_num_examples, total_num_bytes = 0, 0 |
| 1506 | |
| 1507 | shard_id = 0 |
| 1508 | original_shard_id = 0 |
| 1509 | num_examples_progress_update = 0 |
| 1510 | try: |
| 1511 | writer = writer_class( |
| 1512 | features=self.info.features, |
| 1513 | path=fpath.replace("SSSSS", f"{shard_id:05d}").replace("JJJJJ", f"{job_id:05d}"), |
| 1514 | writer_batch_size=self._writer_batch_size, |
| 1515 | storage_options=self._fs.storage_options, |
| 1516 | embed_local_files=embed_local_files, |
| 1517 | ) |
| 1518 | try: |
| 1519 | _time = time.time() |
| 1520 | for key, record in generator: |
| 1521 | if isinstance(key, Key): # old custom builders may not use Key |
| 1522 | original_shard_id = key.original_shard_id |
| 1523 | if max_shard_size is not None and writer._num_bytes > max_shard_size: |
| 1524 | num_examples, num_bytes = writer.finalize() |
| 1525 | writer.close() |
| 1526 | shard_lengths.append(num_examples) |
| 1527 | total_num_examples += num_examples |
| 1528 | total_num_bytes += num_bytes |
| 1529 | shard_id += 1 |
| 1530 | writer = writer_class( |
| 1531 | features=writer._features, |
| 1532 | path=fpath.replace("SSSSS", f"{shard_id:05d}").replace("JJJJJ", f"{job_id:05d}"), |
| 1533 | writer_batch_size=self._writer_batch_size, |
| 1534 | storage_options=self._fs.storage_options, |
| 1535 | embed_local_files=embed_local_files, |
| 1536 | ) |
| 1537 | example = self.info.features.encode_example(record) if self.info.features is not None else record |
| 1538 | writer.write(example) |
| 1539 | if len(original_shard_lengths) <= original_shard_id: |
| 1540 | original_shard_lengths.extend([0] * (1 + original_shard_id - len(original_shard_lengths))) |
| 1541 | original_shard_lengths[original_shard_id] += 1 |
| 1542 | num_examples_progress_update += 1 |
| 1543 | if time.time() > _time + config.PBAR_REFRESH_TIME_INTERVAL: |
| 1544 | _time = time.time() |
| 1545 | yield job_id, False, num_examples_progress_update |
| 1546 | num_examples_progress_update = 0 |
| 1547 | finally: |
| 1548 | yield job_id, False, num_examples_progress_update |
no test coverage detected