MCPcopy Index your code
hub / github.com/nonebot/nonebot2 / stream_request

Method stream_request

nonebot/drivers/aiohttp.py:180–235  ·  view source on GitHub ↗
(
        self,
        setup: Request,
        *,
        chunk_size: int = 1024,
    )

Source from the content-addressed store, hash-verified

178
@override
async def stream_request(
    self,
    setup: Request,
    *,
    chunk_size: int = 1024,
) -> AsyncGenerator[Response, None]:
    """Send ``setup`` and stream the response body in fixed-size pieces.

    Every yielded ``Response`` carries the response status, the same copy
    of the response headers and the originating request; only ``content``
    differs between them. Each piece is exactly ``chunk_size`` bytes long,
    except possibly the last one, which holds whatever is left over.

    Args:
        setup: The request to perform.
        chunk_size: Number of body bytes per yielded ``Response``. Must be
            positive.

    Yields:
        One ``Response`` per piece of the body. An empty body yields
        nothing.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    # With chunk_size <= 0 the re-chunking test ``len(buffer) >= chunk_size``
    # below can never become false, so the loop would emit empty pieces
    # forever. Reject such sizes before any request is sent.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size!r}")

    # Driver-level default query parameters are merged first so that the
    # request's own query values win on key conflicts.
    if self._params:
        url = setup.url.with_query({**self._params, **setup.url.query})
    else:
        url = setup.url

    data = setup.data
    if setup.files:
        # Uploads are sent as a form; plain form data (if any) is carried
        # over as its initial fields.
        data = aiohttp.FormData(data or {}, quote_fields=False)
        for name, file in setup.files:
            # Each file entry is indexed as (filename, content, content_type).
            data.add_field(name, file[1], content_type=file[2], filename=file[0])

    # Cookies without a value are skipped.
    cookies = (
        (cookie.name, cookie.value)
        for cookie in setup.cookies
        if cookie.value is not None
    )

    async with self.client.request(
        setup.method,
        url,
        data=setup.content or data,
        json=setup.json,
        cookies=cookies,
        headers=setup.headers,
        proxy=setup.proxy or self._proxy,
        timeout=self._get_timeout(setup.timeout),
    ) as response:
        response_headers = response.headers.copy()
        # aiohttp does not guarantee fixed-size chunks; re-chunk to exact size
        buffer = bytearray()
        async for chunk in response.content.iter_chunked(chunk_size):
            if not chunk:
                continue
            buffer.extend(chunk)
            # Emit every complete piece currently buffered; any shorter
            # remainder stays in the buffer until more data arrives.
            while len(buffer) >= chunk_size:
                out = bytes(buffer[:chunk_size])
                del buffer[:chunk_size]
                yield Response(
                    response.status,
                    headers=response_headers,
                    content=out,
                    request=setup,
                )
        # Flush the final, shorter-than-chunk_size remainder.
        if buffer:
            yield Response(
                response.status,
                headers=response_headers,
                content=bytes(buffer),
                request=setup,
            )
236
237 @override

Callers

nothing calls this directly

Calls 6

_get_timeoutMethod · 0.95
ResponseClass · 0.85
iter_chunkedMethod · 0.80
extendMethod · 0.80
requestMethod · 0.45
copyMethod · 0.45

Tested by

no test coverage detected