| 37 | self._pending_tasks: set[asyncio.Task] = set() |
| 38 | |
async def dispatch(self) -> None:
    """Consume events from the queue forever and hand each to its pipeline.

    For every event taken from ``self.event_queue`` this looks up the
    configuration for the event's ``unified_msg_origin``, prints the event,
    picks the scheduler registered under the configuration id in
    ``self.pipeline_scheduler_mapping`` and runs ``scheduler.execute(event)``
    as a background task, so a slow pipeline never blocks the next event.

    A failure while preparing one event is logged and that event is
    dropped; the loop keeps running. The coroutine only ends when it is
    cancelled or when reading from the queue itself raises.
    """
    # Function-scope import so the block does not depend on a file-level import.
    import traceback

    while True:
        # Deliberately outside the try: cancellation and queue failures must
        # propagate instead of being retried in a tight loop.
        event: AstrMessageEvent = await self.event_queue.get()
        try:
            conf_info = self.astrbot_config_mgr.get_conf_info(
                event.unified_msg_origin
            )
            conf_id = conf_info["id"]
            # Fall back to the id when the name is missing or empty.
            conf_name = conf_info.get("name") or conf_id
            self._print_event(event, conf_name)

            scheduler = self.pipeline_scheduler_mapping.get(conf_id)
            if not scheduler:
                logger.error(
                    f"PipelineScheduler not found for id: {conf_id}, event ignored."
                )
                continue

            task = asyncio.create_task(scheduler.execute(event))
            # The event loop keeps only weak references to tasks; hold a
            # strong one until the done callback has run.
            self._pending_tasks.add(task)
            task.add_done_callback(self._on_task_done)
        except Exception:
            # Per-event isolation: one bad event or config entry must not
            # stop dispatching for everything that follows.
            # asyncio.CancelledError derives from BaseException, so
            # cancellation is not swallowed here.
            logger.error(
                f"Failed to dispatch event, event ignored: {traceback.format_exc()}"
            )
| 55 | |
| 56 | def _on_task_done(self, task: asyncio.Task) -> None: |
| 57 | """pipeline 任务结束回调: 移除强引用并暴露未捕获的异常""" |