MCPcopy
hub / github.com/QuantumNous/new-api / UpdateVideoTasks

Function UpdateVideoTasks

service/task_polling.go:342–370  ·  view source on GitHub ↗

UpdateVideoTasks 按渠道更新所有视频任务

(ctx context.Context, platform constant.TaskPlatform, taskChannelM map[int][]string, taskM map[string]*model.Task)

Source from the content-addressed store, hash-verified

340
341// UpdateVideoTasks 按渠道更新所有视频任务
342func UpdateVideoTasks(ctx context.Context, platform constant.TaskPlatform, taskChannelM map[int][]string, taskM map[string]*model.Task) error {
343 channelIDs := make([]int, 0, len(taskChannelM))
344 for channelID := range taskChannelM {
345 channelIDs = append(channelIDs, channelID)
346 }
347 sort.Ints(channelIDs)
348
349 var wg sync.WaitGroup
350 for _, channelId := range channelIDs {
351 taskIds := taskChannelM[channelId]
352 if len(taskIds) == 0 {
353 continue
354 }
355 taskIds = append([]string(nil), taskIds...)
356
357 wg.Add(1)
358 gopool.Go(func() {
359 defer wg.Done()
360 if err := updateVideoTasks(ctx, platform, channelId, taskIds, taskM); err != nil {
361 logger.LogError(ctx, fmt.Sprintf("Channel #%d failed to update video async tasks: %s", channelId, err.Error()))
362 }
363 })
364 }
365 wg.Wait()
366 if ctx.Err() != nil {
367 return ctx.Err()
368 }
369 return nil
370}
371
372func updateVideoTasks(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task) error {
373 logger.LogInfo(ctx, fmt.Sprintf("Channel #%d pending video tasks: %d", channelId, len(taskIds)))

Calls 5

LogErrorFunction · 0.92
updateVideoTasksFunction · 0.85
AddMethod · 0.80
DoneMethod · 0.80
ErrorMethod · 0.45