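Writes the dataset to a custom :class:`~ray.data.Datasink`. Time complexity: O(dataset size / parallelism). Args: datasink: The :class:`~ray.data.Datasink` to write to. ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks. concurrency: The maximum number of Ray tasks to run concurrently. Set this to control the number of tasks to run concurrently. This doesn't change the total number of tasks run. By default, concurrency is dynamically decided based on the available resources.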
(
self,
datasink: Datasink,
*,
ray_remote_args: Dict[str, Any] = None,
concurrency: Optional[int] = None,
)
| 5851 | |
| 5852 | @ConsumptionAPI(pattern="Time complexity:") |
| 5853 | def write_datasink( |
| 5854 | self, |
| 5855 | datasink: Datasink, |
| 5856 | *, |
| 5857 | ray_remote_args: Dict[str, Any] = None, |
| 5858 | concurrency: Optional[int] = None, |
| 5859 | ) -> None: |
| 5860 | """Writes the dataset to a custom :class:`~ray.data.Datasink`. |
| 5861 | |
| 5862 | Time complexity: O(dataset size / parallelism) |
| 5863 | |
| 5864 | Args: |
| 5865 | datasink: The :class:`~ray.data.Datasink` to write to. |
| 5866 | ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks. |
| 5867 | concurrency: The maximum number of Ray tasks to run concurrently. Set this |
| 5868 | to control number of tasks to run concurrently. This doesn't change the |
| 5869 | total number of tasks run. By default, concurrency is dynamically |
| 5870 | decided based on the available resources. |
| 5871 | """ # noqa: E501 |
| 5872 | if ray_remote_args is None: |
| 5873 | ray_remote_args = {} |
| 5874 | |
| 5875 | if not datasink.supports_distributed_writes: |
| 5876 | if ray.util.client.ray.is_connected(): |
| 5877 | raise ValueError( |
| 5878 | "If you're using Ray Client, Ray Data won't schedule write tasks " |
| 5879 | "on the driver's node." |
| 5880 | ) |
| 5881 | label_selector = ray_remote_args.get("label_selector", {}) |
| 5882 | label_selector[ |
| 5883 | ray._raylet.RAY_NODE_ID_KEY |
| 5884 | ] = ray.get_runtime_context().get_node_id() |
| 5885 | ray_remote_args["label_selector"] = label_selector |
| 5886 | ray_remote_args.pop("scheduling_strategy", None) |
| 5887 | |
| 5888 | _validate_head_node_resources_for_local_scheduling( |
| 5889 | ray_remote_args, |
| 5890 | op_description="Writing to a local:// path", |
| 5891 | ) |
| 5892 | |
| 5893 | write_op = Write( |
| 5894 | datasink, |
| 5895 | input_dependencies=[self._logical_plan.dag], |
| 5896 | ray_remote_args=ray_remote_args, |
| 5897 | compute=TaskPoolStrategy(concurrency), |
| 5898 | ) |
| 5899 | logical_plan = LogicalPlan(write_op, self.context) |
| 5900 | |
| 5901 | try: |
| 5902 | # Call on_write_start for _FileDatasink before execution to handle |
| 5903 | # SaveMode checks (ERROR raises, OVERWRITE deletes contents, IGNORE skips) |
| 5904 | # and directory creation. For other datasinks, on_write_start is called |
| 5905 | # automatically by the Write operator when the first input bundle arrives. |
| 5906 | if isinstance(datasink, _FileDatasink): |
| 5907 | datasink.on_write_start() |
| 5908 | # TODO (https://github.com/ray-project/ray/issues/59326): There should be no special handling for skipping writes. |
| 5909 | if datasink._skip_write: |
| 5910 | logger.info( |