MCPcopy Index your code
hub / github.com/pathwaycom/pathway / CheckKeyConsistentInStreamCallback

Class CheckKeyConsistentInStreamCallback

python/pathway/tests/utils.py:237–262  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

235
236
237class CheckKeyConsistentInStreamCallback(CheckKeyEntriesInStreamCallback):
238 def __call__(
239 self,
240 key: api.Pointer,
241 row: dict[str, api.Value],
242 time: int,
243 is_addition: bool,
244 ) -> Any:
245 q = self.state.get(key)
246 assert (
247 q
248 ), f"Got unexpected entry {key=} {row=} {time=} {is_addition=}, expected entries= {self.state!r}"
249
250 while True:
251 entry = q.popleft()
252 if (is_addition, row) == (entry.insertion, entry.row):
253 if not q:
254 self.state.pop(key)
255 break
256 else:
257 assert (
258 q
259 ), f"Skipping over entries emptied the set of expected entries for {key=} and state = {self.state!r}"
260
261 def on_end(self):
262 assert not self.state, f"Non empty final state = {self.state!r}"
263
264
265# this callback does not verify the order of entries, only that all of them were present

Calls

no outgoing calls

Tested by

no test coverage detected