MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_persistent_subscribe

Function test_persistent_subscribe

python/pathway/tests/test_io.py:1742–1822  ·  view source on GitHub ↗
(snapshot_access, tmp_path)

Source from the content-addressed store, hash-verified

1740 "snapshot_access", [api.SnapshotAccess.FULL, api.SnapshotAccess.OFFSETS_ONLY]
1741)
1742def test_persistent_subscribe(snapshot_access, tmp_path):
1743 pstorage_dir = tmp_path / "PStorage"
1744 input_path = tmp_path / "input.csv"
1745
1746 class TestSchema(pw.Schema):
1747 k: int = pw.column_definition(primary_key=True)
1748 v: str
1749
1750 data = """
1751 k | v
1752 1 | foo
1753 """
1754
1755 write_csv(input_path, data)
1756 table = pw.io.csv.read(
1757 str(input_path),
1758 schema=TestSchema,
1759 name="1",
1760 mode="static",
1761 )
1762
1763 root = mock.Mock()
1764 pw.io.subscribe(table, on_change=root.on_change, on_end=root.on_end)
1765 pw.run(
1766 persistence_config=pw.persistence.Config(
1767 pw.persistence.Backend.filesystem(pstorage_dir),
1768 snapshot_access=snapshot_access,
1769 ),
1770 )
1771
1772 root.assert_has_calls(
1773 [
1774 mock.call.on_change(
1775 key=mock.ANY,
1776 row={"k": 1, "v": "foo"},
1777 time=mock.ANY,
1778 is_addition=True,
1779 ),
1780 mock.call.on_end(),
1781 ]
1782 )
1783 assert root.on_change.call_count == 1
1784 assert root.on_end.call_count == 1
1785
1786 G.clear()
1787
1788 data = """
1789 k | v
1790 1 | foo
1791 2 | bar
1792 """
1793
1794 write_csv(input_path, data)
1795 table = pw.io.csv.read(
1796 str(input_path),
1797 schema=TestSchema,
1798 name="1",
1799 mode="static",

Callers

nothing calls this directly

Calls 8

write_csvFunction · 0.90
runFunction · 0.90
subscribeMethod · 0.80
filesystemMethod · 0.80
clearMethod · 0.80
runMethod · 0.45
on_changeMethod · 0.45
on_endMethod · 0.45

Tested by

no test coverage detected