```python
    @trace_user_frame
    @desugar
    def buffer(
        self, time_column: expr.ColumnExpression, threshold: IntervalType
    ) -> Table[TSchema]:
        """Buffers the values until the condition ``time_column <= max(time_column) - threshold`` is met.

        This is a stateful operator. It stores the entries if their
        ``time_column > max(time_column) - threshold``. Otherwise the entries can pass immediately.
        Once the current time (defined as max over all ``time_column`` values so far) advances and
        some of the stored entries start to satisfy the condition, they are sent for further processing.

        Args:
            time_column: ``ColumnExpression`` that specifies the event time.
            threshold: value used to determine which entries are old enough to be sent for further processing.
                Should match the type of the ``time_column`` (``int -> int``,
                ``float -> float``, ``datetime -> timedelta``).

        Example:

        >>> import pathway as pw
        >>> t = pw.debug.table_from_markdown(
        ...     '''
        ...     t | v | __time__
        ...     1 | 1 |     2
        ...     2 | 2 |     4
        ...     5 | 3 |     6
        ...     2 | 4 |     8
        ...     7 | 5 |    10
        ...     '''
        ... )
        >>> res = t.buffer(pw.this.t, 3)
        >>> pw.debug.compute_and_print_update_stream(res)
                    | t | v | __time__             | __diff__
        ^X1MXHYY... | 1 | 1 | 6                    | 1
        ^YYY4HAB... | 2 | 2 | 6                    | 1
        ^3CZ78B4... | 2 | 4 | 8                    | 1
        ^Z3QWT29... | 5 | 3 | 18446744073709551614 | 1
        ^3HN31E1... | 7 | 5 | 18446744073709551614 | 1

        The values of processing time for rows with event time 5, 7 are equal
        to 18446744073709551614 because there's no more input and they are released
        only at the end of the processing. 18446744073709551614 is the maximum
        possible time.
        """
        return self._buffer(time_column + threshold, time_column)
```
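The release rule in the docstring can be replayed in plain Python to see why the example emits rows at processing times 6, 6, 8 and then only at the end. This is a simplified model, not Pathway's implementation, and `simulate_buffer` is a hypothetical helper. It assumes that rows sharing a processing time form one batch, that the "current time" already includes that batch's own event times, and it reports `None` where Pathway prints the maximal processing time 18446744073709551614 for rows flushed at the end of input. Because it only uses `-`, `max` and `<=`, the same code covers the `int -> int`, `float -> float` and `datetime -> timedelta` pairings listed under `threshold`.

```python
from datetime import datetime, timedelta
from typing import Any, Iterable


def simulate_buffer(
    batches: Iterable[tuple[int, list[tuple[Any, Any]]]], threshold: Any
) -> list[tuple[Any, Any, Any]]:
    """Hypothetical model of the buffering rule (not Pathway's code).

    `batches` holds (processing_time, [(event_time, value), ...]) pairs in
    increasing processing-time order. Returns (event_time, value, release_time)
    tuples; release_time is None for rows still buffered when input ends.
    """
    buffered: list[tuple[Any, Any]] = []
    released: list[tuple[Any, Any, Any]] = []
    current_max = None
    for proc_time, rows in batches:
        # The current time is the max event time seen so far, including this batch.
        for event_time, _ in rows:
            if current_max is None or event_time > current_max:
                current_max = event_time
        buffered.extend(rows)
        if current_max is None:
            continue  # nothing has arrived yet, so nothing can be released
        cutoff = current_max - threshold
        still_waiting = []
        for event_time, value in buffered:
            # Rows with time_column <= max(time_column) - threshold are passed on.
            if event_time <= cutoff:
                released.append((event_time, value, proc_time))
            else:
                still_waiting.append((event_time, value))
        buffered = still_waiting
    # Rows that never satisfied the condition are flushed when input ends.
    released.extend((t, v, None) for t, v in buffered)
    return released


# Replays the docstring example: integer event times, threshold 3.
example = [(2, [(1, 1)]), (4, [(2, 2)]), (6, [(5, 3)]), (8, [(2, 4)]), (10, [(7, 5)])]
for row in simulate_buffer(example, 3):
    print(row)
# (1, 1, 6)
# (2, 2, 6)
# (2, 4, 8)
# (5, 3, None)
# (7, 5, None)
```

The row `(2, 4)` arrives at processing time 8 when the current time is already 5, so `2 <= 5 - 3` holds and it passes immediately instead of being stored.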
| 966 | @trace_user_frame |
| 967 | @desugar |