MCPcopy
hub / github.com/ag2ai/faststream / test_invalid

Method test_invalid

tests/brokers/base/fastapi.py:354–369  ·  view source on GitHub ↗
(self, queue: str)

Source from the content-addressed store, hash-verified

352 assert await r.decode() == "hi", r
353
354 async def test_invalid(self, queue: str) -> None:
355 router = self.router_class()
356
357 app = FastAPI()
358
359 args, kwargs = self.get_subscriber_params(queue)
360
361 @router.subscriber(*args, **kwargs)
362 async def hello(msg: int) -> None: ...
363
364 app.include_router(router)
365
366 async with self.patch_broker(router.broker) as br:
367 with TestClient(app):
368 with pytest.raises(RequestValidationError):
369 await br.publish("hi", queue)
370
371 async def test_headers(self, queue: str) -> None:
372 router = self.router_class()

Callers

nothing calls this directly

Calls 4

get_subscriber_paramsMethod · 0.45
include_routerMethod · 0.45
patch_brokerMethod · 0.45
publishMethod · 0.45

Tested by

no test coverage detected