Submit the remote function for execution.
(
self,
args=None,
kwargs=None,
serialized_runtime_env_info: Optional[str] = None,
**task_options,
)
| 312 | @wrap_auto_init |
| 313 | @_tracing_task_invocation |
| 314 | def _remote( |
| 315 | self, |
| 316 | args=None, |
| 317 | kwargs=None, |
| 318 | serialized_runtime_env_info: Optional[str] = None, |
| 319 | **task_options, |
| 320 | ): |
| 321 | """Submit the remote function for execution.""" |
| 322 | # We pop the "max_calls" coming from "@ray.remote" here. We no longer need |
| 323 | # it in "_remote()". |
| 324 | task_options.pop("max_calls", None) |
| 325 | if client_mode_should_convert(): |
| 326 | return client_mode_convert_function(self, args, kwargs, **task_options) |
| 327 | |
| 328 | worker = ray._private.worker.global_worker |
| 329 | worker.check_connected() |
| 330 | |
| 331 | if worker.mode != ray._private.worker.WORKER_MODE: |
| 332 | # Only need to record on the driver side |
| 333 | # since workers are created via tasks or actors |
| 334 | # launched from the driver. |
| 335 | from ray._common.usage import usage_lib |
| 336 | |
| 337 | usage_lib.record_library_usage("core") |
| 338 | |
| 339 | # We cannot do this when the function is first defined, because we need |
| 340 | # ray.init() to have been called when this executes |
| 341 | with self._inject_lock: |
| 342 | if self._function_signature is None: |
| 343 | self._function = _inject_tracing_into_function(self._function) |
| 344 | self._function_signature = ray._common.signature.extract_signature( |
| 345 | self._function |
| 346 | ) |
| 347 | |
| 348 | # If this function was not exported in this cluster and job, we need to |
| 349 | # export this function again, because the current GCS doesn't have it. |
| 350 | if ( |
| 351 | not self._is_cross_language |
| 352 | and self._last_export_cluster_and_job != worker.current_cluster_and_job |
| 353 | ): |
| 354 | self._function_descriptor = PythonFunctionDescriptor.from_function( |
| 355 | self._function, self._uuid |
| 356 | ) |
| 357 | # There is an interesting question here. If the remote function is |
| 358 | # used by a subsequent driver (in the same script), should the |
| 359 | # second driver pickle the function again? If yes, then the remote |
| 360 | # function definition can differ in the second driver (e.g., if |
| 361 | # variables in its closure have changed). We probably want the |
| 362 | # behavior of the remote function in the second driver to be |
| 363 | # independent of whether or not the function was invoked by the |
| 364 | # first driver. This is an argument for repickling the function, |
| 365 | # which we do here. |
| 366 | self._pickled_function = pickle_dumps( |
| 367 | self._function, |
| 368 | f"Could not serialize the function {self._function_descriptor.repr}", |
| 369 | ) |
| 370 | |
| 371 | self._last_export_cluster_and_job = worker.current_cluster_and_job |
no test coverage detected