(tmp_path: Path, serialization: str, port: int)
| 219 | ) |
| 220 | @needs_multiprocessing_fork |
| 221 | def test_serialization(tmp_path: Path, serialization: str, port: int) -> None: |
| 222 | input_path = tmp_path / "in.csv" |
| 223 | output_path = tmp_path / "out.csv" |
| 224 | |
| 225 | with open(input_path, "w") as f: |
| 226 | f.write( |
| 227 | """instance,a |
| 228 | 0,ab |
| 229 | 2,def |
| 230 | 2,x |
| 231 | """ |
| 232 | ) |
| 233 | |
| 234 | class InputSchema(pw.Schema): |
| 235 | instance: int |
| 236 | a: str |
| 237 | |
| 238 | class OutputSchema(pw.Schema): |
| 239 | res: str |
| 240 | |
| 241 | def target(): |
| 242 | t = pw.io.csv.read(input_path, schema=InputSchema, mode="static") |
| 243 | z = t.copy() |
| 244 | |
| 245 | serializer = get_serializer(serialization) |
| 246 | |
| 247 | @pw.udf |
| 248 | def create_simple(a: str) -> pw.PyObjectWrapper[SimpleStr]: |
| 249 | return pw.wrap_py_object(SimpleStr(a), serializer=serializer) |
| 250 | |
| 251 | @pw.udf |
| 252 | def use_python_object(a: pw.PyObjectWrapper[SimpleStr], x: str) -> str: |
| 253 | return a.value.concat(x) |
| 254 | |
| 255 | res = ( |
| 256 | t.with_columns(s=create_simple(pw.this.a)) |
| 257 | .join(z, left_instance=pw.left.instance, right_instance=pw.right.instance) |
| 258 | .select(res=use_python_object(pw.left.s, pw.right.a)) |
| 259 | ) |
| 260 | pw.io.csv.write(res, output_path) |
| 261 | run() |
| 262 | |
| 263 | @dataclass |
| 264 | class Checker: |
| 265 | error: AssertionError | None = None |
| 266 | |
| 267 | def __call__(self) -> bool: |
| 268 | if not output_path.exists(): |
| 269 | return False |
| 270 | try: |
| 271 | G.clear() |
| 272 | result = pw.io.csv.read(output_path, schema=OutputSchema, mode="static") |
| 273 | expected = pw.debug.table_from_markdown( |
| 274 | """ |
| 275 | res |
| 276 | abab |
| 277 | defx |
| 278 | defdef |
nothing calls this directly
no test coverage detected