| 238 | )(self._modify_set) |
| 239 | |
async def _modify_set(self, stream: StreamT[SetManagerOperation]) -> None:
    """Consume set operations from *stream* and dispatch each to its handler.

    Every item yielded by ``stream.items()`` is a ``(key, operation)`` pair.
    The operation's ``action`` is converted to a ``SetAction``; its
    ``members`` are passed through ``maybe_model`` one by one; then the
    handler registered for that action in ``self.actions`` is called with
    the key and the converted member list.

    An operation whose ``action`` cannot be converted to a ``SetAction``
    (the conversion raises ``ValueError``) is logged with
    ``self.log.exception`` and skipped; processing continues with the next
    item.  Errors raised while converting members, looking up the handler,
    or running the handler are not caught here.
    """
    # Bind once outside the loop: the same mapping and converter are used
    # for every item in the stream.
    dispatch = self.actions
    to_member = maybe_model  # presumably deserializes model payloads - confirm
    async for set_key, operation in stream.items():
        try:
            kind = SetAction(operation.action)
        except ValueError:
            self.log.exception("Unknown set operation: %r", operation.action)
            continue
        # Only the action conversion is guarded above; everything below
        # deliberately runs outside the try block.
        members = [to_member(raw) for raw in operation.members]
        dispatch[kind](set_key, members)
| 252 | |
| 253 | @cached_property |
| 254 | def topic(self) -> TopicT: |