Schedule a model instance by picking one candidate. Args: item: Model instance to schedule.
(self, instance: ModelInstance)
| 312 | logger.error(f"Failed to get item from schedule queue: {e}") |
| 313 | |
| 314 | async def _schedule_one(self, instance: ModelInstance): # noqa: C901 |
| 315 | """ |
| 316 | Schedule a model instance by picking one candidate. |
| 317 | Args: |
| 318 | item: Model instance to schedule. |
| 319 | """ |
| 320 | logger.debug(f"Scheduling model instance {instance.name}") |
| 321 | |
| 322 | state_message = "" |
| 323 | |
| 324 | async with async_session() as session: |
| 325 | workers = await Worker.all(session) |
| 326 | if not workers: |
| 327 | state_message = "No available workers" |
| 328 | |
| 329 | model = await Model.one_by_id(session, instance.model_id) |
| 330 | if model is None: |
| 331 | state_message = "Model not found" |
| 332 | |
| 333 | model_instance = await ModelInstance.one_by_id(session, instance.id) |
| 334 | if model_instance is None: |
| 335 | logger.debug( |
| 336 | f"Model instance(ID: {instance.id}) was deleted before scheduling due" |
| 337 | ) |
| 338 | return |
| 339 | |
| 340 | model_instances = await ModelInstance.all( |
| 341 | session, options=[selectinload(ModelInstance.model)] |
| 342 | ) |
| 343 | |
| 344 | candidate = None |
| 345 | messages = [] |
| 346 | if workers and model: |
| 347 | try: |
| 348 | candidate, messages = await find_candidate( |
| 349 | self._config, model, workers, model_instances |
| 350 | ) |
| 351 | except Exception as e: |
| 352 | state_message = f"Failed to find candidate: {e}" |
| 353 | |
| 354 | if candidate is None: |
| 355 | # update model instance. |
| 356 | if model_instance.state in ( |
| 357 | ModelInstanceStateEnum.SCHEDULED, |
| 358 | ModelInstanceStateEnum.ANALYZING, |
| 359 | ): |
| 360 | model_instance.state = ModelInstanceStateEnum.PENDING |
| 361 | model_instance.state_message = ( |
| 362 | "No suitable workers.\nDetails:\n" + "".join(messages) |
| 363 | ) |
| 364 | if state_message != "": |
| 365 | model_instance.state_message = state_message |
| 366 | |
| 367 | await ModelInstanceService(session).update(model_instance) |
| 368 | logger.debug( |
| 369 | f"No suitable workers for model instance {model_instance.name}, state: {model_instance.state}" |
| 370 | ) |
| 371 | else: |
no test coverage detected