hub / github.com/pathwaycom/pathway / stream_to_table

Method stream_to_table

python/pathway/internals/table.py:2911–2962

Converts a stream of changes (updates and deletions) into a table. In the Pathway Live Data Framework, a stream is a sequence of row changes, where each row has an id and a boolean column (e.g., "is_upsert") indicating whether the row is an update (``True``) or a deletion (``False``).

(self, is_upsert: expr.ColumnExpression)
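
The semantics described above can be modeled in a few lines of plain Python. This is only an illustrative sketch, not Pathway's implementation: `replay_changes` and the `(id, values, is_upsert)` event tuples are hypothetical names introduced here, and events are applied in list order.

```python
from typing import Any, Dict, Iterable, Tuple

Row = Dict[str, Any]


def replay_changes(changes: Iterable[Tuple[int, Row, bool]]) -> Dict[int, Row]:
    """Apply (id, values, is_upsert) events in order and return the surviving rows."""
    state: Dict[int, Row] = {}
    for row_id, values, is_upsert in changes:
        if is_upsert:
            # Upsert: insert the row, or overwrite the latest value kept for this id.
            state[row_id] = values
        else:
            # Deletion: only the id matters; the accompanying values are ignored.
            state.pop(row_id, None)
    return state


# Same events as the docstring example, without the __time__ column.
events = [
    (1, {"pet": "cat", "age": 3}, True),
    (2, {"pet": "dog", "age": 11}, True),
    (1, {"pet": "cat", "age": 4}, True),
    (2, {"pet": "dog", "age": 0}, False),
]
current = replay_changes(events)
print(current)  # {1: {'pet': 'cat', 'age': 4}}
```

Only the row with id 1 survives, holding its latest value; id 2 is removed even though the deletion event carries different column values.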


    @check_arg_types
    @contextualized_operator
    def stream_to_table(self, is_upsert: expr.ColumnExpression) -> Table[TSchema]:
        """
        Converts a stream of changes (updates and deletions) into a table.

        In the Pathway Live Data Framework, a stream is a sequence of row changes, where each row
        has an id and a boolean column
        (e.g., "is_upsert") indicating whether the row is an update (``True``) or a deletion (``False``).

        This method reconstructs the current state of the table from such a stream by applying the updates
        and deletions in order. It is a stateful operation: the operator keeps track of the latest value for each id.
        If there are multiple events for a single id in a single batch in a stream, the order of applying
        the actions is not specified.
        For deletions, only ids are important. The values in columns are ignored.

        Args:
            is_upsert: An expression that evaluates to a boolean value. ``True`` means the row
                is an upsert (insert or update), ``False`` means the row is a deletion.

        Returns:
            Table: A table with the same columns as the original stream, representing the current state.

        Example:

        >>> import pathway as pw
        >>> t1 = pw.debug.table_from_markdown(
        ...     '''
        ...     id | pet | age | is_upsert | __time__
        ...      1 | cat |  3  |   True    |     2
        ...      2 | dog | 11  |   True    |     2
        ...      1 | cat |  4  |   True    |     4
        ...      2 | dog |  0  |   False   |     4
        ...     '''
        ... )
        >>> t2 = t1.stream_to_table(pw.this.is_upsert)
        >>> pw.debug.compute_and_print_update_stream(t2, include_id=False)
        pet | age | is_upsert | __time__ | __diff__
        cat | 3   | True      | 2        | 1
        dog | 11  | True      | 2        | 1
        cat | 3   | True      | 4        | -1
        dog | 11  | True      | 4        | -1
        cat | 4   | True      | 4        | 1
        """
        is_upsert_type = self.eval_type(is_upsert)
        if is_upsert_type != dt.BOOL:
            raise TypeError(
                f"Expected 'is_upsert' to be of type 'bool', got '{is_upsert_type.typehint}'"
            )
        self._validate_expression(is_upsert)
        is_upsert_column = self._eval(is_upsert)
        assert self._universe == is_upsert_column.universe
        context = clmn.StreamToTableContext(self._id_column, is_upsert_column)
        return self._table_with_context(context)

    @trace_user_frame
    @contextualized_operator
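
The update stream printed in the docstring example (rows tagged with `__time__` and `__diff__`) can be reproduced with a small plain-Python model. This is a sketch under stated assumptions, not how the Pathway engine works internally: `to_update_stream` and the tuple layouts are hypothetical, events sharing a `__time__` are treated as one batch, and within a batch they are applied in input order (the docstring leaves that order unspecified when one id has several events in a batch).

```python
from itertools import groupby
from typing import Dict, List, Tuple

Values = Tuple[str, int]                # (pet, age)
Event = Tuple[int, int, Values, bool]   # (time, id, values, is_upsert)
Diff = Tuple[Values, int, int]          # (values, time, diff)


def to_update_stream(events: List[Event]) -> List[Diff]:
    """Replay batches by time and emit -1/+1 diffs for rows whose state changed."""
    state: Dict[int, Values] = {}
    out: List[Diff] = []
    # sorted() is stable, so events within one time keep their input order.
    ordered = sorted(events, key=lambda e: e[0])
    for time, batch in groupby(ordered, key=lambda e: e[0]):
        before = dict(state)
        for _, row_id, values, is_upsert in batch:
            if is_upsert:
                state[row_id] = values
            else:
                state.pop(row_id, None)  # values of a deletion are ignored
        # Retract rows that were changed or removed in this batch.
        for row_id, old in before.items():
            if state.get(row_id) != old:
                out.append((old, time, -1))
        # Insert rows that are new or carry a new value.
        for row_id, new in state.items():
            if before.get(row_id) != new:
                out.append((new, time, 1))
    return out


stream = to_update_stream([
    (2, 1, ("cat", 3), True),
    (2, 2, ("dog", 11), True),
    (4, 1, ("cat", 4), True),
    (4, 2, ("dog", 0), False),
])
for values, time, diff in stream:
    print(values, time, diff)
```

Note that the retraction for `dog` carries the stored values `("dog", 11)`, not the `("dog", 0)` attached to the deletion event, which matches the `dog | 11 | True | 4 | -1` row in the docstring output.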

Callers (6)

run · Method · 0.80
add_update_timestamp_utc · Function · 0.80
test_table_to_stream · Function · 0.80
test_stream_to_table · Function · 0.80
logic · Function · 0.80
test_stream_to_table · Function · 0.80

Calls (4)

eval_type · Method · 0.95
_validate_expression · Method · 0.95
_eval · Method · 0.95
_table_with_context · Method · 0.95

Tested by (4)

test_table_to_stream · Function · 0.64
test_stream_to_table · Function · 0.64
logic · Function · 0.64
test_stream_to_table · Function · 0.64