MCPcopy
hub / github.com/raiden-network/raiden / GMatrixHttpApi

Class GMatrixHttpApi

raiden/network/transport/matrix/client.py:64–196  ·  view source on GitHub ↗

A wrapper around MatrixHttpApi that limits the number of concurrent requests we make to the number of connections available to us in the requests.Session connection pool. Args: pool_maxsize: max size of the underlying session connection pool; retry_timeout: for how long a single request should be retried if it errors; retry_delay: callable which returns an iterable of delays.

Source from the content-addressed store, hash-verified

62
63
64class GMatrixHttpApi(MatrixHttpApi):
65 """
66 A wrapper around MatrixHttpApi to limit the number
67 of concurrent requests we make to the number of connections
68 available to us in requests.Session connection pool size.
69
70 Args:
71 pool_maxsize: max size of underlying/session connection pool
72 retry_timeout: for how long should a single request be retried if it errors
73 retry_delay: callable which returns an iterable of delays
74 """
75
def __init__(
    self,
    *args: Any,
    pool_maxsize: int = 10,
    retry_timeout: int = 60,
    retry_delay: Optional[Callable[[], Iterable[float]]] = None,
    long_paths: Container[str] = (),
    user_agent: Optional[str] = None,
    **kwargs: Any,
) -> None:
    """Set up the session's connection pool, request limits and retry policy.

    Positional and remaining keyword arguments are forwarded unchanged to
    the parent class constructor.

    Args:
        pool_maxsize: max size of the connection pool of the adapters
            mounted on the session, and the basis for the number of
            concurrent requests allowed.
        retry_timeout: for how long a single request should be retried
            if it errors.
        retry_delay: callable which returns an iterable of delays. When
            ``None``, a callable returning ``repeat(1)`` is used.
        long_paths: paths which get a reserved slot in the pool. When
            non-empty, the general semaphore is sized ``pool_maxsize - 1``
            and a separate lock is created for these paths.
        user_agent: value for the session's ``User-Agent`` header; the
            header is left untouched when this is empty or ``None``.
    """
    super().__init__(*args, **kwargs)

    self.server_ident: Optional[str] = None

    # Mount one adapter per scheme, each with the requested pool size.
    http_adapter = HTTPAdapter(pool_maxsize=pool_maxsize)
    https_adapter = HTTPAdapter(pool_maxsize=pool_maxsize)
    self.session.mount("http://", http_adapter)
    self.session.mount("https://", https_adapter)
    # `_record_server_ident` is defined outside this excerpt; it is run as
    # a response hook of the session.
    self.session.hooks["response"].append(self._record_server_ident)
    if user_agent:
        self.session.headers.update({"User-Agent": user_agent})

    self._long_paths = long_paths
    if long_paths:
        # One pool slot is kept back for the long paths, which are
        # serialized through their own lock instead of the semaphore.
        self._semaphore = Semaphore(pool_maxsize - 1)
        self._priority_lock = Semaphore()
    else:
        self._semaphore = Semaphore(pool_maxsize)

    self.retry_timeout = retry_timeout
    if retry_delay is None:
        # Default policy: an endless iterable of delays of 1.
        self.retry_delay: Callable[[], Iterable[float]] = lambda: repeat(1)
    else:
        self.retry_delay = retry_delay
110
111 def _send(self, method: str, path: str, *args: Any, **kwargs: Any) -> Dict:
112 # we use an infinite loop + time + sleep instead of gevent.Timeout
113 # to be able to re-raise the last exception instead of declaring one beforehand
114 started = time.monotonic()
115
116 # paths in long_paths have a reserved slot in the pool, and aren't error-handled
117 # to avoid them getting stuck when listener greenlet is killed
118 if path in self._long_paths:
119 with self._priority_lock:
120 return super()._send(method, path, *args, **kwargs)
121 last_ex = None

Callers 2

matrix_api_shellFunction · 0.90
__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected