()
| 214 | counter = mock.Mock() |
| 215 | |
| 216 | def pipeline(): |
| 217 | G.clear() |
| 218 | |
| 219 | class OutputSchema(pw.Schema): |
| 220 | ret: int |
| 221 | |
| 222 | class TestAsyncTransformer(pw.AsyncTransformer, output_schema=OutputSchema): |
| 223 | async def invoke(self, value: int) -> dict[str, Any]: |
| 224 | counter() |
| 225 | await asyncio.sleep(random.uniform(0, 0.1)) |
| 226 | return dict(ret=value + 1) |
| 227 | |
| 228 | input = T( |
| 229 | """ |
| 230 | | value |
| 231 | 1 | 1 |
| 232 | 2 | 2 |
| 233 | 3 | 3 |
| 234 | """ |
| 235 | ) |
| 236 | expected = T( |
| 237 | """ |
| 238 | | ret |
| 239 | 1 | 2 |
| 240 | 2 | 3 |
| 241 | 3 | 4 |
| 242 | """ |
| 243 | ) |
| 244 | |
| 245 | result = TestAsyncTransformer(input_table=input).successful |
| 246 | |
| 247 | assert_table_equality( |
| 248 | result, |
| 249 | expected, |
| 250 | persistence_config=pw.persistence.Config( |
| 251 | pw.persistence.Backend.filesystem(cache_dir), |
| 252 | persistence_mode=pw.PersistenceMode.SELECTIVE_PERSISTING, |
| 253 | ), |
| 254 | ) |
| 255 | |
| 256 | # run twice to check if cache is used |
| 257 | pipeline() |
no test coverage detected