Writes data into the specified RabbitMQ stream. The produced messages consist of the payload, corresponding to the values of the table that are serialized according to the chosen format. Two AMQP 1.0 application properties are always added: ``pathway_time`` (processing time) and ``pathw
(
table: Table,
uri: str,
stream_name: str | ColumnReference,
*,
format: Literal["json", "plaintext", "raw"] = "json",
value: ColumnReference | None = None,
headers: Iterable[ColumnReference] | None = None,
name: str | None = None,
sort_by: Iterable[ColumnReference] | None = None,
tls_settings: TLSSettings | None = None,
)
| 250 | @check_arg_types |
| 251 | @trace_user_frame |
| 252 | def write( |
| 253 | table: Table, |
| 254 | uri: str, |
| 255 | stream_name: str | ColumnReference, |
| 256 | *, |
| 257 | format: Literal["json", "plaintext", "raw"] = "json", |
| 258 | value: ColumnReference | None = None, |
| 259 | headers: Iterable[ColumnReference] | None = None, |
| 260 | name: str | None = None, |
| 261 | sort_by: Iterable[ColumnReference] | None = None, |
| 262 | tls_settings: TLSSettings | None = None, |
| 263 | ) -> None: |
| 264 | """Writes data into the specified RabbitMQ stream. |
| 265 | |
| 266 | The produced messages consist of the payload, corresponding to the values of the |
| 267 | table that are serialized according to the chosen format. Two AMQP 1.0 application |
| 268 | properties are always added: ``pathway_time`` (processing time) and ``pathway_diff`` |
| 269 | (either ``1`` or ``-1``). If ``headers`` parameter is used, additional properties |
| 270 | can be added to the message. |
| 271 | |
| 272 | There are several serialization formats supported: ``"json"``, |
| 273 | ``"plaintext"`` and ``"raw"``. |
| 274 | |
| 275 | If the selected format is either ``"plaintext"`` or ``"raw"``, you also need to |
| 276 | specify which column of the table corresponds to the payload of the produced message. |
| 277 | It can be done by providing the ``value`` parameter. |
| 278 | |
| 279 | Args: |
| 280 | table: The table for output. |
| 281 | uri: The URI of the RabbitMQ server with Streams enabled, e.g. |
| 282 | ``"rabbitmq-stream://guest:guest@localhost:5552"``. |
| 283 | stream_name: The RabbitMQ stream where data will be written. The stream must |
| 284 | already exist on the server. Can be a column reference for dynamic |
| 285 | routing — each row will be written to the stream named by that column's |
| 286 | value. All target streams must be pre-created. |
| 287 | format: Format in which the data is put into RabbitMQ. Currently ``"json"``, |
| 288 | ``"plaintext"`` and ``"raw"`` are supported. |
| 289 | value: Reference to the column that should be used as a payload in the produced |
| 290 | message in ``"plaintext"`` or ``"raw"`` format. |
| 291 | headers: References to the table fields that must be provided as AMQP 1.0 |
| 292 | application properties (analogous to Kafka headers). Values are |
| 293 | serialized as AMQP strings using their JSON representation, following |
| 294 | the same encoding as :py:func:`pw.io.jsonlines.write` (e.g. ``42`` |
| 295 | for an int, ``"\"hello\""`` for a string, ``null`` for None, |
| 296 | base64-encoded string for bytes). RabbitMQ Streams does not reliably |
| 297 | confirm messages with non-string application property values, so all |
| 298 | types are JSON-encoded. On the reader side, header values are |
| 299 | available in ``_metadata.application_properties`` (when |
| 300 | ``with_metadata=True``). |
| 301 | name: A unique name for the connector. If provided, this name will be used in |
| 302 | logs and monitoring dashboards. |
| 303 | sort_by: If specified, the output will be sorted in ascending order based on the |
| 304 | values of the given columns within each minibatch. |
| 305 | tls_settings: TLS connection settings. Use ``TLSSettings`` to configure |
| 306 | root certificates, client certificates, and verification mode. |
| 307 | |
| 308 | Examples: |
| 309 |
nothing calls this directly
no test coverage detected