MCPcopy
hub / github.com/pathwaycom/pathway / write

Function write

python/pathway/io/postgres/__init__.py:605–893  ·  view source on GitHub ↗

Writes ``table`` to a Postgres table. Two types of output tables are supported: **stream of changes** and **snapshot**. When using **stream of changes**, the output table contains a log of all changes that occurred in the Pathway Live Data Framework table. In this case, it is expected t

(
    table: Table,
    postgres_settings: dict,
    table_name: str,
    *,
    schema_name: str | None = "public",
    max_batch_size: int | None = None,
    init_mode: Literal["default", "create_if_not_exists", "replace"] = "default",
    output_table_type: Literal["stream_of_changes", "snapshot"] = "stream_of_changes",
    primary_key: list[ColumnReference] | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
    _external_diff_column: ColumnReference | None = None,
)

Source from the content-addressed store, hash-verified

603@check_arg_types
604@trace_user_frame
605def write(
606 table: Table,
607 postgres_settings: dict,
608 table_name: str,
609 *,
610 schema_name: str | None = "public",
611 max_batch_size: int | None = None,
612 init_mode: Literal["default", "create_if_not_exists", "replace"] = "default",
613 output_table_type: Literal["stream_of_changes", "snapshot"] = "stream_of_changes",
614 primary_key: list[ColumnReference] | None = None,
615 name: str | None = None,
616 sort_by: Iterable[ColumnReference] | None = None,
617 _external_diff_column: ColumnReference | None = None,
618) -> None:
619 """Writes ``table`` to a Postgres table. Two types of output tables are supported:
620 **stream of changes** and **snapshot**.
621
622 When using **stream of changes**, the output table contains a log of all changes that
623 occurred in the Pathway Live Data Framework table. In this case, it is expected to have two additional columns,
624 ``time`` and ``diff``, both of integer type. ``time`` indicates the transactional
625 minibatch time in which the row change occurred. ``diff`` can be either ``1`` for
626 row insertion or ``-1`` for row deletion.
627
628 When using **snapshot**, the set of columns in the output table matches the set of
629 columns in the table you are writing. No additional columns are created.
630
631 Args:
632 table: Table to be written.
633 postgres_settings: Components for the connection string for Postgres. The string is
634 formed by joining key-value pairs from the given dictionary with spaces,
635 with each pair formatted as `key=value`. Keys must be strings. Values can be
636 of any type; if a value is not a string, it will be converted using Python's
637 `str()` function.
638 The Pathway Live Data Framework injects conservative TCP-keepalive defaults (``keepalives``,
639 ``keepalives_idle=300``, ``keepalives_interval=30``, ``keepalives_count=3``,
640 and ``tcp_user_timeout=300000``) so that an unreachable
641 the Pathway Live Data Framework process is detected by PostgreSQL within minutes rather than
642 the OS-inherited ~2-hour default; any of these can be overridden by
643 passing the same key in ``postgres_settings``.
644 table_name: Name of the target table. Any PostgreSQL identifier is
645 accepted — the connector quotes the name before interpolating it
646 into generated SQL, so hyphens, mixed case, and reserved words
647 round-trip as-is. Column names in ``table`` and in ``primary_key``
648 are quoted the same way.
649 schema_name: Name of the PostgreSQL schema that owns the target table.
650 Defaults to ``"public"``. Set this when writing to a non-default
651 schema; the name is quoted identically to ``table_name``.
652 max_batch_size: Maximum number of entries allowed to be committed within a
653 single transaction.
654 init_mode: "default": The default initialization mode;
655 "create_if_not_exists": initializes the SQL writer by creating the necessary table
656 if they do not already exist;
657 "replace": Initializes the SQL writer by replacing any existing table.
658 output_table_type: Defines how the output table manages its data. If set to ``"stream_of_changes"``
659 (the default), the system outputs a stream of modifications to the target table.
660 This stream includes two additional integer columns: ``time``, representing the computation
661 minibatch, and ``diff``, indicating the type of change (``1`` for row addition and
662 ``-1`` for row deletion). If set to ``"snapshot"``, the table maintains the current

Callers

nothing calls this directly

Calls 10

init_mode_from_strFunction · 0.90
get_column_indexFunction · 0.90
_build_tls_settingsFunction · 0.85
lowerMethod · 0.80
countMethod · 0.80
toMethod · 0.80
column_namesMethod · 0.45

Tested by

no test coverage detected