| 469 | ) |
| 470 | |
| 471 | def read(self, timeout: Optional[float] = None) -> Any: |
| 472 | assert ( |
| 473 | timeout is None or timeout >= 0 or timeout == -1 |
| 474 | ), "Timeout must be non-negative or -1." |
| 475 | self.ensure_registered_as_reader() |
| 476 | |
| 477 | start_time = time.monotonic() |
| 478 | ret = self._worker.get_objects( |
| 479 | [self._local_reader_ref], timeout=timeout, return_exceptions=True |
| 480 | )[0][0] |
| 481 | |
| 482 | if isinstance(ret, _ResizeChannel): |
| 483 | self._node_id_to_reader_ref_info = ret._node_id_to_reader_ref_info |
| 484 | self._local_reader_ref = self._get_local_reader_ref( |
| 485 | self._node_id_to_reader_ref_info |
| 486 | ) |
| 487 | # We need to register the new reader_ref. |
| 488 | self._reader_registered = False |
| 489 | self.ensure_registered_as_reader() |
| 490 | if timeout is not None: |
| 491 | timeout -= time.monotonic() - start_time |
| 492 | timeout = max(timeout, 0) |
| 493 | ret = self._worker.get_objects( |
| 494 | [self._local_reader_ref], timeout=timeout, return_exceptions=True |
| 495 | )[0][0] |
| 496 | |
| 497 | return ret |
| 498 | |
| 499 | def release_buffer(self, timeout: Optional[float] = None) -> None: |
| 500 | assert ( |