(
self,
language,
function,
function_descriptor,
task_options,
)
| 89 | """ |
| 90 | |
| 91 | def __init__( |
| 92 | self, |
| 93 | language, |
| 94 | function, |
| 95 | function_descriptor, |
| 96 | task_options, |
| 97 | ): |
| 98 | if inspect.iscoroutinefunction(function): |
| 99 | raise ValueError( |
| 100 | "'async def' should not be used for remote tasks. You can wrap the " |
| 101 | "async function with `asyncio.run(f())`. See more at:" |
| 102 | "https://docs.ray.io/en/latest/ray-core/actors/async_api.html " |
| 103 | ) |
| 104 | self._default_options = task_options |
| 105 | |
| 106 | # When gpu is used, set the task non-recyclable by default. |
| 107 | # https://github.com/ray-project/ray/issues/29624 for more context. |
| 108 | # Note: Ray task worker process is not being reused when nsight |
| 109 | # profiler is running, as nsight/rocprof-sys generate report |
| 110 | # once the process exit. |
| 111 | num_gpus = self._default_options.get("num_gpus") or 0 |
| 112 | if ( |
| 113 | num_gpus > 0 and self._default_options.get("max_calls", None) is None |
| 114 | ) or any( |
| 115 | [ |
| 116 | s in (self._default_options.get(s) or {}) |
| 117 | for s in ["nsight", "rocprof-sys"] |
| 118 | ] |
| 119 | ): |
| 120 | self._default_options["max_calls"] = 1 |
| 121 | |
| 122 | # TODO(suquark): This is a workaround for class attributes of options. |
| 123 | # They are being used in some other places, mostly tests. Need cleanup later. |
| 124 | # E.g., actors uses "__ray_metadata__" to collect options, we can so something |
| 125 | # similar for remote functions. |
| 126 | for k, v in ray_option_utils.task_options.items(): |
| 127 | setattr(self, "_" + k, task_options.get(k, v.default_value)) |
| 128 | self._runtime_env = parse_runtime_env_for_task_or_actor(self._runtime_env) |
| 129 | if "runtime_env" in self._default_options: |
| 130 | self._default_options["runtime_env"] = self._runtime_env |
| 131 | |
| 132 | # Pre-calculate runtime env info, to avoid re-calculation at `remote` |
| 133 | # invocation. When `remote` call has specified extra `option` field, |
| 134 | # runtime env will be overwritten and re-serialized. |
| 135 | # |
| 136 | # Caveat: To support dynamic runtime envs in |
| 137 | # `func.option(runtime_env={...}).remote()`, we recalculate the serialized |
| 138 | # runtime env info in the `option` call. But it's acceptable since |
| 139 | # pre-calculation here only happens once at `RemoteFunction` initialization. |
| 140 | self._serialized_base_runtime_env_info = "" |
| 141 | if self._runtime_env: |
| 142 | self._serialized_base_runtime_env_info = get_runtime_env_info( |
| 143 | self._runtime_env, |
| 144 | is_job_runtime_env=False, |
| 145 | serialize=True, |
| 146 | ) |
| 147 | |
| 148 | self._language = language |
nothing calls this directly
no test coverage detected