Handler called when a Weave Call finishes. Converts the call (including nested children) into spans and stores them in LightningStore.
(self, call: tsi.CallSchema)
| 549 | return sequence_id |
| 550 | |
| 551 | async def complete_call_handler(self, call: tsi.CallSchema) -> None: |
| 552 | """Handler called when a Weave Call finishes. |
| 553 | |
| 554 | Converts the call (including nested children) into spans and stores them in LightningStore. |
| 555 | """ |
| 556 | # Make sure the corresponding call_start_future is complete |
| 557 | if call.id in self._partial_call_futures: |
| 558 | sequence_id = await asyncio.wrap_future(self._partial_call_futures[call.id]) |
| 559 | del self._partial_call_futures[call.id] |
| 560 | else: |
| 561 | # Fetch a new sequence ID as the call_start is somehow missing |
| 562 | if call.id in self._calls: |
| 563 | logger.warning( |
| 564 | f"Call {call.id} is already in calls. The call is already completed. Overwriting the call." |
| 565 | ) |
| 566 | else: |
| 567 | logger.warning(f"Call {call.id} has no start future. Fetching a new sequence ID.") |
| 568 | sequence_id = await self._get_next_sequence_id() |
| 569 | |
| 570 | self._calls[call.id] = call |
| 571 | |
| 572 | span = await self.convert_call_to_span(call, self._rollout_id, self._attempt_id, sequence_id) |
| 573 | self._spans.append(span) |
| 574 | if self._store and self._rollout_id and self._attempt_id: |
| 575 | try: |
| 576 | await self._store.add_span(span) |
| 577 | except Exception as exc: |
| 578 | logger.exception(f"Error adding span to store: {exc}") |
| 579 | |
| 580 | async def convert_call_to_span( |
| 581 | self, |
no test coverage detected