hub / github.com/pathwaycom/pathway / forget

Method forget

python/pathway/internals/table.py:671–755  ·  view source on GitHub ↗

Remove old entries when they start to satisfy ``time_column <= max(time_column) - threshold``. This operator is useful for removing old entries from the stateful operators downstream (like joins, groupbys etc.). It stores the entries and when the current time (defined as max over all ``time_column`` values so far) reaches their time plus ``threshold``, a deletion of entries is emitted.
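
The removal condition can be modelled in plain Python. The sketch below is not Pathway's implementation: `simulate_forget` is a hypothetical helper that only applies the rule ``time <= max(time) - threshold`` to values in arrival order. It ignores processing time and batching, which determine when Pathway actually emits a deletion.

```python
from typing import Iterable, List, Optional, Tuple


def simulate_forget(times: Iterable[int], threshold: int) -> Tuple[List[int], List[int]]:
    """Return (kept, forgotten) after feeding `times` in arrival order.

    Hypothetical helper for illustration only: it models the condition
    time <= max(time seen so far) - threshold and nothing else.
    """
    kept: List[int] = []
    forgotten: List[int] = []
    current_max: Optional[int] = None
    for t in times:
        kept.append(t)
        # The "current time" is the largest event time seen so far.
        current_max = t if current_max is None else max(current_max, t)
        cutoff = current_max - threshold
        still_kept: List[int] = []
        for k in kept:
            # Entries at or below the cutoff are considered old enough to drop.
            (forgotten if k <= cutoff else still_kept).append(k)
        kept = still_kept
    return kept, forgotten


kept, forgotten = simulate_forget([1, 2, 4, 3], threshold=3)
print(kept)       # [2, 4, 3]
print(forgotten)  # [1]
```

With the ``t`` values from the docstring example (1, 2, 4, 3) and a threshold of 3, the maximum reaches 4, the cutoff becomes ``4 - 3 = 1``, and only ``t=1`` qualifies for removal.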

(
        self,
        time_column: expr.ColumnExpression,
        threshold: IntervalType,
        mark_forgetting_records: bool = False,
    )
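
The docstring says ``threshold`` should match the type of ``time_column`` (``int -> int``, ``float -> float``, ``datetime -> timedelta``). A minimal sketch of why, using only the standard library: the cutoff is a subtraction, so the two types must support it. The `cutoff` function is a hypothetical name for illustration and is not part of Pathway.

```python
from datetime import datetime, timedelta


def cutoff(max_time, threshold):
    """Compute max(time_column) - threshold for matching types (illustrative only)."""
    return max_time - threshold


print(cutoff(10, 3))        # 7
print(cutoff(10.5, 0.5))    # 10.0
print(cutoff(datetime(2024, 1, 1, 12, 0), timedelta(minutes=30)))  # 2024-01-01 11:30:00

# A mismatched pair fails in plain Python as well: datetime minus int is undefined.
try:
    cutoff(datetime(2024, 1, 1, 12, 0), 30)
except TypeError as exc:
    print("mismatched types:", type(exc).__name__)
```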

Source from the content-addressed store, hash-verified

@trace_user_frame
@desugar
def forget(
    self,
    time_column: expr.ColumnExpression,
    threshold: IntervalType,
    mark_forgetting_records: bool = False,
) -> Table[TSchema]:
    """Remove old entries when they start to satisfy ``time_column <= max(time_column) - threshold``.

    This operator is useful for removing old entries from the stateful operators
    downstream (like joins, groupbys etc.). It stores the entries and when the
    current time (defined as max over all ``time_column`` values so far) reaches
    their time plus ``threshold``, a deletion of entries is emitted.

    Args:
        time_column: ``ColumnExpression`` that specifies the event time.
        threshold: value used to determine which entries are old enough to be removed.
            Should match the type of the ``time_column`` (``int -> int``,
            ``float -> float``, ``datetime -> timedelta``).
        mark_forgetting_records : If set to ``True``, Pathway Live Data Framework marks records
            corresponding to the deletion of expired entries in a special way,
            without changing their visible representation.
            This flag is useful when combined with ``filter_out_results_of_forgetting``,
            which can later remove those marked deletion records. In other words, it
            allows you to revert the effects of forgetting at a later stage.

    Example:

    >>> import pathway as pw
    >>> t = pw.debug.table_from_markdown(
    ...     '''
    ...     t | v | __time__
    ...     1 | 1 | 2
    ...     2 | 1 | 2
    ...     4 | 2 | 4
    ...     3 | 3 | 6
    ...     '''
    ... )
    >>> t_with_forgetting = t.forget(pw.this.t, 3)
    >>> s = pw.debug.table_from_markdown(
    ...     '''
    ...     v | a | __time__
    ...     1 | 1 | 2
    ...     2 | 2 | 4
    ...     1 | 3 | 8
    ...     '''
    ... )
    >>> res = t_with_forgetting.join(s, pw.left.v == pw.right.v).select(
    ...     pw.left.t, pw.left.v, pw.right.a
    ... )
    >>> pw.debug.compute_and_print_update_stream(res)
                | t | v | a | __time__ | __diff__
    ^YYYD8ZW... | 1 | 1 | 1 | 2        | 1
    ^YYY47FZ... | 2 | 1 | 1 | 2        | 1
    ^Z3QTSKY... | 4 | 2 | 2 | 4        | 1
    ^YYYD8ZW... | 1 | 1 | 1 | 6        | -1
    ^YYY822X... | 2 | 1 | 3 | 8        | 1

    The entry ``t=1,v=1`` is forgotten at the processing time 6. It gets removed from the

Callers 4

run · Method · 0.45
test_forget_no_instance · Function · 0.45

Calls 1

_forget · Method · 0.95