Remove old entries when they start to satisfy ``time_column <= max(time_column) - threshold``. This operator is useful for removing old entries from stateful operators downstream (such as joins and groupbys). It stores the entries, and when the current time (defined as the max over all ``time_column`` values so far) reaches their time plus ``threshold``, a deletion of those entries is emitted.
```python
forget(
    self,
    time_column: expr.ColumnExpression,
    threshold: IntervalType,
    mark_forgetting_records: bool = False,
)
```
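A plain-Python sketch can make the forgetting condition concrete. Both functions below, `is_forgotten` and `surviving_after_each_time`, are hypothetical illustrations, not Pathway APIs. The replay drops a row as soon as ``t <= max(t) - threshold`` holds, whereas Pathway itself decides when the corresponding deletion is emitted: in the docstring example below, the deletion of ``t=1`` appears at processing time 6, while this simplified model already drops it after time 4. The sketch uses the left table of that example with ``threshold=3``, plus one datetime check to show the ``datetime -> timedelta`` pairing.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple


def is_forgotten(time: Any, current_max: Any, threshold: Any) -> bool:
    """Hypothetical helper for the condition time <= max(time) - threshold.

    threshold must pair with the time type: int -> int, float -> float,
    datetime -> timedelta.
    """
    return time <= current_max - threshold


def surviving_after_each_time(
    rows: List[Tuple[int, int, int]], threshold: int
) -> Dict[int, List[Tuple[int, int]]]:
    """Hypothetical replay of (t, v, __time__) rows, not Pathway's engine.

    After each processing time, the current time is advanced to the max t
    seen so far and rows satisfying the condition are dropped immediately.
    The model does not reproduce when Pathway actually emits the deletion.
    """
    state: List[Tuple[int, int]] = []
    current_max: Optional[int] = None
    snapshots: Dict[int, List[Tuple[int, int]]] = {}
    for now in sorted({ts for _, _, ts in rows}):
        # Add the rows that arrive at this processing time.
        batch = [(t, v) for t, v, ts in rows if ts == now]
        state.extend(batch)
        # Current time: max over all t values seen so far.
        batch_max = max(t for t, _ in batch)
        current_max = batch_max if current_max is None else max(current_max, batch_max)
        # Keep only the rows that do not yet satisfy the forgetting condition.
        state = [(t, v) for t, v in state if not is_forgotten(t, current_max, threshold)]
        snapshots[now] = list(state)
    return snapshots


# Left table of the docstring example: (t, v, __time__), threshold 3.
LEFT = [(1, 1, 2), (2, 1, 2), (4, 2, 4), (3, 3, 6)]
snapshots = surviving_after_each_time(LEFT, 3)
for now, kept in snapshots.items():
    print(now, kept)
# 2 [(1, 1), (2, 1)]
# 4 [(2, 1), (4, 2)]
# 6 [(2, 1), (4, 2), (3, 3)]

# A datetime column needs a timedelta threshold.
start = datetime(2024, 1, 1, 12, 0)
print(is_forgotten(start, start + timedelta(minutes=20), timedelta(minutes=10)))  # True
```

By processing time 8, only ``t=2`` is still kept with ``v=1``, which is consistent with the right-hand row ``v=1, a=3`` joining with ``t=2`` alone in the example's update stream.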
```python
@trace_user_frame
@desugar
def forget(
    self,
    time_column: expr.ColumnExpression,
    threshold: IntervalType,
    mark_forgetting_records: bool = False,
) -> Table[TSchema]:
    """Remove old entries when they start to satisfy ``time_column <= max(time_column) - threshold``.

    This operator is useful for removing old entries from the stateful operators
    downstream (like joins, groupbys etc.). It stores the entries and when the
    current time (defined as max over all ``time_column`` values so far) reaches
    their time plus ``threshold``, a deletion of entries is emitted.

    Args:
        time_column: ``ColumnExpression`` that specifies the event time.
        threshold: value used to determine which entries are old enough to be removed.
            Should match the type of the ``time_column`` (``int -> int``,
            ``float -> float``, ``datetime -> timedelta``).
        mark_forgetting_records: If set to ``True``, Pathway Live Data Framework marks records
            corresponding to the deletion of expired entries in a special way,
            without changing their visible representation.
            This flag is useful when combined with ``filter_out_results_of_forgetting``,
            which can later remove those marked deletion records. In other words, it
            allows you to revert the effects of forgetting at a later stage.

    Example:

    >>> import pathway as pw
    >>> t = pw.debug.table_from_markdown(
    ...     '''
    ...     t | v | __time__
    ...     1 | 1 | 2
    ...     2 | 1 | 2
    ...     4 | 2 | 4
    ...     3 | 3 | 6
    ...     '''
    ... )
    >>> t_with_forgetting = t.forget(pw.this.t, 3)
    >>> s = pw.debug.table_from_markdown(
    ...     '''
    ...     v | a | __time__
    ...     1 | 1 | 2
    ...     2 | 2 | 4
    ...     1 | 3 | 8
    ...     '''
    ... )
    >>> res = t_with_forgetting.join(s, pw.left.v == pw.right.v).select(
    ...     pw.left.t, pw.left.v, pw.right.a
    ... )
    >>> pw.debug.compute_and_print_update_stream(res)
                | t | v | a | __time__ | __diff__
    ^YYYD8ZW... | 1 | 1 | 1 | 2 | 1
    ^YYY47FZ... | 2 | 1 | 1 | 2 | 1
    ^Z3QTSKY... | 4 | 2 | 2 | 4 | 1
    ^YYYD8ZW... | 1 | 1 | 1 | 6 | -1
    ^YYY822X... | 2 | 1 | 3 | 8 | 1

    The entry ``t=1,v=1`` is forgotten at the processing time 6. It gets removed from the
```