hub / github.com/faust-streaming/faust / agent

Method agent(stream)

tests/unit/test_streams.py:179–190

Source from the content-addressed store, hash-verified

177
178  @app.agent(channel)
179  async def agent(stream):
180      with ExitStack() as stack:
181          if raises:
182              stack.enter_context(pytest.raises(raises))
183          async for value in stream:
184              if value is sentinel:
185                  got_sentinel.set()
186                  break
187              assert value is event.value
188              if side_effect:
189                  got_sentinel.set()
190                  raise side_effect
191
192  s: faust.StreamT = None
193  async with agent as _agent:
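
The listing enters `pytest.raises` through an `ExitStack` only when `raises` is set, so one loop body serves both the "expect an exception" and the "expect none" cases. Below is a minimal standalone sketch of that pattern using only the standard library. `expect_raises`, `numbers`, and `consume` are hypothetical names introduced for illustration; `expect_raises` is a rough stand-in for `pytest.raises`, and the async generator stands in for the Faust stream. None of this is taken from the Faust test suite.

```python
import asyncio
from contextlib import ExitStack, contextmanager


@contextmanager
def expect_raises(exc_type):
    """Hypothetical stand-in for pytest.raises (assumption, stdlib only)."""
    try:
        yield
    except exc_type:
        return  # the expected exception arrived, so swallow it
    # Reached only when the body finished without raising.
    raise AssertionError(f"{exc_type.__name__} was not raised")


async def numbers(values):
    """Hypothetical async source standing in for the agent's stream."""
    for value in values:
        yield value


async def consume(values, sentinel, raises=None, side_effect=None):
    seen = []
    with ExitStack() as stack:
        if raises:
            # Registered only when an exception is expected.
            stack.enter_context(expect_raises(raises))
        async for value in numbers(values):
            if value is sentinel:
                break  # normal shutdown path
            seen.append(value)
            if side_effect:
                raise side_effect  # error path, caught by expect_raises
    return seen


STOP = object()
print(asyncio.run(consume([1, 2, STOP, 3], STOP)))  # [1, 2]
print(asyncio.run(consume([1, 2], STOP, raises=KeyError,
                          side_effect=KeyError("boom"))))  # [1]
```

An `ExitStack` that has nothing registered behaves like an empty `with` block, which is why the conditional registration does not need a separate code path for the no-exception case.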

Callers 2

on_start · Method · 0.45

Calls 1

set · Method · 0.45

Tested by

no test coverage detected