Call when stream is done processing an event.
(
self, tp: TP, offset: int, stream: StreamT, event: EventT, state: Dict = None
)
| 99 | ) |
| 100 | |
| 101 | def on_stream_event_out( |
| 102 | self, tp: TP, offset: int, stream: StreamT, event: EventT, state: Dict = None |
| 103 | ) -> None: |
| 104 | """Call when stream is done processing an event.""" |
| 105 | super().on_stream_event_out(tp, offset, stream, event, state) |
| 106 | self.client.decr("events_active", rate=self.rate) |
| 107 | if state is not None: |
| 108 | self.client.timing( |
| 109 | "events_runtime", |
| 110 | self.secs_to_ms(self.events_runtime[-1]), |
| 111 | rate=self.rate, |
| 112 | ) |
| 113 | |
| 114 | def on_message_out(self, tp: TP, offset: int, message: Message) -> None: |
| 115 | """Call when message is fully acknowledged and can be committed.""" |
nothing calls this directly
no test coverage detected