hub / github.com/pathwaycom/pathway / to_stream

Method to_stream

python/pathway/internals/table.py:2857–2905

Converts a table to a stream of changes. If in a given batch there is:
- an insert or an update for a given key, a row with ``True`` in the ``upsert_column_name`` column is produced
- a delete for a given key, a row with ``False`` in the ``upsert_column_name`` column is produced
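The per-batch rule described above can be illustrated with a small pure-Python sketch. This is not Pathway's implementation (the real operator runs inside the engine); the function name `to_stream_batch` and the `(key, row, diff)` tuple layout are assumptions made only for illustration. It looks at a single batch in isolation, in line with the docstring's description of the operation as stateless.

```python
from collections import defaultdict


def to_stream_batch(changes, upsert_column_name="is_upsert"):
    """Collapse one batch of (key, row, diff) changes into change-stream rows.

    Hypothetical helper: diff is +1 for an insertion and -1 for a retraction.
    """
    by_key = defaultdict(list)
    for key, row, diff in changes:
        by_key[key].append((row, diff))

    out = []
    for key, entries in by_key.items():
        inserted = [row for row, diff in entries if diff > 0]
        removed = [row for row, diff in entries if diff < 0]
        if inserted:
            # Insert, or the new half of an update: emit the new values.
            out.append({**inserted[-1], upsert_column_name: True})
        elif removed:
            # Only a retraction in this batch: emit the deleted values.
            out.append({**removed[-1], upsert_column_name: False})
    return out


# The batch at __time__ == 6 from the docstring example.
batch = [
    (1, {"age": 11, "owner": "Alice", "pet": "dog"}, -1),
    (1, {"age": 12, "owner": "Alice", "pet": "dog"}, 1),
    (2, {"age": 10, "owner": "Bob", "pet": "cat"}, -1),
]
rows = to_stream_batch(batch)
```

For this batch the sketch yields one upsert row for Alice (age 12) and one delete row for Bob (age 10), matching the two rows the docstring example prints at time 6.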

(self, upsert_column_name: str = "is_upsert")

Source from the content-addressed store, hash-verified

    @contextualized_operator
    @check_arg_types
    def to_stream(self, upsert_column_name: str = "is_upsert") -> Table:
        """Converts a table to a stream of changes.

        If in a given batch there is:
        - an insert or an update for a given key, a row with ``True`` in the
          ``upsert_column_name`` column is produced
        - a delete for a given key, a row with ``False`` in the
          ``upsert_column_name`` column is produced.

        The values in all other columns are kept. This is a stateless operation.

        Args:
            upsert_column_name: name of the boolean column that will be added to the table
                and contain information about the type of action.

        Returns:
            Table: An append-only table with an additional column informing about the action type.

        Example:

        >>> import pathway as pw
        >>> t1 = pw.debug.table_from_markdown('''
        ... id | age | owner | pet | __time__ | __diff__
        ... 1  | 10  | Alice | dog | 2        | 1
        ... 2  | 9   | Bob   | cat | 2        | 1
        ... 1  | 10  | Alice | dog | 4        | -1
        ... 1  | 11  | Alice | dog | 4        | 1
        ... 2  | 9   | Bob   | cat | 4        | -1
        ... 2  | 10  | Bob   | cat | 4        | 1
        ... 1  | 11  | Alice | dog | 6        | -1
        ... 1  | 12  | Alice | dog | 6        | 1
        ... 2  | 10  | Bob   | cat | 6        | -1
        ... ''')
        >>> t2 = t1.to_stream()
        >>> pw.debug.compute_and_print_update_stream(t2, include_id=False)
        age | owner | pet | is_upsert | __time__ | __diff__
        9   | Bob   | cat | True      | 2        | 1
        10  | Alice | dog | True      | 2        | 1
        10  | Bob   | cat | True      | 4        | 1
        11  | Alice | dog | True      | 4        | 1
        10  | Bob   | cat | False     | 6        | 1
        12  | Alice | dog | True      | 6        | 1
        """
        context = clmn.TableToStreamContext(self._id_column)
        columns = {
            name: self._wrap_column_in_context(context, column, name)
            for name, column in self._columns.items()
        }
        columns[upsert_column_name] = context.is_upsert_column
        return Table(_columns=columns, _context=context)

    @trace_user_frame
    @desugar
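To show how the added boolean column might be consumed downstream, here is a hedged pure-Python sketch that replays the rows printed in the docstring example into a keyed dictionary. It does not use Pathway. The helper name `apply_stream` is hypothetical, and `owner` is used as a stand-in key only because the example output is printed with `include_id=False`.

```python
def apply_stream(rows, key_column, upsert_column_name="is_upsert"):
    """Rebuild keyed state from change-stream rows (hypothetical helper)."""
    state = {}
    for row in rows:
        # Drop the action flag; everything else is the row's payload.
        values = {k: v for k, v in row.items() if k != upsert_column_name}
        key = values[key_column]
        if row[upsert_column_name]:
            state[key] = values      # insert or update
        else:
            state.pop(key, None)     # delete
    return state


# Rows from the docstring output, in __time__ order (2, 2, 4, 4, 6, 6).
stream = [
    {"age": 9, "owner": "Bob", "pet": "cat", "is_upsert": True},
    {"age": 10, "owner": "Alice", "pet": "dog", "is_upsert": True},
    {"age": 10, "owner": "Bob", "pet": "cat", "is_upsert": True},
    {"age": 11, "owner": "Alice", "pet": "dog", "is_upsert": True},
    {"age": 10, "owner": "Bob", "pet": "cat", "is_upsert": False},
    {"age": 12, "owner": "Alice", "pet": "dog", "is_upsert": True},
]
final_state = apply_stream(stream, key_column="owner")
```

After the replay only Alice's row (age 12) remains, which agrees with the final contents of `t1` in the example: key 1 ends at age 12 and key 2 is deleted at time 6.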

Callers 3

add_update_timestamp_utc · Function · 0.80
test_table_to_stream · Function · 0.80
test_to_stream · Function · 0.80

Calls 3

Table · Class · 0.85
items · Method · 0.80

Tested by 2

test_table_to_stream · Function · 0.64
test_to_stream · Function · 0.64