MCPcopy
hub / github.com/pathwaycom/pathway / test_mock_snapshot_reader

Function test_mock_snapshot_reader

python/pathway/tests/test_io.py:2103–2168  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

2101
2102
2103def test_mock_snapshot_reader():
2104 class InputSchema(pw.Schema):
2105 number: int
2106
2107 events = {
2108 ("1", 0): [
2109 api.SnapshotEvent.advance_time(2),
2110 api.SnapshotEvent.insert(api.ref_scalar(0), [1]),
2111 api.SnapshotEvent.insert(api.ref_scalar(1), [1]),
2112 api.SnapshotEvent.advance_time(4),
2113 api.SnapshotEvent.insert(api.ref_scalar(2), [4]),
2114 api.SnapshotEvent.delete(api.ref_scalar(0), [1]),
2115 api.SnapshotEvent.FINISHED,
2116 ]
2117 }
2118
2119 t = pw.demo.generate_custom_stream(
2120 {},
2121 schema=InputSchema,
2122 nb_rows=0,
2123 input_rate=15,
2124 autocommit_duration_ms=50,
2125 name="1",
2126 )
2127
2128 on_change = mock.Mock()
2129 pw.io.subscribe(t, on_change=on_change)
2130
2131 run(
2132 persistence_config=pw.persistence.Config(
2133 pw.persistence.Backend.mock(events),
2134 persistence_mode=api.PersistenceMode.SPEEDRUN_REPLAY,
2135 snapshot_access=api.SnapshotAccess.REPLAY,
2136 )
2137 )
2138
2139 on_change.assert_has_calls(
2140 [
2141 mock.call.on_change(
2142 key=api.ref_scalar(0),
2143 row={"number": 1},
2144 time=2,
2145 is_addition=True,
2146 ),
2147 mock.call.on_change(
2148 key=api.ref_scalar(1),
2149 row={"number": 1},
2150 time=2,
2151 is_addition=True,
2152 ),
2153 mock.call.on_change(
2154 key=api.ref_scalar(2),
2155 row={"number": 4},
2156 time=4,
2157 is_addition=True,
2158 ),
2159 mock.call.on_change(
2160 key=api.ref_scalar(0),

Callers

nothing calls this directly

Calls 4

run · Function · 0.90
subscribe · Method · 0.80
mock · Method · 0.80
on_change · Method · 0.45

Tested by

no test coverage detected