(agent_name: str)
| 69 | |
| 70 | |
| 71 | def _get_agent_span(agent_name: str) -> dict[str, Any]: |
| 72 | for span in SPAN_PROCESSOR_TESTING.get_ordered_spans(including_empty=True): |
| 73 | exported = span.export() |
| 74 | if not exported: |
| 75 | continue |
| 76 | span_data = exported.get("span_data") |
| 77 | if not isinstance(span_data, dict): |
| 78 | continue |
| 79 | if span_data.get("type") == "agent" and span_data.get("name") == agent_name: |
| 80 | return exported |
| 81 | raise AssertionError(f"Agent span for '{agent_name}' not found") |
| 82 | |
| 83 | |
| 84 | def _action_with_keys(factory: Callable[..., T], **kwargs: Any) -> T: |
no test coverage detected