Commit offsets for partitions.
(
self, offsets: Mapping[TP, int], start_new_transaction: bool = True
)
| 322 | return await fut |
| 323 | |
| 324 | async def commit( |
| 325 | self, offsets: Mapping[TP, int], start_new_transaction: bool = True |
| 326 | ) -> bool: |
| 327 | """Commit offsets for partitions.""" |
| 328 | producer = self.producer |
| 329 | group_id = self.app.conf.id |
| 330 | by_transactional_id: MutableMapping[str, MutableMapping[TP, int]] |
| 331 | by_transactional_id = defaultdict(dict) |
| 332 | |
| 333 | for tp, offset in offsets.items(): |
| 334 | group = self.app.assignor.group_for_topic(tp.topic) |
| 335 | transactional_id = f"{group_id}-{group}-{tp.partition}" |
| 336 | by_transactional_id[transactional_id][tp] = offset |
| 337 | |
| 338 | if by_transactional_id: |
| 339 | try: |
| 340 | await producer.commit_transactions( |
| 341 | by_transactional_id, |
| 342 | group_id, |
| 343 | start_new_transaction=start_new_transaction, |
| 344 | ) |
| 345 | except ProducerFenced as pf: |
| 346 | logger.warning(f"ProducerFenced {pf}") |
| 347 | await self.app.crash(pf) |
| 348 | return True |
| 349 | |
| 350 | def key_partition(self, topic: str, key: bytes) -> TP: |
| 351 | raise NotImplementedError() |
nothing calls this directly
no test coverage detected