StartMessageDistributor 启动消息分发器
()
| 174 | |
| 175 | // StartMessageDistributor 启动消息分发器 |
| 176 | func (e *AsyncExecutor) StartMessageDistributor() { |
| 177 | go func() { |
| 178 | for task := range e.msgQueue { |
| 179 | msg := task.message |
| 180 | e.DistributeToModelPipelines(msg.Model, msg) |
| 181 | if msg.Status == "error" || msg.Status == "success" { |
| 182 | mp, has := e.GetModelPipeline(msg.Model) |
| 183 | if has && mp.pullFn != nil { |
| 184 | mp.pullFn(msg) |
| 185 | } |
| 186 | e.CloseModelPipeline(msg.Model) |
| 187 | continue |
| 188 | } |
| 189 | } |
| 190 | }() |
| 191 | } |
| 192 | |
| 193 | // DistributeToModelPipelines 仅将消息分发给指定模型的管道 |
| 194 | func (e *AsyncExecutor) DistributeToModelPipelines(model string, msg PullMessage) { |
no test coverage detected