MCPcopy
hub / github.com/faust-streaming/faust / commit

Method commit

faust/transport/consumer.py:324–348  ·  view source on GitHub ↗

Commit offsets for partitions.

(
        self, offsets: Mapping[TP, int], start_new_transaction: bool = True
    )

Source from the content-addressed store, hash-verified

322 return await fut
323
async def commit(
    self, offsets: Mapping[TP, int], start_new_transaction: bool = True
) -> bool:
    """Commit offsets for partitions.

    Offsets are grouped by transactional id, built as
    ``"{app id}-{assignor group for the topic}-{partition}"``, and the
    grouped mapping is handed to the producer in a single
    ``commit_transactions`` call.

    Arguments:
        offsets: Mapping of topic partition to the offset to commit.
        start_new_transaction: Forwarded unchanged to
            ``producer.commit_transactions``.

    Returns:
        ``True`` when there was nothing to commit or the producer call
        completed; ``False`` when the producer raised ``ProducerFenced``
        (the app is crashed with that exception before returning).
    """
    producer = self.producer
    group_id = self.app.conf.id
    by_transactional_id: MutableMapping[str, MutableMapping[TP, int]]
    by_transactional_id = defaultdict(dict)

    for tp, offset in offsets.items():
        group = self.app.assignor.group_for_topic(tp.topic)
        transactional_id = f"{group_id}-{group}-{tp.partition}"
        by_transactional_id[transactional_id][tp] = offset

    # Nothing to commit: skip the producer round-trip entirely.
    if not by_transactional_id:
        return True

    try:
        await producer.commit_transactions(
            by_transactional_id,
            group_id,
            start_new_transaction=start_new_transaction,
        )
    except ProducerFenced as pf:
        logger.warning("ProducerFenced %s", pf)
        await self.app.crash(pf)
        # The offsets were NOT committed; do not report success to the
        # caller, which may otherwise act on a commit that never happened.
        return False
    return True
349
def key_partition(self, topic: str, key: bytes) -> TP:
    """Return the topic partition for ``key`` in ``topic``.

    Not implemented here: calling this always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError

Callers

nothing calls this directly

Calls 3

items · Method · 0.45
group_for_topic · Method · 0.45
commit_transactions · Method · 0.45

Tested by

no test coverage detected