Writes ``table`` to a Postgres table. Two types of output tables are supported: **stream of changes** and **snapshot**. When using **stream of changes**, the output table contains a log of all changes that occurred in the Pathway Live Data Framework table. In this case, it is expected t
(
table: Table,
postgres_settings: dict,
table_name: str,
*,
schema_name: str | None = "public",
max_batch_size: int | None = None,
init_mode: Literal["default", "create_if_not_exists", "replace"] = "default",
output_table_type: Literal["stream_of_changes", "snapshot"] = "stream_of_changes",
primary_key: list[ColumnReference] | None = None,
name: str | None = None,
sort_by: Iterable[ColumnReference] | None = None,
_external_diff_column: ColumnReference | None = None,
)
| 603 | @check_arg_types |
| 604 | @trace_user_frame |
| 605 | def write( |
| 606 | table: Table, |
| 607 | postgres_settings: dict, |
| 608 | table_name: str, |
| 609 | *, |
| 610 | schema_name: str | None = "public", |
| 611 | max_batch_size: int | None = None, |
| 612 | init_mode: Literal["default", "create_if_not_exists", "replace"] = "default", |
| 613 | output_table_type: Literal["stream_of_changes", "snapshot"] = "stream_of_changes", |
| 614 | primary_key: list[ColumnReference] | None = None, |
| 615 | name: str | None = None, |
| 616 | sort_by: Iterable[ColumnReference] | None = None, |
| 617 | _external_diff_column: ColumnReference | None = None, |
| 618 | ) -> None: |
| 619 | """Writes ``table`` to a Postgres table. Two types of output tables are supported: |
| 620 | **stream of changes** and **snapshot**. |
| 621 | |
| 622 | When using **stream of changes**, the output table contains a log of all changes that |
| 623 | occurred in the Pathway Live Data Framework table. In this case, it is expected to have two additional columns, |
| 624 | ``time`` and ``diff``, both of integer type. ``time`` indicates the transactional |
| 625 | minibatch time in which the row change occurred. ``diff`` can be either ``1`` for |
| 626 | row insertion or ``-1`` for row deletion. |
| 627 | |
| 628 | When using **snapshot**, the set of columns in the output table matches the set of |
| 629 | columns in the table you are writing. No additional columns are created. |
| 630 | |
| 631 | Args: |
| 632 | table: Table to be written. |
| 633 | postgres_settings: Components for the connection string for Postgres. The string is |
| 634 | formed by joining key-value pairs from the given dictionary with spaces, |
| 635 | with each pair formatted as `key=value`. Keys must be strings. Values can be |
| 636 | of any type; if a value is not a string, it will be converted using Python's |
| 637 | `str()` function. |
| 638 | The Pathway Live Data Framework injects conservative TCP-keepalive defaults (``keepalives``, |
| 639 | ``keepalives_idle=300``, ``keepalives_interval=30``, ``keepalives_count=3``, |
| 640 | and ``tcp_user_timeout=300000``) so that an unreachable |
| 641 | the Pathway Live Data Framework process is detected by PostgreSQL within minutes rather than |
| 642 | the OS-inherited ~2-hour default; any of these can be overridden by |
| 643 | passing the same key in ``postgres_settings``. |
| 644 | table_name: Name of the target table. Any PostgreSQL identifier is |
| 645 | accepted — the connector quotes the name before interpolating it |
| 646 | into generated SQL, so hyphens, mixed case, and reserved words |
| 647 | round-trip as-is. Column names in ``table`` and in ``primary_key`` |
| 648 | are quoted the same way. |
| 649 | schema_name: Name of the PostgreSQL schema that owns the target table. |
| 650 | Defaults to ``"public"``. Set this when writing to a non-default |
| 651 | schema; the name is quoted identically to ``table_name``. |
| 652 | max_batch_size: Maximum number of entries allowed to be committed within a |
| 653 | single transaction. |
| 654 | init_mode: "default": The default initialization mode; |
| 655 | "create_if_not_exists": initializes the SQL writer by creating the necessary table |
| 656 | if they do not already exist; |
| 657 | "replace": Initializes the SQL writer by replacing any existing table. |
| 658 | output_table_type: Defines how the output table manages its data. If set to ``"stream_of_changes"`` |
| 659 | (the default), the system outputs a stream of modifications to the target table. |
| 660 | This stream includes two additional integer columns: ``time``, representing the computation |
| 661 | minibatch, and ``diff``, indicating the type of change (``1`` for row addition and |
| 662 | ``-1`` for row deletion). If set to ``"snapshot"``, the table maintains the current |
nothing calls this directly
no test coverage detected