MCPcopy Index your code
hub / github.com/pathwaycom/pathway / test_postgres_input

Function test_postgres_input

integration_tests/db_connectors/test_postgres.py:1003–1166  ·  view source on GitHub ↗
(tmp_path, postgres, postgres_with_tls, with_tls)

Source from the content-addressed store, hash-verified

1001
1002@pytest.mark.parametrize("with_tls", [False, True])
1003def test_postgres_input(tmp_path, postgres, postgres_with_tls, with_tls):
1004 class InputSchema(pw.Schema):
1005 id: int = pw.column_definition(primary_key=True)
1006 v_none: bool | None
1007 v_bool: bool
1008 v_int: int
1009 v_float: float
1010 v_string: str
1011 v_bytes: bytes
1012 v_int_array: np.ndarray[None, int] # type: ignore
1013 v_float_array: np.ndarray[None, float] # type: ignore
1014 v_datetime_naive: pw.DateTimeNaive
1015 v_datetime_utc: pw.DateTimeUtc
1016 v_duration: pw.Duration
1017 v_json: pw.Json
1018 v_pyobject: bytes
1019 v_int_list: list[list[int]]
1020 v_float_list: list[list[float]]
1021 v_string_matrix: list[list[str]]
1022 v_bytes_matrix: list[list[bytes]]
1023
1024 if with_tls:
1025 postgres = postgres_with_tls
1026 postgres_settings = copy.deepcopy(POSTGRES_WITH_TLS_SETTINGS)
1027 postgres_settings["sslmode"] = "verify-ca"
1028 postgres_settings["sslrootcert"] = str(CREDENTIALS_DIR / "ca.crt")
1029 else:
1030 postgres_settings = POSTGRES_SETTINGS
1031
1032 output_path = tmp_path / "output.jsonl"
1033 table_name = postgres.random_table_name()
1034 create_table_sql = f"""
1035 CREATE TABLE {table_name} (
1036 id BIGSERIAL PRIMARY KEY,
1037 v_none BOOLEAN,
1038 v_bool BOOLEAN,
1039 v_int BIGINT,
1040 v_float DOUBLE PRECISION,
1041 v_string TEXT,
1042 v_bytes BYTEA,
1043 v_int_array BIGINT[],
1044 v_float_array DOUBLE PRECISION[],
1045 v_datetime_naive TIMESTAMP,
1046 v_datetime_utc TIMESTAMPTZ,
1047 v_duration INTERVAL,
1048 v_json JSONB,
1049 v_pyobject BYTEA,
1050 v_int_list BIGINT[],
1051 v_float_list DOUBLE PRECISION[],
1052 v_string_matrix TEXT[],
1053 v_bytes_matrix BYTEA[]
1054 );
1055 """
1056 postgres.execute_sql(create_table_sql)
1057
1058 with postgres.publication(table_name) as publication_name:
1059 json_value_dumped = json.dumps({"key": "value"})
1060 insert_row_sql = f"""

Callers

nothing calls this directly

Calls 11

wait_result_with_checkerFunction · 0.90
publicationMethod · 0.80
writeMethod · 0.80
random_table_nameMethod · 0.45
execute_sqlMethod · 0.45
dumpsMethod · 0.45
startMethod · 0.45
loadsMethod · 0.45
addMethod · 0.45
removeMethod · 0.45

Tested by

no test coverage detected