Flush ready subtrees from the buffer. Strategy: We consider a subtree "ready" if we can identify a root span. We then take that root and all its descendants out of the buffer and try to reconstruct rollout/attempt/sequence headers by merging any span's `metadata.requester_custom_headers` within the subtree.
(self)
| 319 | return SpanExportResult.SUCCESS |
| 320 | |
| 321 | def _maybe_flush(self): |
| 322 | """Flush ready subtrees from the buffer. |
| 323 | |
| 324 | Strategy: |
| 325 | We consider a subtree "ready" if we can identify a root span. We |
| 326 | then take that root and all its descendants out of the buffer and |
| 327 | try to reconstruct rollout/attempt/sequence headers by merging any |
| 328 | span's `metadata.requester_custom_headers` within the subtree. |
| 329 | |
| 330 | Required headers: |
| 331 | `x-rollout-id` (str), `x-attempt-id` (str), `x-sequence-id` (str of int) |
| 332 | |
| 333 | Raises: |
| 334 | None directly. Logs and skips malformed spans. |
| 335 | |
| 336 | """ |
| 337 | # Iterate over current roots. Each iteration pops a whole subtree. |
| 338 | for root_span_id in self._get_root_span_ids(): |
| 339 | subtree_spans = self._pop_subtrees(root_span_id) |
| 340 | if not subtree_spans: |
| 341 | continue |
| 342 | |
| 343 | # Store is initialized lazily here in most cases. |
| 344 | store = self._store or get_active_llm_proxy().get_store() |
| 345 | if store is None: |
| 346 | logger.warning("Store is not set in LLMProxy. Cannot log spans to store.") |
| 347 | continue |
| 348 | |
| 349 | # If the store supports OTLP endpoint, use it. |
| 350 | if store.capabilities.get("otlp_traces", False): |
| 351 | otlp_traces_endpoint = store.otlp_traces_endpoint() |
| 352 | self._otlp_exporter._endpoint = otlp_traces_endpoint # pyright: ignore[reportPrivateUsage] |
| 353 | otlp_enabled = True |
| 354 | else: |
| 355 | otlp_enabled = False |
| 356 | |
| 357 | # Merge all custom headers found in the subtree. |
| 358 | headers_merged: Dict[str, Any] = {} |
| 359 | |
| 360 | for span in subtree_spans: |
| 361 | if span.attributes is None: |
| 362 | continue |
| 363 | headers_str = span.attributes.get("metadata.requester_custom_headers") |
| 364 | if headers_str is None: |
| 365 | continue |
| 366 | if not isinstance(headers_str, str): |
| 367 | logger.error( |
| 368 | f"metadata.requester_custom_headers is not stored as a string: {headers_str}. Skipping the span." |
| 369 | ) |
| 370 | continue |
| 371 | if not headers_str.strip(): |
| 372 | logger.warning("metadata.requester_custom_headers is an empty string. Skipping the span.") |
| 373 | continue |
| 374 | try: |
| 375 | # Use literal_eval to parse the stringified dict safely. |
| 376 | headers = ast.literal_eval(headers_str) |
| 377 | except Exception as e: |
| 378 | logger.error( |
no test coverage detected