MCPcopy
hub / github.com/pathwaycom/pathway / _build_groups

Function _build_groups

python/pathway/stdlib/temporal/_asof_join.py:40–103  ·  view source on GitHub ↗

Inputs: - t: ordered table - key: tuple where the last element indicates the group - next/prev pointers - dir_next: boolean Outputs a table with the same number of elements with: - peer: next if dir_next else prev - peer_key: t(peer).key

(t: pw.Table, dir_next: bool)

Source from the content-addressed store, hash-verified

38
39
40def _build_groups(t: pw.Table, dir_next: bool) -> pw.Table:
41 """
42 Inputs:
43 - t: ordered table
44 - key: tuple where the last element indicates the group
45 - next/prev pointers
46 - dir_next: boolean
47
48 Outputs a table with the same number of elements with:
49 - peer: next if dir_next else prev
50 - peer_key: t(peer).key
51 - peer_same: id of next/prev element in the table with the same group
52 - peer_diff: id of next/prev element in the table with a different group
53 """
54
55 def proc(cur_id, cur, peer_id, peer) -> pw.Pointer:
56 if peer is None:
57 return cur_id
58 if cur[1] != peer[1]: # check if the same side of a join
59 return cur_id
60 return peer_id
61
62 succ_table = t.select(
63 orig_id=t.orig_id,
64 key=t.key,
65 peer=t.next if dir_next else t.prev,
66 )
67
68 succ_table += succ_table.select(
69 peer_key=succ_table.ix(succ_table.peer, optional=True).key,
70 )
71 succ_table += succ_table.select(
72 group_repr=pw.apply(
73 proc,
74 succ_table.id,
75 succ_table.key,
76 succ_table.peer,
77 succ_table.peer_key,
78 )
79 )
80
81 def merge_ccs(data):
82 data = data.with_columns(data.ix(data.group_repr).group_repr)
83 return data
84
85 group_table = pw.iterate(merge_ccs, data=succ_table)
86 # At the end of the iterative merge_ccs, we have:
87 # group_repr = last element of each consecutive group with the same `key`
88 # We want to compute two things:
89 # - `next_same`: the next element with the same key
90 # - `next_diff`: the next element with a different key
91 # To do so,
92 # let reprs = elements which are the last elements of each consecutive group
93 # next_diff(x) = group_repr(x).peer
94 # next_same(x) = is_repr ? next_diff(x).peer : x.peer
95 #
96
97 reprs = group_table.filter(group_table.id == group_table.group_repr)

Callers 1

_mergeMethod · 0.85

Calls 4

applyMethod · 0.80
selectMethod · 0.45
ixMethod · 0.45
filterMethod · 0.45

Tested by

no test coverage detected