Simplified version of arun that focuses on citation handling for testing.
(
self,
system_instruction: str = None,
messages: list[Message] = None,
*args,
**kwargs,
)
| 338 | return f"event: {event_type}\ndata: {json.dumps(data)}\n\n" |
| 339 | |
| 340 | async def arun( |
| 341 | self, |
| 342 | system_instruction: str = None, |
| 343 | messages: list[Message] = None, |
| 344 | *args, |
| 345 | **kwargs, |
| 346 | ) -> AsyncGenerator[str, None]: |
| 347 | """ |
| 348 | Simplified version of arun that focuses on citation handling for testing. |
| 349 | """ |
| 350 | await self._setup(system_instruction) |
| 351 | |
| 352 | if messages: |
| 353 | for m in messages: |
| 354 | await self.conversation.add_message(m) |
| 355 | |
| 356 | # Initialize citation tracker |
| 357 | citation_tracker = CitationTracker() |
| 358 | citation_payloads = {} |
| 359 | |
| 360 | # Track streaming citations for final persistence |
| 361 | self.streaming_citations = [] |
| 362 | |
| 363 | # Get the LLM response with citations |
| 364 | response_content = "This is a test response with citations" |
| 365 | response_content += " [abc1234] [def5678]" |
| 366 | |
| 367 | # Yield an initial message event with the start of the text |
| 368 | yield self._format_sse_event("message", {"content": response_content}) |
| 369 | |
| 370 | # Manually extract and emit citation events |
| 371 | # This is a simpler approach than the character-by-character approach |
| 372 | citation_spans = extract_citation_spans(response_content) |
| 373 | |
| 374 | # Process the citations |
| 375 | for cid, spans in citation_spans.items(): |
| 376 | for span in spans: |
| 377 | # Check if the span is new and record it |
| 378 | if citation_tracker.is_new_span(cid, span): |
| 379 | |
| 380 | # Look up the source document for this citation |
| 381 | source_doc = self.search_results_collector.find_by_short_id(cid) |
| 382 | |
| 383 | # Create citation payload |
| 384 | citation_payload = { |
| 385 | "document_id": source_doc.get("document_id", f"doc_{cid}"), |
| 386 | "text": source_doc.get("text", f"This is document text for {cid}"), |
| 387 | "metadata": source_doc.get("metadata", {"source": f"source_{cid}"}), |
| 388 | } |
| 389 | |
| 390 | # Store the payload by citation ID |
| 391 | citation_payloads[cid] = citation_payload |
| 392 | |
| 393 | # Track for persistence |
| 394 | self.streaming_citations.append({ |
| 395 | "id": cid, |
| 396 | "span": {"start": span[0], "end": span[1]}, |
| 397 | "payload": citation_payload |
Nothing calls this directly.
No test coverage detected.