| 1740 | "snapshot_access", [api.SnapshotAccess.FULL, api.SnapshotAccess.OFFSETS_ONLY] |
| 1741 | ) |
| 1742 | def test_persistent_subscribe(snapshot_access, tmp_path): |
| 1743 | pstorage_dir = tmp_path / "PStorage" |
| 1744 | input_path = tmp_path / "input.csv" |
| 1745 | |
| 1746 | class TestSchema(pw.Schema): |
| 1747 | k: int = pw.column_definition(primary_key=True) |
| 1748 | v: str |
| 1749 | |
| 1750 | data = """ |
| 1751 | k | v |
| 1752 | 1 | foo |
| 1753 | """ |
| 1754 | |
| 1755 | write_csv(input_path, data) |
| 1756 | table = pw.io.csv.read( |
| 1757 | str(input_path), |
| 1758 | schema=TestSchema, |
| 1759 | name="1", |
| 1760 | mode="static", |
| 1761 | ) |
| 1762 | |
| 1763 | root = mock.Mock() |
| 1764 | pw.io.subscribe(table, on_change=root.on_change, on_end=root.on_end) |
| 1765 | pw.run( |
| 1766 | persistence_config=pw.persistence.Config( |
| 1767 | pw.persistence.Backend.filesystem(pstorage_dir), |
| 1768 | snapshot_access=snapshot_access, |
| 1769 | ), |
| 1770 | ) |
| 1771 | |
| 1772 | root.assert_has_calls( |
| 1773 | [ |
| 1774 | mock.call.on_change( |
| 1775 | key=mock.ANY, |
| 1776 | row={"k": 1, "v": "foo"}, |
| 1777 | time=mock.ANY, |
| 1778 | is_addition=True, |
| 1779 | ), |
| 1780 | mock.call.on_end(), |
| 1781 | ] |
| 1782 | ) |
| 1783 | assert root.on_change.call_count == 1 |
| 1784 | assert root.on_end.call_count == 1 |
| 1785 | |
| 1786 | G.clear() |
| 1787 | |
| 1788 | data = """ |
| 1789 | k | v |
| 1790 | 1 | foo |
| 1791 | 2 | bar |
| 1792 | """ |
| 1793 | |
| 1794 | write_csv(input_path, data) |
| 1795 | table = pw.io.csv.read( |
| 1796 | str(input_path), |
| 1797 | schema=TestSchema, |
| 1798 | name="1", |
| 1799 | mode="static", |