Streams ``table`` into an `AWS Kinesis stream <https://docs.aws.amazon.com/streams/latest/dev/introduction.html>`_. The connection settings are retrieved from the environment.
write(
    table: Table,
    stream_name: str | ColumnReference,
    *,
    format: Literal["raw", "plaintext", "json"] = "json",
    partition_key: ColumnReference | None = None,
    data: ColumnReference | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
) -> None
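The docstring below gives several rules about how a row becomes a Kinesis record: which column becomes the payload, how the partition key is derived, what the size limits are, and how ``sort_by`` orders a minibatch. The following is a minimal plain-Python sketch of those rules. It is not Pathway's implementation and calls neither Pathway nor AWS. The helper names ``resolve_data_column``, ``to_record``, and ``sort_minibatch`` are hypothetical. The sketch models a table schema as a ``dict`` that maps column names to Python types, and it uses a plain string in place of the internal row key.

```python
from __future__ import annotations

from typing import Any

# Limits as stated in the docstring; this sketch checks them itself
# instead of relying on any Pathway or AWS SDK behaviour.
MAX_PAYLOAD_BYTES = 1024 * 1024  # 1 MiB per record payload
MAX_PARTITION_KEY_BYTES = 256


def resolve_data_column(
    schema: dict[str, type],
    fmt: str,
    data: str | None,
    partition_key: str | None,
) -> str | None:
    """Hypothetical helper: pick the column that becomes the record payload."""
    if fmt == "json":
        return None  # the whole row is serialized, no single data column
    expected = {"raw": bytes, "plaintext": str}[fmt]
    if data is None:
        if partition_key is not None:
            raise ValueError("`data` must be set explicitly when `partition_key` is set")
        if len(schema) != 1:
            raise ValueError(f"{fmt!r} needs exactly one column or an explicit `data`")
        (data,) = schema  # deduce the only column name
    if schema[data] is not expected:
        raise TypeError(
            f"{fmt!r} needs a {expected.__name__} column, got {schema[data].__name__}"
        )
    return data


def to_record(
    row: dict[str, Any],
    data_column: str,
    partition_key: str | None,
    row_key: str,
) -> tuple[str, bytes]:
    """Hypothetical helper: turn one row into a (partition key, payload) pair."""
    # Non-string keys are replaced by their string representation; without
    # partition_key, the stand-in for the internal row key is used.
    key = str(row[partition_key]) if partition_key is not None else row_key
    if len(key.encode("utf-8")) > MAX_PARTITION_KEY_BYTES:
        raise ValueError("partition key exceeds 256 bytes")
    value = row[data_column]
    payload = value if isinstance(value, bytes) else value.encode("utf-8")
    if len(payload) > MAX_PAYLOAD_BYTES:
        raise ValueError("payload exceeds 1 MiB")
    return key, payload


def sort_minibatch(rows: list[dict[str, Any]], sort_by: list[str]) -> list[dict[str, Any]]:
    """Ascending order; several columns are compared as tuples (lexicographically)."""
    return sorted(rows, key=lambda r: tuple(r[c] for c in sort_by))


if __name__ == "__main__":
    schema = {"user": str, "event": str}
    column = resolve_data_column(schema, "plaintext", data="event", partition_key="user")
    rows = [{"user": "b", "event": "x"}, {"user": "a", "event": "y"}]
    print([to_record(r, column, "user", row_key="unused") for r in sort_minibatch(rows, ["user"])])
    # [('a', b'y'), ('b', b'x')]
```

In this sketch, ``"json"`` needs no data column, so ``to_record`` applies only to the ``"plaintext"`` and ``"raw"`` formats. The real connector performs its own serialization and may enforce these rules at a different point.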
@check_arg_types
@trace_user_frame
def write(
    table: Table,
    stream_name: str | ColumnReference,
    *,
    format: Literal["raw", "plaintext", "json"] = "json",
    partition_key: ColumnReference | None = None,
    data: ColumnReference | None = None,
    name: str | None = None,
    sort_by: Iterable[ColumnReference] | None = None,
) -> None:
    """
    Streams ``table`` into an
    `AWS Kinesis stream <https://docs.aws.amazon.com/streams/latest/dev/introduction.html>`_.
    The connection settings are retrieved from the environment.

    Args:
        table: The table to write.
        stream_name: The Kinesis stream where data will be written. This can be a
            specific stream name or a reference to a column whose values will be used
            as the stream for each message. If using a column reference, the column must
            contain string values.
        format: Format in which the data is put into Kinesis. Currently ``"json"``,
            ``"plaintext"``, and ``"raw"`` are supported. If the ``"raw"`` format is selected,
            ``table`` must either contain exactly one binary column, which is written as is
            into the Kinesis record, or the reference to the target binary column must be
            specified explicitly in the ``data`` parameter. Similarly, if ``"plaintext"`` is
            chosen, the table must consist of a single column of the string type, or the
            reference to the target string column must be specified explicitly in the
            ``data`` parameter.
        partition_key: Reference to the column used as the partition key in the produced message.
            The column can have any data type: if the value is not a string, the Pathway Live
            Data Framework uses its string representation. Note that the maximum length of a
            partition key in Kinesis is 256 bytes. If the key is not specified, the internal
            row key is used.
        data: Reference to the column that should be used as data in the produced message in
            ``"plaintext"`` or ``"raw"`` format. It is deduced automatically if the table
            has exactly one column; otherwise, it must be specified explicitly. It must also be
            specified explicitly if ``partition_key`` is set. The type of the column must
            correspond to the format used: ``str`` for the ``"plaintext"`` format and ``binary``
            for the ``"raw"`` format. Note that the maximum size of one message payload in
            Kinesis is 1 MiB.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards.
        sort_by: If specified, the output will be sorted in ascending order based on the
            values of the given columns within each minibatch. When multiple columns are provided,
            the corresponding value tuples will be compared lexicographically.

    Returns:
        None

    Example:

    The local test setup is the same as for the input connector: you need
    a way to run Kinesis on your machine. You can use the Docker image
    `instructure/kinesalite <https://hub.docker.com/r/instructure/kinesalite/>`_ to spawn
    it.

    Start the container as follows:

    .. code-block:: bash