hub / github.com/pathwaycom/pathway / deduplicate

Method deduplicate

python/pathway/internals/table.py:1315–1413

Deduplicates rows in `self` on the `value` column using an acceptor function. It keeps the rows that were accepted by the acceptor function. The acceptor takes two arguments: the *CURRENT* value and the *PREVIOUS* value. The `value` argument is the column expression used for deduplication.

(
    self,
    *,
    value: expr.ColumnExpression | Value,
    instance: expr.ColumnExpression | None = None,
    acceptor: Callable[[T, T], bool],
    name: str | None = None,
)
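
To make the acceptor contract concrete, here is a minimal plain-Python sketch of the behaviour described above. It is a model only, not Pathway's implementation: `simulate_deduplicate` is a hypothetical helper, and it assumes that the first value is accepted unconditionally and that an accepted value retracts the previously kept one, which is what the docstring example's update stream suggests.

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")


def simulate_deduplicate(
    rows: Iterable[tuple[T, int]],
    acceptor: Callable[[T, T], bool],
) -> list[tuple[T, int, int]]:
    """Model the update stream of a deduplication (hypothetical helper).

    `rows` holds (value, time) pairs in arrival order. The result holds
    (value, time, diff) triples: diff=1 inserts a row, diff=-1 retracts it.
    """
    updates: list[tuple[T, int, int]] = []
    has_previous = False
    previous = None
    for value, time in rows:
        if not has_previous:
            # Assumption: with nothing to compare against, the first value is kept.
            updates.append((value, time, 1))
            previous, has_previous = value, True
        elif acceptor(value, previous):
            # The acceptor receives (current, previous); on acceptance the
            # previously kept row is retracted and the new one is inserted.
            updates.append((previous, time, -1))
            updates.append((value, time, 1))
            previous = value
        # Rejected values leave the kept row untouched.
    return updates


def acceptor(new_value: int, old_value: int) -> bool:
    # Accept only values at least 2 greater than the last accepted one.
    return new_value >= old_value + 2


stream = simulate_deduplicate([(1, 2), (2, 4), (3, 6), (4, 8)], acceptor)
print(stream)  # [(1, 2, 1), (1, 6, -1), (3, 6, 1)]
```

Note that each new value is compared against the last accepted value, not against the immediately preceding input row: 2 is rejected against 1, 3 is then accepted against 1, and 4 is rejected against 3.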

Source from the content-addressed store, hash-verified

@check_arg_types
@contextualized_operator
def deduplicate(
    self,
    *,
    value: expr.ColumnExpression | Value,
    instance: expr.ColumnExpression | None = None,
    acceptor: Callable[[T, T], bool],
    name: str | None = None,
) -> Table:
    """Deduplicates rows in `self` on `value` column using acceptor function.

    It keeps rows which where accepted by the acceptor function.
    Acceptor operates on two arguments - *CURRENT* value and *PREVIOUS* value.

    Args:
        value: column expression used for deduplication.
        instance: Grouping column. For rows with different
            values in this column, deduplication will be performed separately.
            Defaults to None.
        acceptor: callback telling whether two values are different.
        name: An identifier, under which the state of the table
            will be persisted or ``None``, if there is no need to persist the state of this table.
            When a program restarts, it restores the state for all input tables according to what
            was saved for their ``name``. This way it's possible to configure the start of
            computations from the moment they were terminated last time.

    Returns:
        Table: the result of deduplication.

    Example:

    >>> import pathway as pw
    >>> table = pw.debug.table_from_markdown(
    ...     '''
    ...     val | __time__
    ...     1 | 2
    ...     2 | 4
    ...     3 | 6
    ...     4 | 8
    ...     '''
    ... )
    >>>
    >>> def acceptor(new_value, old_value) -> bool:
    ...     return new_value >= old_value + 2
    ...
    >>>
    >>> result = table.deduplicate(value=pw.this.val, acceptor=acceptor)
    >>> pw.debug.compute_and_print_update_stream(result, include_id=False)
    val | __time__ | __diff__
    1 | 2 | 1
    1 | 6 | -1
    3 | 6 | 1
    >>>
    >>> table = pw.debug.table_from_markdown(
    ...     '''
    ...     val | instance | __time__
    ...     1 | 1 | 2
    ...     2 | 1 | 4
    ...     3 | 2 | 6
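
The `instance` argument described in the docstring can be pictured as keeping one "last accepted" value per group. The sketch below models that in plain Python. It is not Pathway code: `latest_accepted_per_instance`, the acceptor name, and the sample rows are all hypothetical, and the data is deliberately different from the docstring example. As before, it assumes the first value seen for an instance is accepted unconditionally.

```python
from typing import Callable, Hashable, Iterable, TypeVar

T = TypeVar("T")


def latest_accepted_per_instance(
    rows: Iterable[tuple[Hashable, T]],
    acceptor: Callable[[T, T], bool],
) -> dict[Hashable, T]:
    """Return the last accepted value for each instance (hypothetical helper).

    `rows` holds (instance, value) pairs in arrival order.
    """
    accepted: dict[Hashable, T] = {}
    for instance, value in rows:
        # Each instance is compared only against its own last accepted value.
        if instance not in accepted or acceptor(value, accepted[instance]):
            accepted[instance] = value
    return accepted


def at_least_two_more(new_value: int, old_value: int) -> bool:
    # Same rule as the docstring's acceptor: require a jump of at least 2.
    return new_value >= old_value + 2


rows = [("a", 10), ("b", 1), ("a", 11), ("b", 3), ("a", 13)]
result = latest_accepted_per_instance(rows, at_least_two_more)
print(result)  # {'a': 13, 'b': 3}
```

In this model, the value 3 for instance `"b"` is accepted because it is compared with 1 (the last accepted value for `"b"`), not with 10 or 11 from instance `"a"`.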

Calls 4

_validate_expression · Method · 0.95
_eval · Method · 0.95
_table_with_context · Method · 0.95