Converts a stream of changes (insertions, updates, and deletions) into a table. In the Pathway Live Data Framework, a stream is a sequence of row changes, where each row has an id and a boolean column (e.g., ``is_upsert``) indicating whether the row is an upsert (``True``) or a deletion (``False``).
(self, is_upsert: expr.ColumnExpression)
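The semantics described in the docstring can be modeled in plain Python. The sketch below is not Pathway's implementation. `Event` and `stream_to_table_model` are hypothetical names, and the model makes two assumptions of its own: events within one batch are applied in list order (Pathway leaves this order unspecified), and deleting an id that is absent from the state is a no-op. Each batch emits its net change as `(values, time, diff)` triples, retractions before insertions. For the pet example in the docstring, this reproduces the rows printed by `compute_and_print_update_stream`, minus the `is_upsert` column.

```python
from dataclasses import dataclass
from typing import Any

Row = tuple[Any, ...]
Update = tuple[Row, int, int]  # (values, time, diff)


@dataclass(frozen=True)
class Event:
    """One change in the input stream (hypothetical stand-in for a Pathway row)."""
    id: int
    values: Row      # payload columns; ignored when is_upsert is False
    is_upsert: bool  # True = insert or update, False = delete
    time: int        # batch timestamp, playing the role of __time__


def stream_to_table_model(events: list[Event]) -> tuple[dict[int, Row], list[Update]]:
    """Replay events batch by batch.

    Returns the final id -> values state and an update stream of
    (values, time, diff) triples: diff is +1 for an added row version
    and -1 for a retracted one.
    """
    state: dict[int, Row] = {}
    updates: list[Update] = []
    for t in sorted({e.time for e in events}):
        before = dict(state)
        # Assumption of this model: events in one batch are applied in list
        # order. Pathway does not specify this order.
        for ev in events:
            if ev.time != t:
                continue
            if ev.is_upsert:
                state[ev.id] = ev.values
            else:
                # Deletion: only the id matters, ev.values is ignored.
                # Deleting an unknown id is a no-op (assumption of this model).
                state.pop(ev.id, None)
        # Emit the net effect of the batch: retractions first, then insertions.
        changed = sorted(
            k for k in before.keys() | state.keys() if before.get(k) != state.get(k)
        )
        updates += [(before[k], t, -1) for k in changed if k in before]
        updates += [(state[k], t, 1) for k in changed if k in state]
    return state, updates


# The same input as the docstring example.
events = [
    Event(1, ("cat", 3), True, 2),
    Event(2, ("dog", 11), True, 2),
    Event(1, ("cat", 4), True, 4),
    Event(2, ("dog", 0), False, 4),  # values are ignored for a deletion
]
final_state, update_stream = stream_to_table_model(events)
for row in update_stream:
    print(row)
# (('cat', 3), 2, 1)
# (('dog', 11), 2, 1)
# (('cat', 3), 4, -1)
# (('dog', 11), 4, -1)
# (('cat', 4), 4, 1)
```

Diffing the state before and after each batch, rather than emitting a pair of changes per event, means that several events for the same id in one batch produce a single net change.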
```python
@check_arg_types
@contextualized_operator
def stream_to_table(self, is_upsert: expr.ColumnExpression) -> Table[TSchema]:
    """
    Converts a stream of changes (insertions, updates, and deletions) into a table.

    In the Pathway Live Data Framework, a stream is a sequence of row changes, where each row
    has an id and a boolean column (e.g., ``is_upsert``) indicating whether the row is an
    upsert (``True``) or a deletion (``False``).

    This method reconstructs the current state of the table from such a stream by applying
    the upserts and deletions in time order. It is a stateful operation: the operator keeps
    track of the latest value for each id. If a single batch of the stream contains multiple
    events for the same id, the order in which they are applied is unspecified.
    For deletions, only the id matters; the values in the other columns are ignored.

    Args:
        is_upsert: An expression that evaluates to a boolean value. ``True`` means the row
            is an upsert (insert or update), ``False`` means the row is a deletion.

    Returns:
        Table: A table with the same columns as the input stream, representing its current state.

    Example:

    >>> import pathway as pw
    >>> t1 = pw.debug.table_from_markdown(
    ...     '''
    ...     id | pet | age | is_upsert | __time__
    ...     1  | cat | 3   | True      | 2
    ...     2  | dog | 11  | True      | 2
    ...     1  | cat | 4   | True      | 4
    ...     2  | dog | 0   | False     | 4
    ...     '''
    ... )
    >>> t2 = t1.stream_to_table(pw.this.is_upsert)
    >>> pw.debug.compute_and_print_update_stream(t2, include_id=False)
    pet | age | is_upsert | __time__ | __diff__
    cat | 3   | True      | 2        | 1
    dog | 11  | True      | 2        | 1
    cat | 3   | True      | 4        | -1
    dog | 11  | True      | 4        | -1
    cat | 4   | True      | 4        | 1
    """
    is_upsert_type = self.eval_type(is_upsert)
    if is_upsert_type != dt.BOOL:
        raise TypeError(
            f"Expected 'is_upsert' to be of type 'bool', got '{is_upsert_type.typehint}'"
        )
    self._validate_expression(is_upsert)
    is_upsert_column = self._eval(is_upsert)
    assert self._universe == is_upsert_column.universe
    context = clmn.StreamToTableContext(self._id_column, is_upsert_column)
    return self._table_with_context(context)

@trace_user_frame
@contextualized_operator
```