MCPcopy
hub / github.com/ag2ai/faststream / test_queue_obj

Method test_queue_obj

tests/brokers/rabbit/test_router.py:107–137  ·  view source on GitHub ↗
(
        self,
        queue: str,
    )

Source from the content-addressed store, hash-verified

105 mock.assert_called_once_with(name="john", id=2)
106
async def test_queue_obj(
    self,
    queue: str,
) -> None:
    """Verify a subscriber registered with a ``RabbitQueue`` object is invoked.

    A router is created with the ``"test/"`` prefix, a handler is subscribed
    to ``RabbitQueue(queue)`` on that router, and the router is included in
    the broker. The message ``"hello"`` is then published to
    ``f"test/{r_queue.name}"`` and the test waits up to three seconds for
    the handler to set an event.

    Args:
        queue: Name used to build the ``RabbitQueue``.
            NOTE(review): presumably supplied by a pytest fixture — confirm.
    """
    broker = self.get_broker()
    router = self.get_router(prefix="test/")

    r_queue = RabbitQueue(queue)

    event = asyncio.Event()

    @router.subscriber(r_queue)
    def subscriber(m) -> None:
        # The payload is irrelevant; only the fact of delivery is recorded.
        event.set()

    broker.include_router(router)

    async with broker:
        await broker.start()

        tasks = (
            asyncio.create_task(
                broker.publish("hello", f"test/{r_queue.name}"),
            ),
            asyncio.create_task(event.wait()),
        )
        _, pending = await asyncio.wait(tasks, timeout=3)

        # asyncio.wait() does not cancel its awaitables on timeout, so any
        # task still pending here would otherwise outlive the broker
        # context and the test itself.
        for task in pending:
            task.cancel()

    assert event.is_set()
138
139 async def test_queue_obj_with_routing_key(
140 self,

Callers

nothing calls this directly

Calls 7

RabbitQueue (Class) · 0.90
wait (Method) · 0.80
get_broker (Method) · 0.45
get_router (Method) · 0.45
include_router (Method) · 0.45
start (Method) · 0.45
publish (Method) · 0.45

Tested by

no test coverage detected