Converts a table to a stream of changes. If in a given batch there is:
- an insert or an update for a given key, a row with ``True`` in the ``upsert_column_name`` column is produced
- a delete for a given key, a row with ``False`` in the ``upsert_column_name`` column is produced.
(self, upsert_column_name: str = "is_upsert")
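The per-batch rule described above can be sketched in plain Python without Pathway. This is only an illustration of the documented semantics, not the library's implementation: the `Change` tuple, the `to_stream_sketch` function, and the `row` helper are hypothetical names introduced here, and a "batch" is assumed to be the set of changes sharing the same `time`.

```python
from collections import defaultdict
from typing import NamedTuple


class Change(NamedTuple):
    """One entry of an update stream (hypothetical helper type)."""
    key: int
    values: dict
    time: int
    diff: int  # +1 for an insertion, -1 for a deletion


def to_stream_sketch(changes, upsert_column_name="is_upsert"):
    """Emit one append-only row per (batch, key), flagged as upsert or delete."""
    # Assumption: a batch is the set of changes that share the same time.
    groups = defaultdict(list)
    for change in changes:
        groups[(change.time, change.key)].append(change)

    out = []
    for time, key in sorted(groups):
        group = groups[(time, key)]
        inserted = [c for c in group if c.diff > 0]
        # An insertion in the batch (alone, or paired with a deletion,
        # which makes it an update) yields True; a lone deletion yields False.
        source = inserted[-1] if inserted else group[-1]
        new_row = dict(source.values)
        new_row[upsert_column_name] = bool(inserted)
        out.append((time, key, new_row))
    return out


def row(age, owner, pet):
    return {"age": age, "owner": owner, "pet": pet}


# Same data as in the docstring example of to_stream.
changes = [
    Change(1, row(10, "Alice", "dog"), 2, 1),
    Change(2, row(9, "Bob", "cat"), 2, 1),
    Change(1, row(10, "Alice", "dog"), 4, -1),
    Change(1, row(11, "Alice", "dog"), 4, 1),
    Change(2, row(9, "Bob", "cat"), 4, -1),
    Change(2, row(10, "Bob", "cat"), 4, 1),
    Change(1, row(11, "Alice", "dog"), 6, -1),
    Change(1, row(12, "Alice", "dog"), 6, 1),
    Change(2, row(10, "Bob", "cat"), 6, -1),
]

rows = to_stream_sketch(changes)
for time, key, values in rows:
    print(time, key, values)
```

With this input the sketch yields six rows, one per key per batch. The only row flagged `False` is Bob's at time 6, where the batch contains a deletion and no matching insertion. The ordering of the printed rows is an artifact of the sketch and may differ from what Pathway prints.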
    @contextualized_operator
    @check_arg_types
    def to_stream(self, upsert_column_name: str = "is_upsert") -> Table:
        """Converts a table to a stream of changes.

        If in a given batch there is:
        - an insert or an update for a given key, a row with ``True`` in the
          ``upsert_column_name`` column is produced
        - a delete for a given key, a row with ``False`` in the
          ``upsert_column_name`` column is produced.

        The values in all other columns are kept. This is a stateless operation.

        Args:
            upsert_column_name: name of the boolean column that will be added to the table
                and contain information about the type of action.

        Returns:
            Table: An append-only table with an additional column informing about the action type.

        Example:

        >>> import pathway as pw
        >>> t1 = pw.debug.table_from_markdown('''
        ... id | age | owner | pet | __time__ | __diff__
        ...  1 |  10 | Alice | dog |     2    |     1
        ...  2 |   9 | Bob   | cat |     2    |     1
        ...  1 |  10 | Alice | dog |     4    |    -1
        ...  1 |  11 | Alice | dog |     4    |     1
        ...  2 |   9 | Bob   | cat |     4    |    -1
        ...  2 |  10 | Bob   | cat |     4    |     1
        ...  1 |  11 | Alice | dog |     6    |    -1
        ...  1 |  12 | Alice | dog |     6    |     1
        ...  2 |  10 | Bob   | cat |     6    |    -1
        ... ''')
        >>> t2 = t1.to_stream()
        >>> pw.debug.compute_and_print_update_stream(t2, include_id=False)
        age | owner | pet | is_upsert | __time__ | __diff__
        9   | Bob   | cat | True      | 2        | 1
        10  | Alice | dog | True      | 2        | 1
        10  | Bob   | cat | True      | 4        | 1
        11  | Alice | dog | True      | 4        | 1
        10  | Bob   | cat | False     | 6        | 1
        12  | Alice | dog | True      | 6        | 1
        """
        context = clmn.TableToStreamContext(self._id_column)
        columns = {
            name: self._wrap_column_in_context(context, column, name)
            for name, column in self._columns.items()
        }
        columns[upsert_column_name] = context.is_upsert_column
        return Table(_columns=columns, _context=context)

    @trace_user_frame
    @desugar