(nb_rows: int, offset: int, expected: list[int])
| 373 | ) |
| 374 | |
| 375 | def run_computation(nb_rows: int, offset: int, expected: list[int]): |
| 376 | G.clear() |
| 377 | |
| 378 | def acceptor(new_value, old_value) -> bool: |
| 379 | return new_value >= old_value + 2 |
| 380 | |
| 381 | class InputSchema(pw.Schema): |
| 382 | value: float |
| 383 | |
| 384 | class StreamSubject(pw.io.python.ConnectorSubject): |
| 385 | def run(self): |
| 386 | for index in range(offset, offset + nb_rows): |
| 387 | self.next_json({"value": index}) |
| 388 | # Make sure that the data can't be batched |
| 389 | time.sleep(0.01) |
| 390 | self.commit() |
| 391 | |
| 392 | table = pw.io.python.read( |
| 393 | StreamSubject(), |
| 394 | schema=InputSchema, |
| 395 | format="json", |
| 396 | ) |
| 397 | result = table.deduplicate(value=pw.this.value, acceptor=acceptor) |
| 398 | |
| 399 | emit = mock.Mock() |
| 400 | |
| 401 | def on_change(key: pw.Pointer, row: dict, time: int, is_addition: bool): |
| 402 | if is_addition: |
| 403 | emit(row["value"]) |
| 404 | |
| 405 | pw.io.subscribe(result, on_change) |
| 406 | run(persistence_config=persistence_config) |
| 407 | |
| 408 | emit.assert_has_calls([mock.call(i) for i in expected]) |
| 409 | |
| 410 | run_computation(6, 0, [0, 2, 4]) |
| 411 | run_computation(5, 6, [6, 8, 10]) |
no test coverage detected