MCPcopy
hub / github.com/ag2ai/faststream / test_publisher_mock

Method test_publisher_mock

tests/brokers/base/fastapi.py:706–721  ·  view source on GitHub ↗
(self, queue: str)

Source from the content-addressed store, hash-verified

704 m.mock.assert_called_once_with("hello")
705
706 async def test_publisher_mock(self, queue: str) -> None:
707 router = self.router_class()
708
709 publisher = router.publisher(queue + "resp")
710
711 args, kwargs = self.get_subscriber_params(queue)
712 sub = router.subscriber(*args, **kwargs)
713
714 @publisher
715 @sub
716 async def m() -> str:
717 return "response"
718
719 async with self.patch_broker(router.broker) as rb:
720 await rb.publish("hello", queue)
721 publisher.mock.assert_called_with("response")
722
723 async def test_include(self, queue: str) -> None:
724 router = self.router_class()

Callers

nothing calls this directly

Calls 5

publisherMethod · 0.45
get_subscriber_paramsMethod · 0.45
subscriberMethod · 0.45
patch_brokerMethod · 0.45
publishMethod · 0.45

Tested by

no test coverage detected