MCPcopy
hub / github.com/faust-streaming/faust / test_getmany

Method test_getmany

tests/unit/transport/test_consumer.py:501–529  ·  view source on GitHub ↗
(self, *, consumer)

Source from the content-addressed store, hash-verified

499
500 @pytest.mark.asyncio
501 async def test_getmany(self, *, consumer):
502 def to_message(tp, record):
503 return record
504
505 consumer._to_message = to_message
506 self._setup_records(
507 consumer,
508 active_partitions={TP1, TP2},
509 records={
510 TP1: ["A", "B", "C"],
511 TP2: ["D", "E", "F", "G"],
512 TP3: ["H", "I", "J"],
513 },
514 )
515 assert not consumer.should_stop
516 consumer.flow_active = False
517 consumer.can_resume_flow.set()
518 assert [a async for a in consumer.getmany(1.0)] == []
519 assert not consumer.should_stop
520 consumer.flow_active = True
521 assert [a async for a in consumer.getmany(1.0)] == [
522 (TP1, "A"),
523 (TP2, "D"),
524 (TP1, "B"),
525 (TP2, "E"),
526 (TP1, "C"),
527 (TP2, "F"),
528 (TP2, "G"),
529 ]
530
531 @pytest.mark.asyncio
532 async def test_getmany_buffered(self, *, consumer):

Callers

nothing calls this directly

Calls 3

_setup_recordsMethod · 0.95
setMethod · 0.45
getmanyMethod · 0.45

Tested by

no test coverage detected