MCPcopy Index your code
hub / github.com/ag2ai/faststream / test_rpc_request

Method test_rpc_request

tests/prometheus/basic.py:288–325  ·  view source on GitHub ↗
(
        self,
        queue: str,
    )

Source from the content-addressed store, hash-verified

class LocalRPCPrometheusTestcase:
    """Test case covering broker request/reply with a metrics middleware.

    The method below relies on ``get_middleware``, ``get_broker``,
    ``patch_broker`` and ``assert_metrics`` being available on ``self``;
    they are not defined in this class body (presumably supplied by a
    broker-specific base class or mixin -- confirm against the subclasses).
    """

    @pytest.mark.asyncio()
    async def test_rpc_request(
        self,
        queue: str,
    ) -> None:
        """Issue one request through a patched broker and check the metrics.

        A subscriber is registered on ``queue``; it records the ``"message"``
        context object it receives and replies with an empty string. The test
        then sends an empty-bodied request to the same queue, verifies the
        subscriber actually ran, and hands the captured message to
        ``assert_metrics`` together with the registry the middleware was
        built with.

        Args:
            queue: Name of the queue used for both the subscriber and the
                request.
        """
        handled = asyncio.Event()
        registry = CollectorRegistry()

        # The middleware is built around the local registry so that the
        # final assertion inspects only what this test produced.
        broker = self.get_broker(
            apply_types=True,
            middlewares=(self.get_middleware(registry=registry),),
        )

        captured = None

        @broker.subscriber(queue)
        async def handle(m=Context("message")):
            nonlocal captured

            handled.set()
            captured = m

            # Reply with an empty body so the pending request completes.
            return ""

        async with self.patch_broker(broker) as br:
            await br.start()

            # Bound the wait so a missing reply fails the test instead of
            # hanging it.
            await asyncio.wait_for(br.request("", queue), timeout=3)

        assert handled.is_set()

        self.assert_metrics(
            registry=registry,
            message=captured,
            exception_class=None,
            custom_labels={},
        )
326
327
328class LocalMetricsSettingsProviderTestcase:

Callers

nothing calls this directly

Calls 6

get_middleware — Method · 0.45
get_broker — Method · 0.45
patch_broker — Method · 0.45
start — Method · 0.45
request — Method · 0.45
assert_metrics — Method · 0.45

Tested by

no test coverage detected