MCPcopy
hub / github.com/microsoft/agent-lightning / sync_client_workflow

Function sync_client_workflow

tests/test_client.py:170–187  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

168
169 # Define the synchronous client workflow
170 def sync_client_workflow():
171 # 2. Client polls for the task
172 task = client.poll_next_task()
173 assert task is not None
174 assert task.rollout_id == rollout_id
175 assert task.input == sample_task_input
176 assert task.resources_id is None
177
178 # 3. Client fetches resources
179 res_update = client.get_latest_resources()
180 assert res_update is not None
181 assert res_update.resources_id == resources_id
182
183 # 4. Client posts a completed rollout
184 rollout_payload = RolloutLegacy(rollout_id=rollout_id, final_reward=0.88)
185 response = client.post_rollout(rollout_payload)
186 assert response is not None
187 assert response.get("status") == "ok"
188
189 # Run the sync workflow in a thread
190 await asyncio.to_thread(sync_client_workflow)

Callers

nothing calls this directly

Calls 5

RolloutLegacyClass · 0.90
poll_next_taskMethod · 0.45
get_latest_resourcesMethod · 0.45
post_rolloutMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected