hub / github.com/pathwaycom/pathway / ignore_late

Method ignore_late

python/pathway/internals/table.py:852–895

Filter out entries that satisfy ``time_column <= max(time_column) - threshold``. In contrast to ``forget``, this operator doesn't store the entries. It only checks each entry against this condition and lets through the entries that do not satisfy it. The only value stored by this operator is the current time (defined as the max over all ``time_column`` values so far).
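The filtering rule above can be sketched in plain Python, without Pathway. This is a minimal illustration, not the library's implementation: `ignore_late_sketch` is a hypothetical name, rows are handled one at a time rather than in batches, and the running maximum is assumed to include the row being checked (for a positive threshold this gives the same result as comparing against the previous maximum).

```python
from typing import Iterable, Iterator, Optional, Tuple


def ignore_late_sketch(
    rows: Iterable[Tuple[int, int]], threshold: int
) -> Iterator[Tuple[int, int]]:
    """Yield (t, v) rows that are not late (hypothetical helper, not Pathway API)."""
    current_time: Optional[int] = None  # the only state kept: max t seen so far
    for t, v in rows:
        if current_time is None or t > current_time:
            current_time = t
        if t <= current_time - threshold:
            continue  # late entry: dropped and never stored
        yield (t, v)


# Same (t, v) rows, in the same arrival order, as the docstring example.
rows = [(1, 1), (2, 2), (5, 3), (2, 4), (7, 5)]
kept = list(ignore_late_sketch(rows, threshold=3))
print(kept)  # [(1, 1), (2, 2), (5, 3), (7, 5)]
```

The row `(2, 4)` is dropped because the maximum seen so far is 5 and `2 <= 5 - 3`; every other row arrives while it is still within the threshold.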

(self, time_column: expr.ColumnExpression, threshold: IntervalType) -> Table[TSchema]
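The docstring's Args section asks for a `threshold` whose type matches `time_column` (`int -> int`, `float -> float`, `datetime -> timedelta`). The sketch below shows, using ordinary Python arithmetic only, why those pairings are the natural ones: the cutoff `max(time_column) - threshold` has to be computable. `late_cutoff` is a hypothetical helper, and the `TypeError` shown comes from Python's `datetime`, not from Pathway's own type checks.

```python
from datetime import datetime, timedelta


def late_cutoff(current_time, threshold):
    """Hypothetical helper: entries with time_column <= this value count as late."""
    return current_time - threshold


int_cutoff = late_cutoff(7, 3)          # int time, int threshold
float_cutoff = late_cutoff(7.5, 0.5)    # float time, float threshold
dt_cutoff = late_cutoff(                # datetime time, timedelta threshold
    datetime(2024, 1, 1, 12, 0), timedelta(minutes=5)
)

# A mismatched pair (datetime time, int threshold) cannot be subtracted.
try:
    late_cutoff(datetime(2024, 1, 1, 12, 0), 3)
    mismatch_rejected = False
except TypeError:
    mismatch_rejected = True

print(int_cutoff, float_cutoff, dt_cutoff, mismatch_rejected)
```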

Source from the content-addressed store, hash-verified

850     @trace_user_frame
851     @desugar
852     def ignore_late(
853         self, time_column: expr.ColumnExpression, threshold: IntervalType
854     ) -> Table[TSchema]:
855         """Filter out entries that satisfy ``time_column <= max(time_column) - threshold``.
856
857         In contrast to ``forget``, this operator doesn't store the entries. It just checks
858         if the entries match the condition and, if they do, allows them to pass. The only
859         value stored by this operator is the current time (defined as max over all
860         ``time_column`` values so far).
861
862         Please note that if the table is non-append-only and there's a difference in
863         processing time between an insertion and a deletion for some key, the insertion
864         may pass through but the deletion may be filtered out. It'll happen if the max
865         value in ``time_column`` advanced between the insertion and deletion and the insertion
866         didn't satisfy the filtering-out criterion but the deletion did.
867
868         Args:
869             time_column: ``ColumnExpression`` that specifies the event time.
870             threshold: value used to determine which entries should be filtered out.
871                 Should match the type of the ``time_column`` (``int -> int``,
872                 ``float -> float``, ``datetime -> timedelta``).
873
874         Example:
875
876         >>> import pathway as pw
877         >>> t = pw.debug.table_from_markdown(
878         ...     '''
879         ...     t | v | __time__
880         ...     1 | 1 | 2
881         ...     2 | 2 | 4
882         ...     5 | 3 | 6
883         ...     2 | 4 | 8
884         ...     7 | 5 | 10
885         ...     '''
886         ... )
887         >>> res = t.ignore_late(pw.this.t, 3)
888         >>> pw.debug.compute_and_print_update_stream(res)
889                     | t | v | __time__ | __diff__
890         ^X1MXHYY... | 1 | 1 | 2        | 1
891         ^YYY4HAB... | 2 | 2 | 4        | 1
892         ^Z3QWT29... | 5 | 3 | 6        | 1
893         ^3HN31E1... | 7 | 5 | 10       | 1
894         """
895         return self._freeze(time_column + threshold, time_column)
896
897     @trace_user_frame
898     @desugar
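
The docstring's caveat about non-append-only tables can be made concrete with a small simulation. This is a sketch under stated assumptions rather than Pathway code: `filter_updates` and the `(key, t, diff)` tuple layout are hypothetical, updates are processed one at a time, and a deletion is assumed to carry the same `t` as the insertion it retracts.

```python
from collections import Counter
from typing import List, Optional, Tuple

# (key, event time t, diff): diff is +1 for an insertion, -1 for a deletion.
Update = Tuple[str, int, int]


def filter_updates(updates: List[Update], threshold: int) -> List[Update]:
    """Apply the late-entry rule to a stream of signed updates (hypothetical helper)."""
    current_time: Optional[int] = None  # max t seen so far, the only state kept
    passed: List[Update] = []
    for key, t, diff in updates:
        if current_time is None or t > current_time:
            current_time = t
        if t <= current_time - threshold:
            continue  # late: dropped whether it is an insertion or a deletion
        passed.append((key, t, diff))
    return passed


updates = [
    ("a", 2, +1),  # insertion of "a" passes: max is 2
    ("b", 6, +1),  # advances the max to 6
    ("a", 2, -1),  # deletion of "a" is late now: 2 <= 6 - 3
]
passed = filter_updates(updates, threshold=3)

# Net count per key as seen downstream of the filter.
net = Counter()
for key, _, diff in passed:
    net[key] += diff

print(passed)     # [('a', 2, 1), ('b', 6, 1)]
print(dict(net))  # {'a': 1, 'b': 1}
```

Upstream, key `"a"` was inserted and then deleted, yet downstream it still has a net count of 1, because only its deletion met the filtering-out criterion.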

Callers 1

test_freeze_no_instance (Function) · 0.80

Calls 1

_freeze (Method) · 0.95

Tested by 1

test_freeze_no_instance (Function) · 0.64