Iterate over topic index map in round-robin order.
(self, index: TopicIndexMap)
| 53 | return self.records_iterator(self.map_from_records(records)) |
| 54 | |
def records_iterator(self, index: TopicIndexMap) -> Iterator[Tuple[TP, Any]]:
    """Iterate over topic index map in round-robin order.

    Each pass over ``index`` takes at most one item from every topic's
    iterator, so no single topic is drained before the others get a turn.

    Arguments:
        index: Mapping from topic to an *iterator* of ``(tp, record)``
            items.  The mapping is consumed: a topic is popped from it
            once its iterator is exhausted, so it is empty when the
            generator finishes.

    Yields:
        ``(tp, record)`` tuples, interleaved across topics.
    """
    to_remove: Set[str] = set()
    sentinel = object()
    # Bind the builtin to a local name: it is called once per record.
    _next = next
    while index:
        # Apply the removals deferred from the previous pass, then reset
        # the set so already-removed topics are not popped again on
        # every following pass.
        for topic in to_remove:
            index.pop(topic, None)
        to_remove.clear()
        for topic, messages in index.items():
            item = _next(messages, sentinel)
            if item is sentinel:
                # This topic is now empty, but we cannot remove from
                # the dict while iterating over it, so defer that to
                # the outer loop.
                to_remove.add(topic)
                continue
            tp, record = item  # type: ignore
            yield tp, record
| 73 | |
| 74 | |
| 75 | class TopicBuffer(Iterator): |