(self, *, consumer)
| 499 | |
@pytest.mark.asyncio
async def test_getmany(self, *, consumer):
    """Check what ``consumer.getmany`` yields with flow paused and active.

    Records are registered for TP1, TP2 and TP3, but only TP1 and TP2
    are passed as active partitions.  The test asserts that:

    * while ``flow_active`` is False, ``getmany`` yields nothing;
    * once ``flow_active`` is True, ``getmany`` yields ``(tp, record)``
      pairs alternating between TP1 and TP2 until TP1 runs out, then
      the remaining TP2 record.  No TP3 record is expected.
    """

    def passthrough(tp, record):
        # Hand the raw record back unchanged so the expected values
        # below can be written as plain strings.
        return record

    consumer._to_message = passthrough

    fixture_records = {
        TP1: ["A", "B", "C"],
        TP2: ["D", "E", "F", "G"],
        TP3: ["H", "I", "J"],
    }
    self._setup_records(
        consumer,
        active_partitions={TP1, TP2},
        records=fixture_records,
    )
    assert not consumer.should_stop

    # Flow disabled: draining getmany must produce no items.
    consumer.flow_active = False
    consumer.can_resume_flow.set()
    while_paused = [item async for item in consumer.getmany(1.0)]
    assert while_paused == []
    assert not consumer.should_stop

    # Flow enabled: records from the two active partitions come out
    # interleaved in the order pinned below.
    consumer.flow_active = True
    while_active = [item async for item in consumer.getmany(1.0)]
    assert while_active == [
        (TP1, "A"),
        (TP2, "D"),
        (TP1, "B"),
        (TP2, "E"),
        (TP1, "C"),
        (TP2, "F"),
        (TP2, "G"),
    ]
| 530 | |
| 531 | @pytest.mark.asyncio |
| 532 | async def test_getmany_buffered(self, *, consumer): |
nothing calls this directly
no test coverage detected