Create a block from user-facing data formats.
(
cls,
batch: DataBatch,
block_type: Optional[BlockType] = None,
)
| 551 | |
@classmethod
def batch_to_block(
    cls,
    batch: DataBatch,
    block_type: Optional[BlockType] = None,
) -> Block:
    """Create a block from user-facing data formats.

    Dispatch is based on the runtime type of ``batch``:

    * A standalone ``numpy.ndarray`` is rejected.
    * A cuDF DataFrame is converted with ``to_arrow(preserve_index=False)``.
    * A ``pandas.DataFrame`` is converted to Arrow when ``block_type`` is
      ``BlockType.ARROW``, or when ``block_type`` is ``None`` and the
      current ``DataContext`` has ``batch_to_block_arrow_format`` set;
      otherwise it is returned as is.
    * A ``Mapping`` is converted via ``batch_to_arrow_block`` or
      ``batch_to_pandas_block`` depending on ``block_type``.
    * Anything else is returned unchanged.

    Args:
        batch: The user-facing batch to convert.
        block_type: The requested block format. ``None`` means "no
            preference": for mappings Arrow is tried first, with a
            fallback to pandas if the Arrow conversion fails.

    Returns:
        The block built from ``batch`` (or ``batch`` itself when no
        conversion applies).

    Raises:
        ValueError: If ``batch`` is a standalone numpy array, or if
            ``batch`` is a mapping and ``block_type`` is neither ``None``,
            ``BlockType.ARROW`` nor ``BlockType.PANDAS``.
        ArrowConversionError: If ``batch`` is a mapping, Arrow was
            explicitly requested, and the Arrow conversion failed.
    """
    import pandas

    if isinstance(batch, np.ndarray):
        raise ValueError(
            f"Error validating {_truncated_repr(batch)}: "
            "Standalone numpy arrays are not "
            "allowed in Ray 2.5. Return a dict of field -> array, "
            "e.g., `{'data': array}` instead of `array`."
        )

    # Handle cudf.DataFrame before the Mapping check, since cudf.DataFrame
    # implements the Mapping protocol. Use bulk GPU->CPU transfer via
    # to_arrow() instead of the slow column-by-column Mapping path.
    # NOTE(review): this branch returns an Arrow table regardless of
    # `block_type` (including BlockType.PANDAS) -- confirm this is intended.
    if _is_cudf_dataframe(batch):
        return batch.to_arrow(preserve_index=False)

    if isinstance(batch, pandas.DataFrame):
        convert_to_arrow = (block_type == BlockType.ARROW) or (
            block_type is None
            and DataContext.get_current().batch_to_block_arrow_format
        )
        if convert_to_arrow:
            return cls.for_block(batch).to_arrow()
        return batch

    if not isinstance(batch, collections.abc.Mapping):
        # Not a format this method converts; hand it back untouched.
        return batch

    if block_type is None or block_type == BlockType.ARROW:
        from ray.data._internal.tensor_extensions.arrow import (
            ArrowConversionError,
        )

        try:
            return cls.batch_to_arrow_block(batch)
        except ArrowConversionError as e:
            if log_once("_fallback_to_pandas_block_warning"):
                logger.debug(
                    f"Failed to convert batch to Arrow due to: {e}; "
                    "falling back to Pandas block"
                )

            if block_type is not None:
                # Arrow was explicitly requested, so surface the failure.
                # A bare `raise` re-raises the active exception as-is.
                raise
            # No explicit preference: fall back to a pandas block.
            return cls.batch_to_pandas_block(batch)

    # Explicit check instead of `assert`, which is stripped under
    # `python -O` and would let an unexpected block type fall through.
    if block_type != BlockType.PANDAS:
        raise ValueError(f"Unsupported block type: {block_type!r}")
    return cls.batch_to_pandas_block(batch)
| 607 | |
| 608 | @classmethod |
| 609 | def batch_to_arrow_block(cls, batch: Dict[str, Any]) -> Block: |