MCPcopy
hub / github.com/ray-project/ray / write_datasink

Method write_datasink

python/ray/data/dataset.py:5853–5937  ·  view source on GitHub ↗

Writes the dataset to a custom :class:`~ray.data.Datasink`. Time complexity: O(dataset size / parallelism). Args: datasink: The :class:`~ray.data.Datasink` to write to. ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks. concurrency: The maximum number of Ray tasks to run concurrently.

(
        self,
        datasink: Datasink,
        *,
        ray_remote_args: Dict[str, Any] = None,
        concurrency: Optional[int] = None,
    )

Source from the content-addressed store, hash-verified

5851
5852 @ConsumptionAPI(pattern="Time complexity:")
5853 def write_datasink(
5854 self,
5855 datasink: Datasink,
5856 *,
5857 ray_remote_args: Dict[str, Any] = None,
5858 concurrency: Optional[int] = None,
5859 ) -> None:
5860 """Writes the dataset to a custom :class:`~ray.data.Datasink`.
5861
5862 Time complexity: O(dataset size / parallelism)
5863
5864 Args:
5865 datasink: The :class:`~ray.data.Datasink` to write to.
5866 ray_remote_args: Kwargs passed to :func:`ray.remote` in the write tasks.
5867 concurrency: The maximum number of Ray tasks to run concurrently. Set this
5868 to control number of tasks to run concurrently. This doesn't change the
5869 total number of tasks run. By default, concurrency is dynamically
5870 decided based on the available resources.
5871 """ # noqa: E501
5872 if ray_remote_args is None:
5873 ray_remote_args = {}
5874
5875 if not datasink.supports_distributed_writes:
5876 if ray.util.client.ray.is_connected():
5877 raise ValueError(
5878 "If you're using Ray Client, Ray Data won't schedule write tasks "
5879 "on the driver's node."
5880 )
5881 label_selector = ray_remote_args.get("label_selector", {})
5882 label_selector[
5883 ray._raylet.RAY_NODE_ID_KEY
5884 ] = ray.get_runtime_context().get_node_id()
5885 ray_remote_args["label_selector"] = label_selector
5886 ray_remote_args.pop("scheduling_strategy", None)
5887
5888 _validate_head_node_resources_for_local_scheduling(
5889 ray_remote_args,
5890 op_description="Writing to a local:// path",
5891 )
5892
5893 write_op = Write(
5894 datasink,
5895 input_dependencies=[self._logical_plan.dag],
5896 ray_remote_args=ray_remote_args,
5897 compute=TaskPoolStrategy(concurrency),
5898 )
5899 logical_plan = LogicalPlan(write_op, self.context)
5900
5901 try:
5902 # Call on_write_start for _FileDatasink before execution to handle
5903 # SaveMode checks (ERROR raises, OVERWRITE deletes contents, IGNORE skips)
5904 # and directory creation. For other datasinks, on_write_start is called
5905 # automatically by the Write operator when the first input bundle arrives.
5906 if isinstance(datasink, _FileDatasink):
5907 datasink.on_write_start()
5908 # TODO (https://github.com/ray-project/ray/issues/59326): There should be no special handling for skipping writes.
5909 if datasink._skip_write:
5910 logger.info(

Callers 15

write_parquetMethod · 0.95
write_jsonMethod · 0.95
write_icebergMethod · 0.95
write_imagesMethod · 0.95
write_csvMethod · 0.95
write_tfrecordsMethod · 0.95
write_webdatasetMethod · 0.95
write_numpyMethod · 0.95
write_sqlMethod · 0.95
write_mongoMethod · 0.95
write_bigqueryMethod · 0.95
write_clickhouseMethod · 0.95

Calls 15

WriteClass · 0.90
TaskPoolStrategyClass · 0.90
LogicalPlanClass · 0.90
memory_stringFunction · 0.90
get_runtime_contextMethod · 0.80
_from_parentMethod · 0.80
_execute_to_iteratorMethod · 0.80
getMethod · 0.65
is_connectedMethod · 0.45
get_node_idMethod · 0.45