| 2101 | |
| 2102 | |
| 2103 | def test_mock_snapshot_reader(): |
| 2104 | class InputSchema(pw.Schema): |
| 2105 | number: int |
| 2106 | |
| 2107 | events = { |
| 2108 | ("1", 0): [ |
| 2109 | api.SnapshotEvent.advance_time(2), |
| 2110 | api.SnapshotEvent.insert(api.ref_scalar(0), [1]), |
| 2111 | api.SnapshotEvent.insert(api.ref_scalar(1), [1]), |
| 2112 | api.SnapshotEvent.advance_time(4), |
| 2113 | api.SnapshotEvent.insert(api.ref_scalar(2), [4]), |
| 2114 | api.SnapshotEvent.delete(api.ref_scalar(0), [1]), |
| 2115 | api.SnapshotEvent.FINISHED, |
| 2116 | ] |
| 2117 | } |
| 2118 | |
| 2119 | t = pw.demo.generate_custom_stream( |
| 2120 | {}, |
| 2121 | schema=InputSchema, |
| 2122 | nb_rows=0, |
| 2123 | input_rate=15, |
| 2124 | autocommit_duration_ms=50, |
| 2125 | name="1", |
| 2126 | ) |
| 2127 | |
| 2128 | on_change = mock.Mock() |
| 2129 | pw.io.subscribe(t, on_change=on_change) |
| 2130 | |
| 2131 | run( |
| 2132 | persistence_config=pw.persistence.Config( |
| 2133 | pw.persistence.Backend.mock(events), |
| 2134 | persistence_mode=api.PersistenceMode.SPEEDRUN_REPLAY, |
| 2135 | snapshot_access=api.SnapshotAccess.REPLAY, |
| 2136 | ) |
| 2137 | ) |
| 2138 | |
| 2139 | on_change.assert_has_calls( |
| 2140 | [ |
| 2141 | mock.call.on_change( |
| 2142 | key=api.ref_scalar(0), |
| 2143 | row={"number": 1}, |
| 2144 | time=2, |
| 2145 | is_addition=True, |
| 2146 | ), |
| 2147 | mock.call.on_change( |
| 2148 | key=api.ref_scalar(1), |
| 2149 | row={"number": 1}, |
| 2150 | time=2, |
| 2151 | is_addition=True, |
| 2152 | ), |
| 2153 | mock.call.on_change( |
| 2154 | key=api.ref_scalar(2), |
| 2155 | row={"number": 4}, |
| 2156 | time=4, |
| 2157 | is_addition=True, |
| 2158 | ), |
| 2159 | mock.call.on_change( |
| 2160 | key=api.ref_scalar(0), |