(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task)
| 370 | } |
| 371 | |
| 372 | func updateVideoTasks(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task) error { |
| 373 | logger.LogInfo(ctx, fmt.Sprintf("Channel #%d pending video tasks: %d", channelId, len(taskIds))) |
| 374 | if ctx.Err() != nil { |
| 375 | return ctx.Err() |
| 376 | } |
| 377 | if len(taskIds) == 0 { |
| 378 | return nil |
| 379 | } |
| 380 | cacheGetChannel, err := model.CacheGetChannel(channelId) |
| 381 | if err != nil { |
| 382 | // Collect DB primary key IDs for bulk update (taskIds are upstream IDs, not task_id column values) |
| 383 | var failedIDs []int64 |
| 384 | for _, upstreamID := range taskIds { |
| 385 | if t, ok := taskM[upstreamID]; ok { |
| 386 | failedIDs = append(failedIDs, t.ID) |
| 387 | } |
| 388 | } |
| 389 | errUpdate := model.TaskBulkUpdateByID(failedIDs, map[string]any{ |
| 390 | "fail_reason": fmt.Sprintf("Failed to get channel info, channel ID: %d", channelId), |
| 391 | "status": "FAILURE", |
| 392 | "progress": "100%", |
| 393 | }) |
| 394 | if errUpdate != nil { |
| 395 | common.SysLog(fmt.Sprintf("UpdateVideoTask error: %v", errUpdate)) |
| 396 | } |
| 397 | return fmt.Errorf("CacheGetChannel failed: %w", err) |
| 398 | } |
| 399 | adaptor := GetTaskAdaptorFunc(platform) |
| 400 | if adaptor == nil { |
| 401 | return fmt.Errorf("video adaptor not found") |
| 402 | } |
| 403 | info := &relaycommon.RelayInfo{} |
| 404 | info.ChannelMeta = &relaycommon.ChannelMeta{ |
| 405 | ChannelBaseUrl: cacheGetChannel.GetBaseURL(), |
| 406 | } |
| 407 | info.ApiKey = cacheGetChannel.Key |
| 408 | adaptor.Init(info) |
| 409 | disablePollingSleep := cacheGetChannel.GetOtherSettings().DisableTaskPollingSleep |
| 410 | for i, taskId := range taskIds { |
| 411 | if ctx.Err() != nil { |
| 412 | return ctx.Err() |
| 413 | } |
| 414 | if err := updateVideoSingleTask(ctx, adaptor, cacheGetChannel, taskId, taskM); err != nil { |
| 415 | logger.LogError(ctx, fmt.Sprintf("Failed to update video task %s: %s", taskId, err.Error())) |
| 416 | } |
| 417 | if disablePollingSleep || i == len(taskIds)-1 { |
| 418 | continue |
| 419 | } |
| 420 | |
| 421 | // sleep 1 second between tasks for this channel only. |
| 422 | select { |
| 423 | case <-ctx.Done(): |
| 424 | return ctx.Err() |
| 425 | case <-time.After(1 * time.Second): |
| 426 | } |
| 427 | } |
| 428 | return nil |
| 429 | } |
no test coverage detected