MCPcopy
hub / github.com/pathwaycom/pathway / pipeline

Function pipeline

python/pathway/tests/test_async_transformer.py:216–254  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

214 counter = mock.Mock()
215
def pipeline():
    """Build the async-transformer graph once and check its output.

    Uses two names from the enclosing scope: ``counter`` is called on every
    ``invoke`` and ``cache_dir`` is passed to the filesystem persistence
    backend.
    """
    # Clear the graph before (re)building the pipeline.
    G.clear()

    class OutputSchema(pw.Schema):
        ret: int

    class TestAsyncTransformer(pw.AsyncTransformer, output_schema=OutputSchema):
        async def invoke(self, value: int) -> dict[str, Any]:
            # Record the invocation, then wait a random delay of at most 0.1 s.
            counter()
            delay = random.uniform(0, 0.1)
            await asyncio.sleep(delay)
            return {"ret": value + 1}

    source_table = T(
        """
        | value
        1 | 1
        2 | 2
        3 | 3
        """
    )
    expected_table = T(
        """
        | ret
        1 | 2
        2 | 3
        3 | 4
        """
    )

    # Only the rows that were processed successfully are compared.
    successful_rows = TestAsyncTransformer(input_table=source_table).successful

    backend = pw.persistence.Backend.filesystem(cache_dir)
    config = pw.persistence.Config(
        backend,
        persistence_mode=pw.PersistenceMode.SELECTIVE_PERSISTING,
    )
    assert_table_equality(
        successful_rows,
        expected_table,
        persistence_config=config,
    )
255
256 # run twice to check if cache is used
257 pipeline()

Callers 1

test_disk_cache · Function · 0.70

Calls 4

T · Function · 0.90
clear · Method · 0.80
filesystem · Method · 0.80

Tested by

no test coverage detected