Receives batches of occurrences. This function will take the batch and group them together by fingerprint (ensuring order is preserved) and execute each group using a ThreadPoolWorker. By batching we're able to process occurrences in parallel while guaranteeing that no occurrences are processed out of order per group.
(
worker: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
)
@sentry_sdk.tracing.trace
@metrics.wraps("occurrence_consumer.process_batch")
def process_occurrence_batch(
    worker: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
) -> None:
    """
    Fan a batch of occurrence messages out to the worker pool, one task
    per fingerprint.

    Every message in the batch is decoded and appended, in arrival order, to
    the list kept for its fingerprint. Each of those lists is then submitted
    to ``worker`` as a single ``process_occurrence_group`` task, and the
    resulting futures are waited on before returning.

    Grouping this way lets unrelated occurrences run in parallel while the
    occurrences that share a fingerprint stay together, in order, in one task.

    Messages whose payload cannot be decoded are logged and skipped.
    """
    values = message.payload

    grouped: Mapping[str, list[Mapping[str, Any]]] = defaultdict(list)

    for broker_value in values:
        assert isinstance(broker_value, BrokerValue)

        try:
            decoded = orjson.loads(broker_value.payload.value)
        except Exception:
            logger.exception("Failed to unpack message payload")
            continue

        # Only the first fingerprint entry is used as the grouping key (a
        # single entry is expected); an empty fingerprint lands in the "" group.
        fingerprint = decoded["fingerprint"]
        group_key: str = fingerprint[0] if fingerprint else ""
        grouped[group_key].append(decoded)

    # Size of the incoming batch; this counts every message received,
    # including any that were skipped above because they failed to decode.
    metrics.gauge("occurrence_consumer.checkin.parallel_batch_count", len(values))

    # How many distinct groups are about to be handed to the pool.
    metrics.gauge("occurrence_consumer.checkin.parallel_batch_groups", len(grouped))

    # Hand each group to the pool as its own task, then wait on all of them.
    with sentry_sdk.start_transaction(op="process_batch", name="occurrence.occurrence_consumer"):
        pending = []
        for occurrences in grouped.values():
            pending.append(worker.submit(process_occurrence_group, occurrences))
        wait(pending)
| 490 | |
| 491 | |
| 492 | @metrics.wraps("occurrence_consumer.process_occurrence_group") |