MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_synchronization_group

Function test_synchronization_group

python/pathway/tests/test_io.py:3985–4071  ·  view source on GitHub ↗
(tmp_path, plan, type_)

Source from the content-addressed store, hash-verified

3983)
3984@pytest.mark.parametrize("type_", ["Int", "Duration", "DateTimeUtc", "DateTimeNaive"])
3985def test_synchronization_group(tmp_path, plan, type_):
3986 input_path_1 = tmp_path / "input_1"
3987 input_path_2 = tmp_path / "input_2"
3988 os.mkdir(input_path_1)
3989 os.mkdir(input_path_2)
3990
3991 if type_ == "Int":
3992 max_difference = 10
3993 elif type_ in ("DateTimeUtc", "DateTimeNaive", "Duration"):
3994 max_difference = datetime.timedelta(seconds=10)
3995 else:
3996 raise ValueError(f"Unexpected type: {type_}")
3997
3998 def stream_inputs(input_path: pathlib.Path, plan: list[dict]):
3999 time.sleep(1)
4000 for index, entry in enumerate(plan):
4001 raw_value = entry["k"]
4002 prepared_entry = {"v": entry["v"]}
4003 if type_ == "Int":
4004 prepared_entry["k"] = raw_value
4005 elif type_ == "Duration":
4006 prepared_entry["k"] = raw_value * 1_000_000_000
4007 elif type_ == "DateTimeUtc":
4008 prepared_entry["k"] = datetime.datetime.fromtimestamp(
4009 raw_value, tz=datetime.timezone.utc
4010 ).isoformat()
4011 elif type_ == "DateTimeNaive":
4012 prepared_entry["k"] = datetime.datetime.fromtimestamp(
4013 raw_value
4014 ).isoformat()
4015 else:
4016 raise ValueError(f"Unexpected type: {type_}")
4017 with open(input_path / f"{index}.jsonl", "w") as f:
4018 json.dump(prepared_entry, f)
4019 time.sleep(0.5)
4020
4021 output_path = tmp_path / "output.csv"
4022
4023 if type_ == "Int":
4024 TrackedFieldType = int
4025 elif type_ == "Duration":
4026 TrackedFieldType = pw.Duration
4027 elif type_ == "DateTimeUtc":
4028 TrackedFieldType = pw.DateTimeUtc
4029 elif type_ == "DateTimeNaive":
4030 TrackedFieldType = pw.DateTimeNaive
4031 else:
4032 raise ValueError(f"Unexpected type: {type_}")
4033
4034 class InputSchema(pw.Schema):
4035 k: TrackedFieldType
4036 v: str
4037
4038 table_1 = pw.io.jsonlines.read(
4039 input_path_1, schema=InputSchema, autocommit_duration_ms=20
4040 )
4041 table_2 = pw.io.jsonlines.read(
4042 input_path_2, schema=InputSchema, autocommit_duration_ms=20

Callers

nothing calls this directly

Calls 9

wait_result_with_checkerFunction · 0.90
writeMethod · 0.80
checkerFunction · 0.70
concatMethod · 0.45
getMethod · 0.45
startMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected