Get the values in the object store associated with the IDs. Return the values from the local object store for object_refs. This will block until all the values for object_refs have been written to the local object store. Args: object_refs: A list of the object refs whose values should be retrieved.
(
self,
object_refs: list,
timeout: Optional[float] = None,
return_exceptions: bool = False,
skip_deserialization: bool = False,
use_object_store: bool = False,
)
| 959 | ) |
| 960 | |
| 961 | def get_objects( |
| 962 | self, |
| 963 | object_refs: list, |
| 964 | timeout: Optional[float] = None, |
| 965 | return_exceptions: bool = False, |
| 966 | skip_deserialization: bool = False, |
| 967 | use_object_store: bool = False, |
| 968 | ) -> Tuple[List[serialization.SerializedRayObject], bytes]: |
| 969 | """Get the values in the object store associated with the IDs. |
| 970 | |
| 971 | Return the values from the local object store for object_refs. This |
| 972 | will block until all the values for object_refs have been written to |
| 973 | the local object store. |
| 974 | |
| 975 | Args: |
| 976 | object_refs: A list of the object refs |
| 977 | whose values should be retrieved. |
| 978 | timeout: The maximum amount of time in |
| 979 | seconds to wait before returning. |
| 980 | return_exceptions: If any of the objects deserialize to an |
| 981 | Exception object, whether to return them as values in the |
| 982 | returned list. If False, then the first found exception will be |
| 983 | raised. |
| 984 | skip_deserialization: If true, only the buffer will be released and |
| 985 | the object associated with the buffer will not be deserialized. |
| 986 | use_object_store: [Alpha] To fetch an RDT object through the object store. |
| 987 | Returns: |
| 988 | list: List of deserialized objects or None if skip_deserialization is True. |
| 989 | bytes: UUID of the debugger breakpoint we should drop |
| 990 | into or b"" if there is no breakpoint. |
| 991 | """ |
| 992 | # Make sure that the values are object refs. |
| 993 | for object_ref in object_refs: |
| 994 | if not isinstance(object_ref, ObjectRef): |
| 995 | raise TypeError( |
| 996 | f"Attempting to call `get` on the value {object_ref}, " |
| 997 | "which is not an ray.ObjectRef." |
| 998 | ) |
| 999 | timeout_ms = ( |
| 1000 | int(timeout * 1000) if timeout is not None and timeout != -1 else -1 |
| 1001 | ) |
| 1002 | serialized_objects: List[ |
| 1003 | serialization.SerializedRayObject |
| 1004 | ] = self.core_worker.get_objects( |
| 1005 | object_refs, |
| 1006 | timeout_ms, |
| 1007 | ) |
| 1008 | |
| 1009 | debugger_breakpoint = b"" |
| 1010 | for _, metadata, _ in serialized_objects: |
| 1011 | if metadata: |
| 1012 | metadata_fields = metadata.split(b",") |
| 1013 | if len(metadata_fields) >= 2 and metadata_fields[1].startswith( |
| 1014 | ray_constants.OBJECT_METADATA_DEBUG_PREFIX |
| 1015 | ): |
| 1016 | debugger_breakpoint = metadata_fields[1][ |
| 1017 | len(ray_constants.OBJECT_METADATA_DEBUG_PREFIX) : |
| 1018 | ] |
no test coverage detected