Compute the exported payload and GCS key for a setup hook callable. Args: setup_func: The setup hook function to export. job_id: The job ID to export the setup hook for. Returns: A tuple of (pickled_function, function_id, key).
(
setup_func: Callable, job_id: JobID
)
| 57 | |
| 58 | |
| 59 | def build_setup_hook_export_entry( |
| 60 | setup_func: Callable, job_id: JobID |
| 61 | ) -> tuple[bytes, bytes, bytes]: |
| 62 | """Compute the exported payload and GCS key for a setup hook callable. |
| 63 | |
| 64 | Args: |
| 65 | setup_func: The setup hook function to export. |
| 66 | job_id: The job ID to export the setup hook for. |
| 67 | |
| 68 | Returns: |
| 69 | A tuple of (pickled_function, function_id, key). |
| 70 | """ |
| 71 | pickled_function = pickle_dumps( |
| 72 | setup_func, |
| 73 | "Cannot serialize the worker_process_setup_hook " f"{setup_func.__name__}", |
| 74 | ) |
| 75 | function_to_run_id = hashlib.shake_128(pickled_function).digest( |
| 76 | ray_constants.ID_SIZE |
| 77 | ) |
| 78 | key = make_function_table_key( |
| 79 | # This value should match with gcs_function_manager.h. |
| 80 | # Otherwise, it won't be GC'ed. |
| 81 | WORKER_PROCESS_SETUP_HOOK_KEY_NAME_GCS.encode(), |
| 82 | # b"FunctionsToRun", |
| 83 | job_id, |
| 84 | function_to_run_id, |
| 85 | ) |
| 86 | return pickled_function, function_to_run_id, key |
| 87 | |
| 88 | |
| 89 | class FunctionActorManager: |
searching dependent graphs…