Request a resource from tracker and run the func. This function safe-guard rare server node dropout during execution. In such case, a new resource will be requested and func will be ran again. Parameters ---------- key : str The type key of the d
(self, key, func, priority=1, session_timeout=0, max_retry=2)
| 437 | raise RuntimeError(f"Cannot request {key} after {max_retry} retry, last_error:{last_err!s}") |
| 438 | |
| 439 | def request_and_run(self, key, func, priority=1, session_timeout=0, max_retry=2): |
| 440 | """Request a resource from tracker and run the func. |
| 441 | |
| 442 | This function safe-guard rare server node dropout during execution. |
| 443 | In such case, a new resource will be requested and func will be ran again. |
| 444 | |
| 445 | Parameters |
| 446 | ---------- |
| 447 | key : str |
| 448 | The type key of the device. |
| 449 | |
| 450 | func : function of session -> value |
| 451 | A stateless function |
| 452 | |
| 453 | priority : int, optional |
| 454 | The priority of the request. |
| 455 | |
| 456 | session_timeout : float, optional |
| 457 | The duration of the session, allows server to kill |
| 458 | the connection when duration is longer than this value. |
| 459 | When duration is zero, it means the request must always be kept alive. |
| 460 | |
| 461 | max_retry : int, optional |
| 462 | Maximum number of times to retry the function before give up. |
| 463 | """ |
| 464 | last_err = None |
| 465 | for _ in range(max_retry): |
| 466 | try: |
| 467 | sess = self.request(key, priority=priority, session_timeout=session_timeout) |
| 468 | tstart = time.time() |
| 469 | return func(sess) |
| 470 | except RuntimeError as err: |
| 471 | duration = time.time() - tstart |
| 472 | # roughly estimate if the error is due to timeout termination |
| 473 | if session_timeout and duration >= session_timeout * 0.95: |
| 474 | raise RuntimeError(f"Session timeout when running {func.__name__}") |
| 475 | last_err = err |
| 476 | raise RuntimeError( |
| 477 | f"Failed to run on {key} after {max_retry} retry, last_error:{last_err!s}" |
| 478 | ) |
| 479 | |
| 480 | |
| 481 | def connect( |