hub / github.com/pathwaycom/pathway / write

Function write

python/pathway/io/rabbitmq/__init__.py:252–387

Writes data into the specified RabbitMQ stream. The produced messages consist of the payload, corresponding to the values of the table that are serialized according to the chosen format. Two AMQP 1.0 application properties are always added: ``pathway_time`` (processing time) and ``pathway_diff`` (either ``1`` or ``-1``).

(
    table: Table,
    uri: str,
    stream_name: str | ColumnReference,
    *,
    format: Literal["json", "plaintext", "raw"] = "json",
    value: ColumnReference | None = None,
    headers: Iterable[ColumnReference] | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
    tls_settings: TLSSettings | None = None,
)
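
The summary above says every message carries ``pathway_time`` and ``pathway_diff`` (``1`` for an insertion, ``-1`` for a retraction). A consumer can fold those changes back into a current snapshot. The following is a minimal sketch of that idea in plain Python, not Pathway's own reader: the function name ``apply_changes`` and the ``(payload, properties)`` message shape are hypothetical, and the property values are assumed to arrive as strings or integers that ``int()`` can parse.

```python
from collections import Counter


def apply_changes(messages):
    """Fold (payload, properties) pairs into the set of currently live payloads.

    Hypothetical helper: `messages` is an iterable of (payload, properties)
    tuples, where `properties` holds `pathway_time` and `pathway_diff`.
    """
    counts = Counter()
    # Process changes in processing-time order; sorted() is stable, so
    # messages with equal pathway_time keep their arrival order.
    ordered = sorted(messages, key=lambda m: int(m[1]["pathway_time"]))
    for payload, properties in ordered:
        # Assumption: the diff may be a string such as "1" or "-1".
        counts[payload] += int(properties["pathway_diff"])
    # A payload whose insertions and retractions cancel out is no longer live.
    return {payload: n for payload, n in counts.items() if n != 0}


messages = [
    ("a", {"pathway_time": "2", "pathway_diff": "1"}),
    ("b", {"pathway_time": "4", "pathway_diff": "1"}),
    ("a", {"pathway_time": "4", "pathway_diff": "-1"}),
]
snapshot = apply_changes(messages)
```

Here ``"a"`` is inserted and later retracted, so only ``"b"`` remains in ``snapshot``.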

Source from the content-addressed store, hash-verified

@check_arg_types
@trace_user_frame
def write(
    table: Table,
    uri: str,
    stream_name: str | ColumnReference,
    *,
    format: Literal["json", "plaintext", "raw"] = "json",
    value: ColumnReference | None = None,
    headers: Iterable[ColumnReference] | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
    tls_settings: TLSSettings | None = None,
) -> None:
    """Writes data into the specified RabbitMQ stream.

    The produced messages consist of the payload, corresponding to the values of the
    table that are serialized according to the chosen format. Two AMQP 1.0 application
    properties are always added: ``pathway_time`` (processing time) and ``pathway_diff``
    (either ``1`` or ``-1``). If ``headers`` parameter is used, additional properties
    can be added to the message.

    There are several serialization formats supported: ``"json"``,
    ``"plaintext"`` and ``"raw"``.

    If the selected format is either ``"plaintext"`` or ``"raw"``, you also need to
    specify which column of the table corresponds to the payload of the produced message.
    It can be done by providing the ``value`` parameter.

    Args:
        table: The table for output.
        uri: The URI of the RabbitMQ server with Streams enabled, e.g.
            ``"rabbitmq-stream://guest:guest@localhost:5552"``.
        stream_name: The RabbitMQ stream where data will be written. The stream must
            already exist on the server. Can be a column reference for dynamic
            routing — each row will be written to the stream named by that column's
            value. All target streams must be pre-created.
        format: Format in which the data is put into RabbitMQ. Currently ``"json"``,
            ``"plaintext"`` and ``"raw"`` are supported.
        value: Reference to the column that should be used as a payload in the produced
            message in ``"plaintext"`` or ``"raw"`` format.
        headers: References to the table fields that must be provided as AMQP 1.0
            application properties (analogous to Kafka headers). Values are
            serialized as AMQP strings using their JSON representation, following
            the same encoding as :py:func:`pw.io.jsonlines.write` (e.g. ``42``
            for an int, ``"\"hello\""`` for a string, ``null`` for None,
            base64-encoded string for bytes). RabbitMQ Streams does not reliably
            confirm messages with non-string application property values, so all
            types are JSON-encoded. On the reader side, header values are
            available in ``_metadata.application_properties`` (when
            ``with_metadata=True``).
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards.
        sort_by: If specified, the output will be sorted in ascending order based on the
            values of the given columns within each minibatch.
        tls_settings: TLS connection settings. Use ``TLSSettings`` to configure
            root certificates, client certificates, and verification mode.

    Examples:
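
The ``headers`` description above says header values are stored as AMQP strings holding their JSON representation. The sketch below reproduces that encoding with the standard library only, to show what a consumer might see and how to decode it. It is an illustration and not Pathway's implementation: ``encode_header_value`` and ``decode_header_value`` are hypothetical names, and the assumption that base64-encoded bytes are themselves wrapped in JSON quotes is inferred from the docstring, not confirmed by it.

```python
import base64
import json


def encode_header_value(value):
    """Return the JSON text that would be carried as a string property."""
    if isinstance(value, bytes):
        # Assumption: bytes are base64-encoded first, then JSON-quoted.
        value = base64.b64encode(value).decode("ascii")
    return json.dumps(value)


def decode_header_value(text):
    """Parse a JSON-encoded header string back into a Python value.

    Bytes come back as their base64 string; the caller must know which
    headers hold binary data and call base64.b64decode on them.
    """
    return json.loads(text)


encoded = {
    "count": encode_header_value(42),
    "greeting": encode_header_value("hello"),
    "missing": encode_header_value(None),
    "blob": encode_header_value(b"hi"),
}
decoded = {key: decode_header_value(text) for key, text in encoded.items()}
```

Note that a string header is received with its surrounding JSON quotes, so a consumer should parse the value with a JSON decoder instead of using it verbatim.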

Callers

nothing calls this directly

Calls 4

_check_entitlements · Function · 0.90
construct · Method · 0.80
items · Method · 0.80
to · Method · 0.80

Tested by

no test coverage detected