Buffer spans per trace and export them once the workflow finishes.
| 301 | |
| 302 | |
class PromptfooTracingProcessor(TracingProcessor):
    """Buffer spans per trace and export them once the workflow finishes.

    Finished spans are accumulated under their ``trace_id``. When the owning
    trace ends, the trace object and its buffered spans are handed to the
    exporter as one batch. Anything still buffered afterwards is exported by
    :meth:`force_flush` (and therefore by :meth:`shutdown`).

    ``Exception``s raised by the exporter are reported on stderr instead of
    propagating to the caller.
    """

    def __init__(self, exporter: PromptfooOTLPExporter) -> None:
        """Create a processor that sends its batches to *exporter*."""
        self._exporter = exporter
        # Guards both buffers below. It is released before any export call.
        self._lock = threading.Lock()
        # Traces that have started but not yet ended, keyed by trace id.
        self._traces: dict[str, Trace] = {}
        # Finished spans awaiting export, keyed by the trace id they carry.
        self._spans_by_trace: dict[str, list[Span[Any]]] = {}

    def on_trace_start(self, trace: Trace) -> None:
        """Remember *trace* so it can still be flushed if it never ends."""
        with self._lock:
            self._traces[trace.trace_id] = trace

    def on_trace_end(self, trace: Trace) -> None:
        """Export *trace* followed by every span buffered for it."""
        with self._lock:
            spans = self._spans_by_trace.pop(trace.trace_id, [])
            self._traces.pop(trace.trace_id, None)

        # Export outside the lock so other callbacks are not blocked on it.
        self._export_batch(
            [trace, *spans],
            f"Failed to export trace {trace.trace_id}",
        )

    def on_span_start(self, span: Span[Any]) -> None:
        """Do nothing; spans are only recorded once they end."""
        return None

    def on_span_end(self, span: Span[Any]) -> None:
        """Buffer the finished *span* under its trace id."""
        with self._lock:
            self._spans_by_trace.setdefault(span.trace_id, []).append(span)

    def shutdown(self) -> None:
        """Flush whatever is still buffered."""
        self.force_flush()

    def force_flush(self) -> None:
        """Export everything still buffered, one batch per trace id.

        Both buffers are drained under the lock; the exports themselves run
        afterwards, outside it. A failure on one batch is reported on stderr
        and does not stop the remaining batches from being exported.
        """
        with self._lock:
            # dict.fromkeys de-duplicates while keeping insertion order, so
            # batches are flushed in first-seen order rather than in the
            # arbitrary iteration order of a set.
            pending_ids = dict.fromkeys((*self._traces, *self._spans_by_trace))
            batches = [
                (
                    self._traces.pop(trace_id, None),
                    self._spans_by_trace.pop(trace_id, []),
                )
                for trace_id in pending_ids
            ]

        for trace, spans in batches:
            # Spans can be buffered for a trace id with no registered trace
            # object; in that case only the spans are exported.
            items = [*spans] if trace is None else [trace, *spans]
            if not items:
                continue
            self._export_batch(items, f"Failed to flush {len(spans)} span(s)")

    def _export_batch(
        self,
        items: "list[Trace | Span[Any]]",
        failure_prefix: str,
    ) -> None:
        """Send *items* to the exporter, reporting any failure on stderr.

        *failure_prefix* is the text placed between the
        ``[promptfoo_tracing]`` tag and the exception message.
        """
        try:
            self._exporter.export(items)
        except Exception as exc:
            # Best effort: a failed export is reported, not raised.
            print(
                f"[promptfoo_tracing] {failure_prefix}: {exc}",
                file=sys.stderr,
            )
no outgoing calls
no test coverage detected
searching dependent graphs…