MCPcopy
hub / github.com/microsoft/agent-lightning / _maybe_flush

Method _maybe_flush

agentlightning/llm_proxy.py:321–439  ·  view source on GitHub ↗

Flush ready subtrees from the buffer. Strategy: We consider a subtree "ready" if we can identify a root span. We then take that root and all its descendants out of the buffer and try to reconstruct rollout/attempt/sequence headers by merging any span's `metadata.requester_custom_headers` within the subtree.

(self)

Source from the content-addressed store, hash-verified

319 return SpanExportResult.SUCCESS
320
321 def _maybe_flush(self):
322 """Flush ready subtrees from the buffer.
323
324 Strategy:
325 We consider a subtree "ready" if we can identify a root span. We
326 then take that root and all its descendants out of the buffer and
327 try to reconstruct rollout/attempt/sequence headers by merging any
328 span's `metadata.requester_custom_headers` within the subtree.
329
330 Required headers:
331 `x-rollout-id` (str), `x-attempt-id` (str), `x-sequence-id` (str of int)
332
333 Raises:
334 None directly. Logs and skips malformed spans.
335
336 """
337 # Iterate over current roots. Each iteration pops a whole subtree.
338 for root_span_id in self._get_root_span_ids():
339 subtree_spans = self._pop_subtrees(root_span_id)
340 if not subtree_spans:
341 continue
342
343 # Store is initialized lazily here in most cases.
344 store = self._store or get_active_llm_proxy().get_store()
345 if store is None:
346 logger.warning("Store is not set in LLMProxy. Cannot log spans to store.")
347 continue
348
349 # If the store supports OTLP endpoint, use it.
350 if store.capabilities.get("otlp_traces", False):
351 otlp_traces_endpoint = store.otlp_traces_endpoint()
352 self._otlp_exporter._endpoint = otlp_traces_endpoint # pyright: ignore[reportPrivateUsage]
353 otlp_enabled = True
354 else:
355 otlp_enabled = False
356
357 # Merge all custom headers found in the subtree.
358 headers_merged: Dict[str, Any] = {}
359
360 for span in subtree_spans:
361 if span.attributes is None:
362 continue
363 headers_str = span.attributes.get("metadata.requester_custom_headers")
364 if headers_str is None:
365 continue
366 if not isinstance(headers_str, str):
367 logger.error(
368 f"metadata.requester_custom_headers is not stored as a string: {headers_str}. Skipping the span."
369 )
370 continue
371 if not headers_str.strip():
372 logger.warning("metadata.requester_custom_headers is an empty string. Skipping the span.")
373 continue
374 try:
375 # Use literal_eval to parse the stringified dict safely.
376 headers = ast.literal_eval(headers_str)
377 except Exception as e:
378 logger.error(

Callers 1

export · Method · 0.95

Calls 10

_get_root_span_ids · Method · 0.95
_pop_subtrees · Method · 0.95
_ensure_loop · Method · 0.95
get_active_llm_proxy · Function · 0.85
get_store · Method · 0.45
get · Method · 0.45
otlp_traces_endpoint · Method · 0.45
update · Method · 0.45
export · Method · 0.45
add_otel_span · Method · 0.45

Tested by

no test coverage detected