UpdateVideoTasks 按渠道更新所有视频任务
(ctx context.Context, platform constant.TaskPlatform, taskChannelM map[int][]string, taskM map[string]*model.Task)
| 340 | |
| 341 | // UpdateVideoTasks 按渠道更新所有视频任务 |
| 342 | func UpdateVideoTasks(ctx context.Context, platform constant.TaskPlatform, taskChannelM map[int][]string, taskM map[string]*model.Task) error { |
| 343 | channelIDs := make([]int, 0, len(taskChannelM)) |
| 344 | for channelID := range taskChannelM { |
| 345 | channelIDs = append(channelIDs, channelID) |
| 346 | } |
| 347 | sort.Ints(channelIDs) |
| 348 | |
| 349 | var wg sync.WaitGroup |
| 350 | for _, channelId := range channelIDs { |
| 351 | taskIds := taskChannelM[channelId] |
| 352 | if len(taskIds) == 0 { |
| 353 | continue |
| 354 | } |
| 355 | taskIds = append([]string(nil), taskIds...) |
| 356 | |
| 357 | wg.Add(1) |
| 358 | gopool.Go(func() { |
| 359 | defer wg.Done() |
| 360 | if err := updateVideoTasks(ctx, platform, channelId, taskIds, taskM); err != nil { |
| 361 | logger.LogError(ctx, fmt.Sprintf("Channel #%d failed to update video async tasks: %s", channelId, err.Error())) |
| 362 | } |
| 363 | }) |
| 364 | } |
| 365 | wg.Wait() |
| 366 | if ctx.Err() != nil { |
| 367 | return ctx.Err() |
| 368 | } |
| 369 | return nil |
| 370 | } |
| 371 | |
| 372 | func updateVideoTasks(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task) error { |
| 373 | logger.LogInfo(ctx, fmt.Sprintf("Channel #%d pending video tasks: %d", channelId, len(taskIds))) |