MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_serialization

Function test_serialization

python/pathway/tests/test_py_object_wrapper.py:221–300  ·  view source on GitHub ↗
(tmp_path: Path, serialization: str, port: int)

Source from the content-addressed store, hash-verified

219)
220@needs_multiprocessing_fork
221def test_serialization(tmp_path: Path, serialization: str, port: int) -> None:
222 input_path = tmp_path / "in.csv"
223 output_path = tmp_path / "out.csv"
224
225 with open(input_path, "w") as f:
226 f.write(
227 """instance,a
2280,ab
2292,def
2302,x
231"""
232 )
233
234 class InputSchema(pw.Schema):
235 instance: int
236 a: str
237
238 class OutputSchema(pw.Schema):
239 res: str
240
241 def target():
242 t = pw.io.csv.read(input_path, schema=InputSchema, mode="static")
243 z = t.copy()
244
245 serializer = get_serializer(serialization)
246
247 @pw.udf
248 def create_simple(a: str) -> pw.PyObjectWrapper[SimpleStr]:
249 return pw.wrap_py_object(SimpleStr(a), serializer=serializer)
250
251 @pw.udf
252 def use_python_object(a: pw.PyObjectWrapper[SimpleStr], x: str) -> str:
253 return a.value.concat(x)
254
255 res = (
256 t.with_columns(s=create_simple(pw.this.a))
257 .join(z, left_instance=pw.left.instance, right_instance=pw.right.instance)
258 .select(res=use_python_object(pw.left.s, pw.right.a))
259 )
260 pw.io.csv.write(res, output_path)
261 run()
262
263 @dataclass
264 class Checker:
265 error: AssertionError | None = None
266
267 def __call__(self) -> bool:
268 if not output_path.exists():
269 return False
270 try:
271 G.clear()
272 result = pw.io.csv.read(output_path, schema=OutputSchema, mode="static")
273 expected = pw.debug.table_from_markdown(
274 """
275 res
276 abab
277 defx
278 defdef

Callers

nothing calls this directly

Calls 3

wait_result_with_checker · Function · 0.90
write · Method · 0.80
Checker · Class · 0.70

Tested by

no test coverage detected