hub / github.com/pathwaycom/pathway / run_computation

Function run_computation

python/pathway/tests/test_deduplicate.py:375–408
(nb_rows: int, offset: int, expected: list[int])

Source from the content-addressed store, hash-verified

    )

    def run_computation(nb_rows: int, offset: int, expected: list[int]):
        G.clear()

        def acceptor(new_value, old_value) -> bool:
            return new_value >= old_value + 2

        class InputSchema(pw.Schema):
            value: float

        class StreamSubject(pw.io.python.ConnectorSubject):
            def run(self):
                for index in range(offset, offset + nb_rows):
                    self.next_json({"value": index})
                    # Make sure that the data can't be batched
                    time.sleep(0.01)
                    self.commit()

        table = pw.io.python.read(
            StreamSubject(),
            schema=InputSchema,
            format="json",
        )
        result = table.deduplicate(value=pw.this.value, acceptor=acceptor)

        emit = mock.Mock()

        def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool):
            if is_addition:
                emit(row["value"])

        pw.io.subscribe(result, on_change)
        run(persistence_config=persistence_config)

        emit.assert_has_calls([mock.call(i) for i in expected])

    run_computation(6, 0, [0, 2, 4])
    run_computation(5, 6, [6, 8, 10])
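The expected lists in the two calls above can be checked by hand with a plain-Python model of the acceptor rule. This is only a sketch and does not use Pathway: `simulate_deduplicate` is a hypothetical helper, and it assumes that the first value seen is accepted unconditionally and that the last accepted value is carried from the first run into the second (standing in for the state restored through `persistence_config`).

```python
from typing import Callable, Iterable, Optional


def acceptor(new_value: float, old_value: float) -> bool:
    # Same rule as in run_computation: accept only a jump of at least 2.
    return new_value >= old_value + 2


def simulate_deduplicate(
    values: Iterable[float],
    accept: Callable[[float, float], bool],
    last_accepted: Optional[float] = None,
) -> tuple[list[float], Optional[float]]:
    # Hypothetical helper, not a Pathway API. It models the assumed behaviour:
    # the first value is kept when there is no prior state, and each later
    # value is kept only if the acceptor approves it against the last kept one.
    accepted = []
    for value in values:
        if last_accepted is None or accept(value, last_accepted):
            accepted.append(value)
            last_accepted = value
    return accepted, last_accepted


# First run: nb_rows=6, offset=0, so the inputs are 0..5.
first, state = simulate_deduplicate(range(0, 6), acceptor)
# Second run: nb_rows=5, offset=6, so the inputs are 6..10,
# starting from the state left by the first run.
second, state = simulate_deduplicate(range(6, 11), acceptor, state)

print(first)   # [0, 2, 4]
print(second)  # [6, 8, 10]
```

Under these assumptions the model reproduces the `expected` arguments of both calls: `[0, 2, 4]` for the first run and `[6, 8, 10]` for the second.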

Calls 5

run · Function · 0.90
StreamSubject · Class · 0.85
clear · Method · 0.80
deduplicate · Method · 0.80
subscribe · Method · 0.80

Tested by

no test coverage detected