MCPcopy Index your code
hub / github.com/microsoft/agent-lightning / add_task

Method add_task

agentlightning/server.py:61–93  ·  view source on GitHub ↗

Enqueue a new task and return the generated rollout identifier. Args: sample: Payload that describes the task input. mode: Phase in which the sample should be executed (`"train"`, `"val"`, or `"test"`). resources_id: Identifier of a resour

(
        self,
        sample: Any,
        mode: Literal["train", "val", "test"] | None = None,
        resources_id: str | None = None,
        metadata: Dict[str, Any] | None = None,
    )

Source from the content-addressed store, hash-verified

59 self._resources_lock = asyncio.Lock()
60
61 async def add_task(
62 self,
63 sample: Any,
64 mode: Literal["train", "val", "test"] | None = None,
65 resources_id: str | None = None,
66 metadata: Dict[str, Any] | None = None,
67 ) -> str:
68 """Enqueue a new task and return the generated rollout identifier.
69
70 Args:
71 sample: Payload that describes the task input.
72 mode: Phase in which the sample should be executed (`"train"`, `"val"`, or
73 `"test"`).
74 resources_id: Identifier of a resource bundle that the executor should
75 load before running the task.
76 metadata: Optional metadata forwarded to the executor.
77
78 Returns:
79 Unique rollout identifier assigned to the task.
80 """
81 rollout_id = f"rollout-{uuid.uuid4()}"
82 task = Task(
83 rollout_id=rollout_id,
84 input=sample,
85 mode=mode,
86 resources_id=resources_id,
87 create_time=time.time(),
88 num_claims=0,
89 metadata=metadata or {},
90 )
91 await self._task_queue.put(task)
92 logger.info(f"Task queued: {rollout_id} (mode: {mode}, resources_id: {resources_id})")
93 return rollout_id
94
95 async def get_next_task(self) -> Optional[Task]:
96 """Retrieve the next task from the queue without blocking.

Callers 1

queue_taskMethod · 0.80

Calls 2

TaskClass · 0.85
timeMethod · 0.80

Tested by

no test coverage detected