Function read

github.com/pathwaycom/pathway · python/pathway/io/python/__init__.py:510–618

Reads a table from a ConnectorSubject.

(
    subject: ConnectorSubject,
    *,
    schema: type[Schema] | None = None,
    format: Literal["json", "raw", "binary"] | None = None,
    autocommit_duration_ms: int | None = 1500,
    debug_data=None,
    name: str | None = None,
    max_backlog_size: int | None = None,
    _stacklevel: int = 1,
    **kwargs,
)
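The signature's `autocommit_duration_ms` defaults to 1500. The docstring describes it as the maximum time between two commits: every `autocommit_duration_ms` milliseconds, the updates the connector has received are committed and pushed into the computation graph. The sketch below is a simplified, hypothetical model of that batching in plain Python, not Pathway's implementation. It assumes each event carries an arrival time in milliseconds and that commit windows are fixed and start at time 0. The name `group_into_commits` is illustrative only.

```python
from typing import Any


def group_into_commits(
    events: list[tuple[int, Any]], autocommit_duration_ms: int
) -> list[list[Any]]:
    """Group (arrival_ms, value) events into commit batches.

    Hypothetical model: a commit boundary falls every
    ``autocommit_duration_ms`` milliseconds, and all events that arrived
    within the same window are pushed downstream together.
    """
    if autocommit_duration_ms <= 0:
        raise ValueError("autocommit_duration_ms must be positive")
    batches: dict[int, list[Any]] = {}
    for arrival_ms, value in sorted(events, key=lambda e: e[0]):
        # Index of the commit window this event falls into.
        window = arrival_ms // autocommit_duration_ms
        batches.setdefault(window, []).append(value)
    # Windows with no events produce no commit, so only non-empty batches appear.
    return [batches[w] for w in sorted(batches)]


events = [(0, "a"), (400, "b"), (1499, "c"), (1500, "d"), (3200, "e")]
print(group_into_commits(events, 1500))  # [['a', 'b', 'c'], ['d'], ['e']]
```

A smaller value pushes updates downstream sooner, in smaller batches; a larger one amortizes commit overhead at the cost of latency.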


@check_arg_types
@trace_user_frame
def read(
    subject: ConnectorSubject,
    *,
    schema: type[Schema] | None = None,
    format: Literal["json", "raw", "binary"] | None = None,
    autocommit_duration_ms: int | None = 1500,
    debug_data=None,
    name: str | None = None,
    max_backlog_size: int | None = None,
    _stacklevel: int = 1,
    **kwargs,
) -> Table:
    """Reads a table from a ConnectorSubject.

    Args:
        subject: An instance of a :py:class:`~pathway.python.ConnectorSubject`.
        schema: Schema of the resulting table.
        format: Deprecated. Pass values of proper types to ``subject``'s ``next`` instead.
            Format of the data produced by a subject: "json", "raw" or "binary". For the
            "raw" and "binary" formats, a table with a single "data" column is produced.
        debug_data: Static data that replaces the original data when debug mode is active.
        autocommit_duration_ms: The maximum time between two commits. Every
            autocommit_duration_ms milliseconds, the updates received by the connector are
            committed and pushed into Pathway Live Data Framework's computation graph.
        name: A unique name for the connector. If provided, this name will be used in
            logs and monitoring dashboards. Additionally, if persistence is enabled, it
            will be used as the name for the snapshot that stores the connector's progress.
        max_backlog_size: Limit on the number of entries read from the input source and kept
            in processing at any moment. Reading pauses when the limit is reached and resumes
            as processing of some entries completes. Useful with large sources that
            emit an initial burst of data, to avoid memory spikes. **Note**: The ``next``,
            ``next_json``, ``next_str``, and ``next_bytes`` methods of ``subject`` will
            block when the internal queue holding events before they are sent for
            processing reaches ``max_backlog_size``. These methods will resume only when
            the queue size drops below this limit.

    Returns:
        Table: The table read.

    Example:

    >>> import pathway as pw
    >>> from pathway.io.python import ConnectorSubject
    >>>
    >>> class MySchema(pw.Schema):
    ...     a: int
    ...     b: str
    ...
    >>>
    >>> class MySubject(ConnectorSubject):
    ...     def run(self) -> None:
    ...         for i in range(4):
    ...             self.next(a=i, b=f"x{i}")
    ...     @property
    ...     def _deletions_enabled(self) -> bool:
    ...         return False
    ...
    >>>
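The `max_backlog_size` note describes backpressure: `next` and its variants block once the internal queue of pending events reaches the limit, and resume only when it drops below the limit. A bounded `queue.Queue` from the Python standard library behaves the same way, so the sketch below uses it as an analogy. It is not Pathway's code; the function names `fill_without_consumer` and `run_with_consumer` and the chosen limits are illustrative assumptions.

```python
import queue
import threading


def fill_without_consumer(max_backlog_size: int, n_events: int) -> int:
    """Count how many events a bounded queue accepts before put() would block."""
    backlog: queue.Queue = queue.Queue(maxsize=max_backlog_size)
    accepted = 0
    for i in range(n_events):
        try:
            # With nobody draining the queue, put() would block forever once
            # it is full; a short timeout turns that wait into queue.Full.
            backlog.put(i, timeout=0.05)
        except queue.Full:
            break
        accepted += 1
    return accepted


def run_with_consumer(max_backlog_size: int, n_events: int) -> list[int]:
    """The producer blocks while the backlog is full and resumes as it drains."""
    backlog: queue.Queue = queue.Queue(maxsize=max_backlog_size)
    processed: list[int] = []

    def producer() -> None:
        for i in range(n_events):
            backlog.put(i)  # blocks while max_backlog_size items are pending
        backlog.put(None)  # sentinel: end of stream

    def consumer() -> None:
        while (item := backlog.get()) is not None:
            processed.append(item)  # taking an item frees a slot for the producer

    threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return processed


print(fill_without_consumer(max_backlog_size=3, n_events=10))  # 3
print(run_with_consumer(max_backlog_size=3, n_events=10) == list(range(10)))  # True
```

The first function shows the blocking point; the second shows that, once a consumer drains the queue, every event still arrives in order while at most `max_backlog_size` of them are pending at once.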

Callers

nothing calls this directly

Calls 4

get_data_format_type (function)
assert_schema_not_none (function)
table_from_datasource (function)
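`read` calls `get_data_format_type`, whose body is not shown on this page. As a hypothetical illustration of the deprecated `format` behavior the docstring describes, the sketch below maps a single message onto column values: "json" yields one value per column, while "raw" and "binary" yield a single "data" column. The helper name `payload_to_row`, and the choice to decode "raw" payloads as UTF-8 text while keeping "binary" payloads as bytes, are assumptions rather than Pathway's code.

```python
import json
from typing import Any, Literal


def payload_to_row(
    payload: bytes, fmt: Literal["json", "raw", "binary"]
) -> dict[str, Any]:
    """Hypothetical mapping from one message to column values."""
    if fmt == "json":
        row = json.loads(payload)  # each top-level key becomes a column
        if not isinstance(row, dict):
            raise ValueError("a json message must be an object")
        return row
    if fmt == "raw":
        return {"data": payload.decode("utf-8")}  # assumed: text in one column
    if fmt == "binary":
        return {"data": payload}  # bytes kept as-is in one column
    raise ValueError(f"unknown format: {fmt!r}")


print(payload_to_row(b'{"a": 1, "b": "x1"}', "json"))  # {'a': 1, 'b': 'x1'}
print(payload_to_row(b"hello", "raw"))  # {'data': 'hello'}
```

Because `format` is deprecated, the docstring's recommended path is the one used in its example: pass typed values directly, as in `self.next(a=i, b=f"x{i}")`.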

Tested by

no test coverage detected