(
agent: LitAgent[Any],
*,
resources: Optional[Dict[str, LLM]] = None,
)
| 39 | ) |
def init_runner(request: pytest.FixtureRequest) -> InitRunnerFunction:
    """Return an async factory that builds a ready-to-use runner and its store.

    Args:
        request: Accepted as part of the signature; the body never reads it.

    Returns:
        An async callable taking an agent plus optional keyword-only
        ``resources`` and returning a ``(runner, store)`` tuple.
    """

    async def init_runner_fn(
        agent: LitAgent[Any],
        *,
        resources: Optional[Dict[str, LLM]] = None,
    ) -> tuple[LitAgentRunner[Any], InMemoryLightningStore]:
        """Create an in-memory store, register resources, and initialize a runner.

        Args:
            agent: The agent handed to ``runner.init``.
            resources: Resources to store under the ``"default"`` id. Any falsy
                value (``None`` or an empty mapping) is replaced by a single
                placeholder ``"llm"`` entry.

        Returns:
            The initialized runner together with the store it was attached to.
        """
        memory_store = InMemoryLightningStore()

        # Truthiness check on purpose: an empty mapping also gets the placeholder.
        if resources:
            named_resources: NamedResources = resources  # type: ignore[assignment]
        else:
            named_resources = {"llm": LLM(endpoint="http://localhost", model="dummy")}
        await memory_store.update_resources("default", named_resources)

        # This is always AgentOpsTracer for now
        tracer = AgentOpsTracer()
        agent_runner = LitAgentRunner[Any](tracer=tracer, poll_interval=0.01)

        # The agent is bound first, then the worker is attached to the store.
        agent_runner.init(agent)
        agent_runner.init_worker(worker_id=0, store=memory_store)

        return agent_runner, memory_store

    return init_runner_fn  # type: ignore
| 57 |
nothing calls this directly
no test coverage detected