(tmp_path, plan, type_)
| 3983 | ) |
| 3984 | @pytest.mark.parametrize("type_", ["Int", "Duration", "DateTimeUtc", "DateTimeNaive"]) |
| 3985 | def test_synchronization_group(tmp_path, plan, type_): |
| 3986 | input_path_1 = tmp_path / "input_1" |
| 3987 | input_path_2 = tmp_path / "input_2" |
| 3988 | os.mkdir(input_path_1) |
| 3989 | os.mkdir(input_path_2) |
| 3990 | |
| 3991 | if type_ == "Int": |
| 3992 | max_difference = 10 |
| 3993 | elif type_ in ("DateTimeUtc", "DateTimeNaive", "Duration"): |
| 3994 | max_difference = datetime.timedelta(seconds=10) |
| 3995 | else: |
| 3996 | raise ValueError(f"Unexpected type: {type_}") |
| 3997 | |
def stream_inputs(input_path: pathlib.Path, plan: list[dict]):
    """Write every plan entry to its own JSON file inside ``input_path``.

    After an initial one-second pause, the entries of ``plan`` are handled
    in order. Entry number ``i`` is stored as ``<input_path>/<i>.jsonl`` and
    holds a single JSON object with the entry's ``"v"`` value and an
    encoded ``"k"`` value. A half-second pause follows each written file.

    The encoding of ``"k"`` depends on ``type_``, which is read from the
    enclosing test:

    * ``"Int"``: the raw value is stored unchanged.
    * ``"Duration"``: the raw value is multiplied by 1_000_000_000
      (presumably seconds to nanoseconds -- confirm against the reader).
    * ``"DateTimeUtc"``: ISO-8601 text of the UTC-aware datetime built
      from the raw value as a POSIX timestamp.
    * ``"DateTimeNaive"``: ISO-8601 text of the naive datetime built from
      the raw value as a POSIX timestamp (no ``tz`` argument is passed).

    Args:
        input_path: Directory that receives the generated files.
        plan: Entries to emit; each one must provide ``"k"`` and ``"v"``.

    Raises:
        ValueError: If ``type_`` is not one of the four supported names.
    """
    # Initial pause before the first file is produced.
    time.sleep(1)

    for position, record in enumerate(plan):
        key = record["k"]
        value = record["v"]

        if type_ == "Int":
            encoded_key = key
        elif type_ == "Duration":
            encoded_key = key * 1_000_000_000
        elif type_ == "DateTimeUtc":
            moment = datetime.datetime.fromtimestamp(
                key, tz=datetime.timezone.utc
            )
            encoded_key = moment.isoformat()
        elif type_ == "DateTimeNaive":
            moment = datetime.datetime.fromtimestamp(key)
            encoded_key = moment.isoformat()
        else:
            raise ValueError(f"Unexpected type: {type_}")

        # "v" is inserted before "k" so the serialized key order is kept.
        payload = {"v": value, "k": encoded_key}

        target = input_path / f"{position}.jsonl"
        with target.open("w") as sink:
            json.dump(payload, sink)

        # Pause between consecutive files.
        time.sleep(0.5)
| 4020 | |
| 4021 | output_path = tmp_path / "output.csv" |
| 4022 | |
| 4023 | if type_ == "Int": |
| 4024 | TrackedFieldType = int |
| 4025 | elif type_ == "Duration": |
| 4026 | TrackedFieldType = pw.Duration |
| 4027 | elif type_ == "DateTimeUtc": |
| 4028 | TrackedFieldType = pw.DateTimeUtc |
| 4029 | elif type_ == "DateTimeNaive": |
| 4030 | TrackedFieldType = pw.DateTimeNaive |
| 4031 | else: |
| 4032 | raise ValueError(f"Unexpected type: {type_}") |
| 4033 | |
# Schema passed to both pw.io.jsonlines.read calls in this test.
class InputSchema(pw.Schema):
    # Key column; its type is the TrackedFieldType chosen from type_
    # (int, pw.Duration, pw.DateTimeUtc or pw.DateTimeNaive).
    k: TrackedFieldType
    # String column filled from each plan entry's "v" field.
    v: str
| 4037 | |
| 4038 | table_1 = pw.io.jsonlines.read( |
| 4039 | input_path_1, schema=InputSchema, autocommit_duration_ms=20 |
| 4040 | ) |
| 4041 | table_2 = pw.io.jsonlines.read( |
| 4042 | input_path_2, schema=InputSchema, autocommit_duration_ms=20 |
nothing calls this directly
no test coverage detected