(
binary, call_site, owner_address, object_status, tensor_transport
)
| 65 | |
| 66 | |
def _object_ref_deserializer(
    binary, call_site, owner_address, object_status, tensor_transport
):
    """Rebuild a ``ray.ObjectRef`` from its pickled fields.

    The ref is always constructed from ``binary``, ``owner_address``,
    ``call_site`` and ``tensor_transport``. When ``owner_address`` is truthy,
    the ref is additionally handed to the core worker through
    ``deserialize_and_register_object_ref`` together with ``object_status``
    and the outer object ref reported by the serialization context.

    Returns:
        The reconstructed ``ray.ObjectRef``.
    """
    # NOTE(suquark): Keep this as a module-level function so cloudpickle can
    # reference it directly; otherwise cloudpickle would have to dump the
    # whole function definition, which is inefficient.

    # NOTE(swang): The ObjectRef must exist before the core worker is asked
    # to resolve the value, so that its ref count is already greater than 0
    # by the time the core worker resolves the value of the object.
    ref = ray.ObjectRef(
        binary, owner_address, call_site, tensor_transport=tensor_transport
    )

    # Without an owner address nothing is registered with the core worker.
    if not owner_address:
        return ref

    # TODO(edoakes): capturing a reference to 'self' here should be enough,
    # but this function is itself pickled somewhere, which causes an error.
    global_worker = ray._private.worker.global_worker
    global_worker.check_connected()
    serialization_context = global_worker.get_serialization_context()
    enclosing_ref = serialization_context.get_outer_object_ref()

    # There is no outer ref when this ObjectRef was closed over in a function
    # or pickled directly using pickle.dumps(); fall back to the nil ref.
    if enclosing_ref is None:
        enclosing_ref = ray.ObjectRef.nil()

    global_worker.core_worker.deserialize_and_register_object_ref(
        ref.binary(), enclosing_ref, owner_address, object_status
    )
    return ref
| 98 | |
| 99 | |
| 100 | def _rdt_ref_deserializer( |
no test coverage detected
searching dependent graphs…