Pickle a remote function and export it to redis. Args: remote_function: the RemoteFunction object.
(self, remote_function: RemoteFunction)
| 217 | return key |
| 218 | |
| 219 | def export(self, remote_function: RemoteFunction) -> None: |
| 220 | """Pickle a remote function and export it to redis. |
| 221 | |
| 222 | Args: |
| 223 | remote_function: the RemoteFunction object. |
| 224 | """ |
| 225 | if self._worker.load_code_from_local: |
| 226 | function_descriptor = remote_function._function_descriptor |
| 227 | module_name, function_name = ( |
| 228 | function_descriptor.module_name, |
| 229 | function_descriptor.function_name, |
| 230 | ) |
| 231 | # If the function is dynamic, we still export it to GCS |
| 232 | # even if load_code_from_local is set True. |
| 233 | if ( |
| 234 | self.load_function_or_class_from_local(module_name, function_name) |
| 235 | is not None |
| 236 | ): |
| 237 | return |
| 238 | function = remote_function._function |
| 239 | pickled_function = remote_function._pickled_function |
| 240 | |
| 241 | check_oversized_function( |
| 242 | pickled_function, |
| 243 | remote_function._function_name, |
| 244 | "remote function", |
| 245 | self._worker, |
| 246 | ) |
| 247 | key = make_function_table_key( |
| 248 | b"RemoteFunction", |
| 249 | self._worker.current_job_id, |
| 250 | remote_function._function_descriptor.function_id.binary(), |
| 251 | ) |
| 252 | if self._worker.gcs_client.internal_kv_exists(key, KV_NAMESPACE_FUNCTION_TABLE): |
| 253 | return |
| 254 | val = pickle.dumps( |
| 255 | { |
| 256 | "job_id": self._worker.current_job_id.binary(), |
| 257 | "function_id": remote_function._function_descriptor.function_id.binary(), # noqa: E501 |
| 258 | "function_name": remote_function._function_name, |
| 259 | "module": function.__module__, |
| 260 | "function": pickled_function, |
| 261 | "collision_identifier": self.compute_collision_identifier(function), |
| 262 | "max_calls": remote_function._max_calls, |
| 263 | } |
| 264 | ) |
| 265 | self._worker.gcs_client.internal_kv_put( |
| 266 | key, val, True, KV_NAMESPACE_FUNCTION_TABLE |
| 267 | ) |
| 268 | |
| 269 | def fetch_registered_method( |
| 270 | self, key: str, timeout: Optional[int] = None |
no test coverage detected