| 336 | |
| 337 | |
class InMemoryExporter(export_lib.SpanExporter):
    """Span exporter that keeps exported spans in memory, indexed by session.

    Every exported span is appended to an internal list. Spans carrying a
    session identifier attribute additionally have their trace id recorded
    under that session in ``trace_dict``, so that all spans belonging to a
    session's traces can be looked up later with ``get_finished_spans``.
    """

    def __init__(self, trace_dict):
        """Create an exporter that records session -> trace ids in *trace_dict*.

        Args:
            trace_dict: Mutable mapping from session id to a list of trace
                ids. The exporter keeps a reference (not a copy) and updates
                it in place during ``export``.
        """
        super().__init__()
        self._spans = []
        self.trace_dict = trace_dict

    @override
    def export(
        self, spans: typing.Sequence[ReadableSpan]
    ) -> export_lib.SpanExportResult:
        """Store *spans* and index their trace ids by session id.

        The session id is read from the ``gcp.vertex.agent.session_id``
        attribute, falling back to ``gen_ai.conversation.id``. Spans without
        either attribute are still stored but are not indexed.

        Args:
            spans: The finished spans handed over by the span processor.

        Returns:
            Always ``SpanExportResult.SUCCESS``.
        """
        for span in spans:
            # A ReadableSpan's attributes are optional in the OpenTelemetry
            # SDK; treat a missing mapping as empty instead of letting
            # ``dict(None)`` raise and abort the whole export batch.
            attributes = span.attributes or {}
            session_id = attributes.get(
                "gcp.vertex.agent.session_id"
            ) or attributes.get("gen_ai.conversation.id")
            if not session_id:
                continue
            trace_id = span.context.trace_id
            trace_ids = self.trace_dict.setdefault(session_id, [])
            # Keep each trace id at most once per session, in first-seen order.
            if trace_id not in trace_ids:
                trace_ids.append(trace_id)
        self._spans.extend(spans)
        return export_lib.SpanExportResult.SUCCESS

    @override
    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Report a successful flush.

        ``export`` stores spans immediately, so there is nothing buffered to
        flush; *timeout_millis* is accepted for interface compatibility only.
        """
        return True

    def get_finished_spans(self, session_id: str) -> typing.List[ReadableSpan]:
        """Return the stored spans whose trace id is recorded for *session_id*.

        Args:
            session_id: Key to look up in ``trace_dict``.

        Returns:
            The matching spans in the order they were exported, or an empty
            list when the session is unknown or has no trace ids.
        """
        trace_ids = self.trace_dict.get(session_id)
        if not trace_ids:
            return []
        # Build the lookup set once so filtering is linear in the number of
        # stored spans rather than spans x trace ids.
        wanted = set(trace_ids)
        return [span for span in self._spans if span.context.trace_id in wanted]

    def clear(self) -> None:
        """Drop all stored spans; ``trace_dict`` is left untouched."""
        self._spans.clear()
| 375 | |
| 376 | class RunAgentRequest(common.BaseModel): |