MCPcopy
hub / github.com/pathwaycom/pathway / squash_updates

Function squash_updates

python/pathway/internals/api.py:197–226  ·  view source on GitHub ↗
(
    updates: CapturedStream, *, terminate_on_error: bool = True
)

Source from the content-addressed store, hash-verified

195
196
def squash_updates(
    updates: CapturedStream, *, terminate_on_error: bool = True
) -> CapturedTable:
    """Fold a stream of insert/delete updates into the resulting table state.

    Rows are applied in ascending ``(time, diff)`` order, so within a single
    time a deletion (``diff == -1``) is applied before an insertion
    (``diff == 1``). This lets a delete+insert pair at the same time act as
    an in-place update of a key.

    Note that ``updates`` is sorted in place.

    Args:
        updates: Rows exposing ``key``, ``values``, ``time`` and ``diff``.
        terminate_on_error: If true, the first inconsistency raises. If
            false, a warning is emitted and the affected key is mapped to a
            tuple of ``ERROR`` values with the same length as the row.

    Returns:
        Mapping from key to the tuple of values remaining once every update
        has been applied.

    Raises:
        KeyError: When ``terminate_on_error`` is true and a key is inserted
            twice, a deletion does not match the stored entry (including
            the case where the key is absent), or ``diff`` is neither 1
            nor -1.
    """
    state: CapturedTable = {}
    updates.sort(key=lambda row: (row.time, row.diff))

    def handle_error(row: DataRow, msg: str):
        # Either abort, or warn and poison the row so the problem stays
        # visible in the returned state.
        if terminate_on_error:
            raise KeyError(msg)
        warnings.warn(msg)
        t: tuple[Value, ...] = (ERROR,) * len(row.values)
        state[row.key] = t

    for row in updates:
        if row.diff == 1:
            if row.key in state:
                handle_error(row, f"duplicated entries for key {row.key}")
                continue
            state[row.key] = tuple(row.values)
        elif row.diff == -1:
            # Check membership first: indexing a missing key would raise a
            # bare KeyError and bypass handle_error (and with it the
            # terminate_on_error=False path).
            if row.key not in state or state[row.key] != tuple(row.values):
                handle_error(row, f"deleting non-existing entry {row.values}")
                continue
            del state[row.key]
        else:
            handle_error(row, f"invalid diff value: {row.diff}")

    return state
227
228
229class PyObjectWrapperSerializerProtocol(Protocol):

Callers

nothing calls this directly

Calls 3

handle_errorFunction · 0.85
tupleFunction · 0.85
sortMethod · 0.80

Tested by

no test coverage detected