(tmp_path, postgres, postgres_with_tls, with_tls)
| 1001 | |
| 1002 | @pytest.mark.parametrize("with_tls", [False, True]) |
| 1003 | def test_postgres_input(tmp_path, postgres, postgres_with_tls, with_tls): |
| 1004 | class InputSchema(pw.Schema):  # field-for-field counterpart of the CREATE TABLE statement issued later in this test |
| 1005 | id: int = pw.column_definition(primary_key=True)  # SQL side: id BIGSERIAL PRIMARY KEY |
| 1006 | v_none: bool | None  # the only field typed as optional; SQL side: BOOLEAN |
| 1007 | v_bool: bool  # SQL side: BOOLEAN |
| 1008 | v_int: int  # SQL side: BIGINT |
| 1009 | v_float: float  # SQL side: DOUBLE PRECISION |
| 1010 | v_string: str  # SQL side: TEXT |
| 1011 | v_bytes: bytes  # SQL side: BYTEA |
| 1012 | v_int_array: np.ndarray[None, int] # type: ignore |
| 1013 | v_float_array: np.ndarray[None, float] # type: ignore |
| 1014 | v_datetime_naive: pw.DateTimeNaive  # SQL side: TIMESTAMP (no time zone) |
| 1015 | v_datetime_utc: pw.DateTimeUtc  # SQL side: TIMESTAMPTZ |
| 1016 | v_duration: pw.Duration  # SQL side: INTERVAL |
| 1017 | v_json: pw.Json  # SQL side: JSONB |
| 1018 | v_pyobject: bytes  # declared as plain bytes here; the backing column is BYTEA |
| 1019 | v_int_list: list[list[int]]  # nested list type over a column declared as BIGINT[] |
| 1020 | v_float_list: list[list[float]]  # nested list type over a column declared as DOUBLE PRECISION[] |
| 1021 | v_string_matrix: list[list[str]]  # nested list type over a column declared as TEXT[] |
| 1022 | v_bytes_matrix: list[list[bytes]]  # nested list type over a column declared as BYTEA[] |
| 1023 | |
| 1024 | if with_tls: |
| 1025 | postgres = postgres_with_tls |
| 1026 | postgres_settings = copy.deepcopy(POSTGRES_WITH_TLS_SETTINGS) |
| 1027 | postgres_settings["sslmode"] = "verify-ca" |
| 1028 | postgres_settings["sslrootcert"] = str(CREDENTIALS_DIR / "ca.crt") |
| 1029 | else: |
| 1030 | postgres_settings = POSTGRES_SETTINGS |
| 1031 | |
| 1032 | output_path = tmp_path / "output.jsonl" |
| 1033 | table_name = postgres.random_table_name() |
| 1034 | create_table_sql = f""" |
| 1035 | CREATE TABLE {table_name} ( |
| 1036 | id BIGSERIAL PRIMARY KEY, |
| 1037 | v_none BOOLEAN, |
| 1038 | v_bool BOOLEAN, |
| 1039 | v_int BIGINT, |
| 1040 | v_float DOUBLE PRECISION, |
| 1041 | v_string TEXT, |
| 1042 | v_bytes BYTEA, |
| 1043 | v_int_array BIGINT[], |
| 1044 | v_float_array DOUBLE PRECISION[], |
| 1045 | v_datetime_naive TIMESTAMP, |
| 1046 | v_datetime_utc TIMESTAMPTZ, |
| 1047 | v_duration INTERVAL, |
| 1048 | v_json JSONB, |
| 1049 | v_pyobject BYTEA, |
| 1050 | v_int_list BIGINT[], |
| 1051 | v_float_list DOUBLE PRECISION[], |
| 1052 | v_string_matrix TEXT[], |
| 1053 | v_bytes_matrix BYTEA[] |
| 1054 | ); |
| 1055 | """ |
| 1056 | postgres.execute_sql(create_table_sql) |
| 1057 | |
| 1058 | with postgres.publication(table_name) as publication_name: |
| 1059 | json_value_dumped = json.dumps({"key": "value"}) |
| 1060 | insert_row_sql = f""" |
nothing calls this directly
no test coverage detected