MCPcopy
hub / github.com/faust-streaming/faust / records_iterator

Method records_iterator

faust/transport/utils.py:55–72  ·  view source on GitHub ↗

Iterate over topic index map in round-robin order.

(self, index: TopicIndexMap)

Source from the content-addressed store, hash-verified

53 return self.records_iterator(self.map_from_records(records))
54
def records_iterator(self, index: TopicIndexMap) -> Iterator[Tuple[TP, Any]]:
    """Iterate over topic index map in round-robin order.

    Each pass over ``index`` takes at most one item from every topic's
    iterator, in the mapping's iteration order, and yields it as a
    ``(tp, record)`` pair.  A topic whose iterator is exhausted is
    removed from ``index``; iteration stops once ``index`` is empty.

    Note:
        ``index`` is mutated in place: by the time the generator is
        fully consumed, every topic has been popped from it.

    Arguments:
        index: Mapping of topic to an iterator producing
            ``(tp, record)`` two-tuples.

    Yields:
        The ``(tp, record)`` pairs, interleaved across topics.
    """
    to_remove: Set[str] = set()
    sentinel = object()
    # Local alias so the inner loop avoids a builtin lookup per item.
    _next = next
    while index:
        if to_remove:
            # Drop the topics found empty during the previous pass, then
            # forget them so they are not popped again on every later pass.
            for topic in to_remove:
                index.pop(topic, None)
            to_remove.clear()
        for topic, messages in index.items():
            item = _next(messages, sentinel)
            if item is sentinel:
                # this topic is now empty,
                # but we cannot remove from dict while iterating over it,
                # so move that to the outer loop.
                to_remove.add(topic)
                continue
            tp, record = item  # type: ignore
            yield tp, record
73
74
75class TopicBuffer(Iterator):

Callers 1

iterateMethod · 0.95

Calls 2

itemsMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected