Call when stream is done processing an event.
(
self,
tp: TP,
offset: int,
stream: StreamT,
event: EventT,
state: typing.Dict = None,
)
| 368 | ) |
| 369 | |
def on_stream_event_out(
    self,
    tp: TP,
    offset: int,
    stream: StreamT,
    event: EventT,
    state: typing.Dict = None,
) -> None:
    """Call when stream is done processing an event.

    Forwards every argument to the parent implementation, decrements the
    ``total_active_events`` metric, and -- only when ``state`` was
    supplied -- observes the last entry of ``self.events_runtime``
    (passed through ``self.secs_to_ms``) on the
    ``events_runtime_latency`` metric.
    """
    super().on_stream_event_out(tp, offset, stream, event, state)

    metrics = self._metrics
    metrics.total_active_events.dec()

    # Without a state mapping no latency observation is recorded.
    # NOTE(review): presumably the parent only appends to
    # ``events_runtime`` when ``state`` is given -- confirm in the base class.
    if state is None:
        return

    latest_runtime = self.events_runtime[-1]
    metrics.events_runtime_latency.observe(self.secs_to_ms(latest_runtime))
| 385 | |
| 386 | def on_message_out(self, tp: TP, offset: int, message: Message) -> None: |
| 387 | """Call when message is fully acknowledged and can be committed.""" |
Nothing calls this directly.
No test coverage detected.