()
| 610 | |
| 611 | |
def test_subscribe():
    """Check that ``pw.io.subscribe`` invokes the registered callbacks.

    A Python connector subject emitting a single ``"foo"`` row is read in
    ``raw`` format. After the run, the mock must have recorded ``on_change``
    for that row, followed by ``on_time_end`` and then ``on_end``.
    """

    class TestSubject(pw.io.python.ConnectorSubject):
        def run(self):
            # The only row produced by this subject; the on_change
            # expectation below matches on it.
            self.next(data="foo")

    # A single parent mock records the calls of all three callbacks in one
    # ordered list, which lets the assertion check their relative order.
    callbacks = mock.Mock()

    source = pw.io.python.read(TestSubject(), format="raw")
    pw.io.subscribe(
        source,
        on_change=callbacks.on_change,
        on_time_end=callbacks.on_time_end,
        on_end=callbacks.on_end,
    )

    run()

    # Key and time are not fixed by the test, so they are matched loosely.
    expected_calls = [
        mock.call.on_change(
            key=mock.ANY, row={"data": "foo"}, time=mock.ANY, is_addition=True
        ),
        mock.call.on_time_end(mock.ANY),
        mock.call.on_end(),
    ]
    callbacks.assert_has_calls(expected_calls)
| 639 | |
| 640 | |
| 641 | def test_python_write(): |
nothing calls this directly
no test coverage detected