hub / github.com/getsentry/sentry / process_occurrence_batch

Function process_occurrence_batch

src/sentry/issues/occurrence_consumer.py:449–489

Receives batches of occurrences. This function groups the batch by fingerprint (preserving order within each group) and submits each group to the thread pool executor passed as worker. Grouping this way lets occurrences be processed in parallel while guaranteeing that no occurrences are processed out of order per group.

(
    worker: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
)
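The group-then-submit pattern described above can be sketched with the standard library alone. This is a minimal illustration, not the Sentry implementation: `group_by_fingerprint`, `handle_group`, and the `seq` field are hypothetical names, `json` stands in for `orjson`, and a plain `ThreadPoolExecutor` stands in for `ContextPropagatingThreadPoolExecutor`.

```python
import json
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, wait
from typing import Any


def group_by_fingerprint(raw_messages: list[bytes]) -> dict[str, list[dict[str, Any]]]:
    """Group decoded payloads by their first fingerprint, keeping arrival order."""
    groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for raw in raw_messages:
        try:
            payload = json.loads(raw)
        except ValueError:
            # Undecodable messages are skipped, mirroring the `continue` in the source.
            continue
        fingerprint = payload["fingerprint"]
        key = fingerprint[0] if fingerprint else ""
        groups[key].append(payload)
    return dict(groups)


def handle_group(group: list[dict[str, Any]]) -> list[int]:
    # Hypothetical stand-in for process_occurrence_group: one thread walks
    # the group sequentially, so per-group order is preserved.
    return [payload["seq"] for payload in group]


raw_batch = [
    b'{"fingerprint": ["a"], "seq": 1}',
    b'{"fingerprint": ["b"], "seq": 2}',
    b"not json",
    b'{"fingerprint": ["a"], "seq": 3}',
    b'{"fingerprint": [], "seq": 4}',
]

groups = group_by_fingerprint(raw_batch)

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {key: pool.submit(handle_group, group) for key, group in groups.items()}
    wait(list(futures.values()))

results = {key: future.result() for key, future in futures.items()}
```

Groups may finish in any order relative to each other, but items that share a fingerprint are handled by a single task in the order they arrived.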

Source from the content-addressed store, hash-verified

@sentry_sdk.tracing.trace
@metrics.wraps("occurrence_consumer.process_batch")
def process_occurrence_batch(
    worker: ContextPropagatingThreadPoolExecutor, message: Message[ValuesBatch[KafkaPayload]]
) -> None:
    """
    Receives batches of occurrences. This function will take the batch
    and group them together by fingerprint (ensuring order is preserved) and
    execute each group using a ThreadPoolWorker.

    By batching we're able to process occurrences in parallel while guaranteeing
    that no occurrences are processed out of order per group.
    """

    batch = message.payload

    occcurrence_mapping: Mapping[str, list[Mapping[str, Any]]] = defaultdict(list)

    for item in batch:
        assert isinstance(item, BrokerValue)

        try:
            payload = orjson.loads(item.payload.value)
        except Exception:
            logger.exception("Failed to unpack message payload")
            continue

        # group by the fingerprint, there should only be one of them
        partition_key: str = payload["fingerprint"][0] if payload["fingerprint"] else ""

        occcurrence_mapping[partition_key].append(payload)

    # Number of occurrences that are being processed in this batch
    metrics.gauge("occurrence_consumer.checkin.parallel_batch_count", len(batch))

    # Number of groups we've collected to be processed in parallel
    metrics.gauge("occurrence_consumer.checkin.parallel_batch_groups", len(occcurrence_mapping))
    # Submit occurrences & status changes for processing
    with sentry_sdk.start_transaction(op="process_batch", name="occurrence.occurrence_consumer"):
        futures = [
            worker.submit(process_occurrence_group, group) for group in occcurrence_mapping.values()
        ]
        wait(futures)
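The partition-key expression in the source above has a few edge cases worth spelling out. The sketch below lifts that expression into a hypothetical helper, `partition_key_for`, and exercises it on hand-written payloads. In the excerpt the `payload["fingerprint"]` lookup sits outside the `try`/`except` that guards decoding, so a payload without that key would raise `KeyError`. Whether real payloads can omit the key is not stated here.

```python
from typing import Any, Mapping


def partition_key_for(payload: Mapping[str, Any]) -> str:
    # Same expression as in the source loop, wrapped in a hypothetical helper.
    return payload["fingerprint"][0] if payload["fingerprint"] else ""


# Only the first fingerprint entry is used as the key.
assert partition_key_for({"fingerprint": ["abc", "def"]}) == "abc"

# Empty or falsy fingerprints all fall into the "" group.
assert partition_key_for({"fingerprint": []}) == ""
assert partition_key_for({"fingerprint": None}) == ""

# A payload with no "fingerprint" key raises instead of being skipped.
try:
    partition_key_for({"id": "no-fingerprint"})
except KeyError as exc:
    missing_key_error = repr(exc)
else:
    missing_key_error = None
```

Because every payload with an empty fingerprint shares the `""` key, those payloads are processed sequentially as one group.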

Callers 1

process_batch · Function · 0.90

Calls 5

wait · Function · 0.85
append · Method · 0.65
gauge · Method · 0.45
submit · Method · 0.45
values · Method · 0.45
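Of the calls listed, `wait` from `concurrent.futures` has one behavior that is easy to miss: it blocks until the futures finish but does not re-raise exceptions from them. The sketch below shows this with two hypothetical tasks, `succeed` and `fail`. Whether `process_occurrence_group` handles its own errors is not shown in this excerpt.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def succeed() -> str:
    return "ok"


def fail() -> None:
    raise ValueError("bad occurrence")


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(succeed), pool.submit(fail)]
    # wait() returns once both futures have finished; it does not raise,
    # even though one task failed.
    done, not_done = wait(futures)

# Failures only surface when a future is inspected explicitly.
errors = [f.exception() for f in futures if f.exception() is not None]
```

A caller that needs failures to propagate would have to check `future.exception()` or call `future.result()` on each future after `wait` returns.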

Tested by

no test coverage detected