A wrapper around MatrixHttpApi to limit the number of concurrent requests we make to the number of connections available to us in the requests.Session connection pool. Args: pool_maxsize: max size of the underlying session connection pool; retry_timeout: for how long a single request should be retried if it errors; retry_delay: callable which returns an iterable of delays.
| 62 | |
| 63 | |
| 64 | class GMatrixHttpApi(MatrixHttpApi): |
| 65 | """ |
| 66 | A wrapper around MatrixHttpApi to limit the number |
| 67 | of concurrent requests we make to the number of connections |
| 68 | available to us in requests.Session connection pool size. |
| 69 | |
| 70 | Args: |
| 71 | pool_maxsize: max size of underlying/session connection pool |
| 72 | retry_timeout: for how long should a single request be retried if it errors |
| 73 | retry_delay: callable which returns an iterable of delays |
| 74 | """ |
| 75 | |
def __init__(
    self,
    *args: Any,
    pool_maxsize: int = 10,
    retry_timeout: int = 60,
    retry_delay: Optional[Callable[[], Iterable[float]]] = None,
    long_paths: Container[str] = (),
    user_agent: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Configure the session's connection pool, request limiter and retry policy.

    Args:
        *args: positional arguments forwarded unchanged to ``MatrixHttpApi``.
        pool_maxsize: ``pool_maxsize`` of the HTTP and HTTPS adapters mounted
            on the session; also the size of the request semaphore.
        retry_timeout: for how long a single request should be retried if it
            errors (stored as ``self.retry_timeout``).
        retry_delay: callable which returns an iterable of delays. When
            ``None``, a callable yielding ``1`` indefinitely is used.
        long_paths: paths that are sent under a dedicated lock instead of
            the shared semaphore. When truthy, the shared semaphore is
            created with one slot fewer.
        user_agent: if truthy, set as the session's ``User-Agent`` header.
        **kwargs: keyword arguments forwarded unchanged to ``MatrixHttpApi``.
    """
    super().__init__(*args, **kwargs)

    # Filled in later; the response hook registered below is expected to set it.
    self.server_ident: Optional[str] = None

    # One adapter per scheme, each with its own pool of ``pool_maxsize``.
    http_adapter = HTTPAdapter(pool_maxsize=pool_maxsize)
    https_adapter = HTTPAdapter(pool_maxsize=pool_maxsize)
    self.session.mount("http://", http_adapter)
    self.session.mount("https://", https_adapter)
    self.session.hooks["response"].append(self._record_server_ident)
    if user_agent:
        self.session.headers.update({"User-Agent": user_agent})

    self._long_paths = long_paths
    if long_paths:
        # Keep one pool slot out of the shared semaphore; requests to
        # long paths serialize on ``_priority_lock`` instead.
        # NOTE(review): with pool_maxsize=1 this creates Semaphore(0), which
        # presumably leaves no slot for regular requests -- confirm callers
        # always pass pool_maxsize >= 2 together with long_paths.
        self._semaphore = Semaphore(pool_maxsize - 1)
        self._priority_lock = Semaphore()
    else:
        # ``_priority_lock`` is intentionally not created here: it is only
        # used for paths contained in ``long_paths``.
        self._semaphore = Semaphore(pool_maxsize)

    self.retry_timeout = retry_timeout
    if retry_delay is None:
        # Default policy: an endless stream of the constant delay 1.
        self.retry_delay: Callable[[], Iterable[float]] = lambda: repeat(1)
    else:
        self.retry_delay = retry_delay
| 110 | |
| 111 | def _send(self, method: str, path: str, *args: Any, **kwargs: Any) -> Dict: |
| 112 | # we use an infinite loop + time + sleep instead of gevent.Timeout |
| 113 | # to be able to re-raise the last exception instead of declaring one beforehand |
| 114 | started = time.monotonic() |
| 115 | |
| 116 | # paths in long_paths have a reserved slot in the pool, and aren't error-handled |
| 117 | # to avoid them getting stuck when listener greenlet is killed |
| 118 | if path in self._long_paths: |
| 119 | with self._priority_lock: |
| 120 | return super()._send(method, path, *args, **kwargs) |
| 121 | last_ex = None |
no outgoing calls
no test coverage detected