MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_disk_cache

Function test_disk_cache

python/pathway/tests/test_async_transformer.py:212–260  ·  view source on GitHub ↗
(tmp_path: pathlib.Path)

Source from the content-addressed store, hash-verified

210
211
212def test_disk_cache(tmp_path: pathlib.Path):
213 cache_dir = tmp_path / "test_cache"
214 counter = mock.Mock()
215
216 def pipeline():
217 G.clear()
218
219 class OutputSchema(pw.Schema):
220 ret: int
221
222 class TestAsyncTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
223 async def invoke(self, value: int) -> dict[str, Any]:
224 counter()
225 await asyncio.sleep(random.uniform(0, 0.1))
226 return dict(ret=value + 1)
227
228 input = T(
229 """
230 | value
231 1 | 1
232 2 | 2
233 3 | 3
234 """
235 )
236 expected = T(
237 """
238 | ret
239 1 | 2
240 2 | 3
241 3 | 4
242 """
243 )
244
245 result = TestAsyncTransformer(input_table=input).successful
246
247 assert_table_equality(
248 result,
249 expected,
250 persistence_config=pw.persistence.Config(
251 pw.persistence.Backend.filesystem(cache_dir),
252 persistence_mode=pw.PersistenceMode.SELECTIVE_PERSISTING,
253 ),
254 )
255
256 # run twice to check if cache is used
257 pipeline()
258 pipeline()
259 assert os.path.exists(cache_dir)
260 assert counter.call_count == 3
261
262
263def test_with_instance():

Callers

nothing calls this directly

Calls 1

pipelineFunction · 0.70

Tested by

no test coverage detected