MCPcopy
hub / github.com/microsoft/agent-lightning / _run_client_side_operations

Function _run_client_side_operations

tests/store/test_client_server.py:123–150  ·  view source on GitHub ↗
(client: LightningStoreClient)

Source from the content-addressed store, hash-verified

121
122
123async def _run_client_side_operations(client: LightningStoreClient) -> None:
124 await client.update_resources("metrics-client", {})
125 await client.get_latest_resources()
126
127 await client.start_rollout(input={"origin": "client"}, mode="train", config=RolloutConfig(timeout_seconds=2.0))
128 queued = await client.enqueue_rollout(
129 input={"origin": "client-queue"}, config=RolloutConfig(unresponsive_seconds=5.0)
130 )
131 dequeued = await client.dequeue_rollout(worker_id="metrics-client-worker")
132 assert dequeued is not None
133
134 span = _make_span(dequeued.rollout_id, dequeued.attempt.attempt_id, 1, "client-span")
135 await client.add_span(span)
136
137 await client.update_attempt(
138 dequeued.rollout_id,
139 dequeued.attempt.attempt_id,
140 status="running",
141 worker_id="metrics-client-worker",
142 )
143 await client.update_attempt(dequeued.rollout_id, dequeued.attempt.attempt_id, status="succeeded")
144 await client.update_rollout(dequeued.rollout_id, status="succeeded")
145
146 await client.wait_for_rollouts(rollout_ids=[dequeued.rollout_id], timeout=0.1)
147 await client.query_rollouts()
148 await client.query_attempts(dequeued.rollout_id)
149 await client.get_worker_by_id("metrics-client-worker")
150 assert queued.rollout_id == dequeued.rollout_id
151
152
153@pytest.mark.asyncio

Callers 1

Calls 14

RolloutConfigClass · 0.90
_make_spanFunction · 0.70
update_resourcesMethod · 0.45
get_latest_resourcesMethod · 0.45
start_rolloutMethod · 0.45
enqueue_rolloutMethod · 0.45
dequeue_rolloutMethod · 0.45
add_spanMethod · 0.45
update_attemptMethod · 0.45
update_rolloutMethod · 0.45
wait_for_rolloutsMethod · 0.45
query_rolloutsMethod · 0.45

Tested by

no test coverage detected