(
updates: CapturedStream, *, terminate_on_error: bool = True
)
| 195 | |
| 196 | |
| 197 | def squash_updates( |
| 198 | updates: CapturedStream, *, terminate_on_error: bool = True |
| 199 | ) -> CapturedTable: |
| 200 | state: CapturedTable = {} |
| 201 | updates.sort(key=lambda row: (row.time, row.diff)) |
| 202 | |
| 203 | def handle_error(row: DataRow, msg: str): |
| 204 | if terminate_on_error: |
| 205 | raise KeyError(msg) |
| 206 | else: |
| 207 | warnings.warn(msg) |
| 208 | t: tuple[Value, ...] = (ERROR,) * len(row.values) |
| 209 | state[row.key] = t |
| 210 | |
| 211 | for row in updates: |
| 212 | if row.diff == 1: |
| 213 | if row.key in state: |
| 214 | handle_error(row, f"duplicated entries for key {row.key}") |
| 215 | continue |
| 216 | state[row.key] = tuple(row.values) |
| 217 | elif row.diff == -1: |
| 218 | if state[row.key] != tuple(row.values): |
| 219 | handle_error(row, f"deleting non-existing entry {row.values}") |
| 220 | continue |
| 221 | del state[row.key] |
| 222 | else: |
| 223 | handle_error(row, f"invalid diff value: {row.diff}") |
| 224 | continue |
| 225 | |
| 226 | return state |
| 227 | |
| 228 | |
| 229 | class PyObjectWrapperSerializerProtocol(Protocol): |
nothing calls this directly
no test coverage detected