MCPcopy
hub / github.com/microsoft/agent-lightning / update_attempt

Method update_attempt

tests/store/dummy_store.py:172–188  ·  view source on GitHub ↗
(
        self,
        rollout_id: str,
        attempt_id: str | Literal["latest"],
        status: AttemptStatus | Any = UNSET,
        worker_id: str | Any = UNSET,
        last_heartbeat_time: float | Any = UNSET,
        metadata: Optional[Dict[str, Any]] | Any = UNSET,
    )

Source from the content-addressed store, hash-verified

170 return self.return_values["update_rollout"]
171
172 async def update_attempt(
173 self,
174 rollout_id: str,
175 attempt_id: str | Literal["latest"],
176 status: AttemptStatus | Any = UNSET,
177 worker_id: str | Any = UNSET,
178 last_heartbeat_time: float | Any = UNSET,
179 metadata: Optional[Dict[str, Any]] | Any = UNSET,
180 ) -> Attempt:
181 self.calls.append(
182 (
183 "update_attempt",
184 (rollout_id, attempt_id, status, worker_id, last_heartbeat_time, metadata),
185 {},
186 )
187 )
188 return self.return_values["update_attempt"]
189
190 async def query_workers(self, *args: Any, **kwargs: Any) -> List[Worker]:
191 self.calls.append(("query_workers", args, kwargs))

Calls

no outgoing calls