@trace_user_frame
@desugar
def ignore_late(
    self, time_column: expr.ColumnExpression, threshold: IntervalType
) -> Table[TSchema]:
    """Filter out entries that satisfy ``time_column <= max(time_column) - threshold``.

    In contrast to ``forget``, this operator doesn't store the entries. It only checks
    whether each entry satisfies the condition above and lets it pass if it doesn't.
    The only value stored by this operator is the current time (defined as the max
    over all ``time_column`` values seen so far).

    Note that if the table is not append-only and an insertion and a deletion for
    the same key are processed at different times, the insertion may pass through
    while the deletion is filtered out. This happens if the max value in
    ``time_column`` advanced between the insertion and the deletion, so that the
    insertion didn't satisfy the filtering-out criterion but the deletion did.

    Args:
        time_column: ``ColumnExpression`` that specifies the event time.
        threshold: value used to determine which entries should be filtered out.
            Its type should match the type of ``time_column`` (``int -> int``,
            ``float -> float``, ``datetime -> timedelta``).

    Example:

    >>> import pathway as pw
    >>> t = pw.debug.table_from_markdown(
    ...     '''
    ...     t | v | __time__
    ...     1 | 1 | 2
    ...     2 | 2 | 4
    ...     5 | 3 | 6
    ...     2 | 4 | 8
    ...     7 | 5 | 10
    ... '''
    ... )
    >>> res = t.ignore_late(pw.this.t, 3)
    >>> pw.debug.compute_and_print_update_stream(res)
                | t | v | __time__ | __diff__
    ^X1MXHYY... | 1 | 1 | 2        | 1
    ^YYY4HAB... | 2 | 2 | 4        | 1
    ^Z3QWT29... | 5 | 3 | 6        | 1
    ^3HN31E1... | 7 | 5 | 10       | 1
    """
    return self._freeze(time_column + threshold, time_column)

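The filtering rule in the docstring can be modelled outside Pathway. What follows is a hypothetical, simplified pure-Python sketch, not Pathway's implementation: the method itself delegates to `self._freeze(time_column + threshold, time_column)`, whose internals are not shown here. `ignore_late_sketch` and `Update` are illustrative names. The sketch assumes a single ordered stream of `(time, payload, diff)` updates and ignores batching by `__time__` and row keys. It also assumes each update is compared against the max of the time values seen before it. It reproduces the docstring example, the insertion/deletion caveat, and the `datetime -> timedelta` pairing.

```python
from datetime import datetime, timedelta
from typing import Any, Iterable, Iterator, Optional, Tuple

# Hypothetical update format for this sketch: (time value, payload, diff),
# where diff is +1 for an insertion and -1 for a deletion.
Update = Tuple[Any, Any, int]


def ignore_late_sketch(updates: Iterable[Update], threshold: Any) -> Iterator[Update]:
    """Simplified model of the documented rule; NOT Pathway's implementation.

    An update is dropped when ``t <= current_max - threshold``, where
    ``current_max`` is the largest time value seen in earlier updates.
    ``current_max`` is the only state kept; no rows are stored.
    """
    current_max: Optional[Any] = None
    for t, payload, diff in updates:
        # Assumption: compare against the max of *earlier* updates only.
        late = current_max is not None and t <= current_max - threshold
        # The current time advances over all values, filtered or not.
        if current_max is None or t > current_max:
            current_max = t
        if not late:
            yield (t, payload, diff)


# The docstring example: rows (t, v) with threshold 3; the row t=2, v=4 is late.
example = [(1, 1, 1), (2, 2, 1), (5, 3, 1), (2, 4, 1), (7, 5, 1)]
print([v for _, v, _ in ignore_late_sketch(example, 3)])  # [1, 2, 3, 5]

# The caveat: the insertion of "a" passes, but its later deletion is dropped
# because the current time advanced from 1 to 5 in between.
caveat = [(1, "a", 1), (5, "b", 1), (1, "a", -1)]
print(list(ignore_late_sketch(caveat, 3)))  # [(1, 'a', 1), (5, 'b', 1)]

# datetime -> timedelta: an event 8 minutes behind the current time is late.
base = datetime(2024, 1, 1)
events = [
    (base, "x", 1),
    (base + timedelta(minutes=10), "y", 1),
    (base + timedelta(minutes=2), "z", 1),
]
print([p for _, p, _ in ignore_late_sketch(events, timedelta(minutes=5))])  # ['x', 'y']
```

In this model the only state is `current_max`, which mirrors the docstring's statement that the operator stores nothing but the current time. The price is the caveat shown by `caveat`: because no rows are kept, a filtered deletion cannot be matched against the insertion it would retract.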
@trace_user_frame
@desugar