MCPcopy
hub / github.com/QuantumNous/new-api / updateVideoTasks

Function updateVideoTasks

service/task_polling.go:372–429  ·  view source on GitHub ↗
(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task)

Source from the content-addressed store, hash-verified

370}
371
372func updateVideoTasks(ctx context.Context, platform constant.TaskPlatform, channelId int, taskIds []string, taskM map[string]*model.Task) error {
373 logger.LogInfo(ctx, fmt.Sprintf("Channel #%d pending video tasks: %d", channelId, len(taskIds)))
374 if ctx.Err() != nil {
375 return ctx.Err()
376 }
377 if len(taskIds) == 0 {
378 return nil
379 }
380 cacheGetChannel, err := model.CacheGetChannel(channelId)
381 if err != nil {
382 // Collect DB primary key IDs for bulk update (taskIds are upstream IDs, not task_id column values)
383 var failedIDs []int64
384 for _, upstreamID := range taskIds {
385 if t, ok := taskM[upstreamID]; ok {
386 failedIDs = append(failedIDs, t.ID)
387 }
388 }
389 errUpdate := model.TaskBulkUpdateByID(failedIDs, map[string]any{
390 "fail_reason": fmt.Sprintf("Failed to get channel info, channel ID: %d", channelId),
391 "status": "FAILURE",
392 "progress": "100%",
393 })
394 if errUpdate != nil {
395 common.SysLog(fmt.Sprintf("UpdateVideoTask error: %v", errUpdate))
396 }
397 return fmt.Errorf("CacheGetChannel failed: %w", err)
398 }
399 adaptor := GetTaskAdaptorFunc(platform)
400 if adaptor == nil {
401 return fmt.Errorf("video adaptor not found")
402 }
403 info := &relaycommon.RelayInfo{}
404 info.ChannelMeta = &relaycommon.ChannelMeta{
405 ChannelBaseUrl: cacheGetChannel.GetBaseURL(),
406 }
407 info.ApiKey = cacheGetChannel.Key
408 adaptor.Init(info)
409 disablePollingSleep := cacheGetChannel.GetOtherSettings().DisableTaskPollingSleep
410 for i, taskId := range taskIds {
411 if ctx.Err() != nil {
412 return ctx.Err()
413 }
414 if err := updateVideoSingleTask(ctx, adaptor, cacheGetChannel, taskId, taskM); err != nil {
415 logger.LogError(ctx, fmt.Sprintf("Failed to update video task %s: %s", taskId, err.Error()))
416 }
417 if disablePollingSleep || i == len(taskIds)-1 {
418 continue
419 }
420
421 // sleep 1 second between tasks for this channel only.
422 select {
423 case <-ctx.Done():
424 return ctx.Err()
425 case <-time.After(1 * time.Second):
426 }
427 }
428 return nil
429}

Callers 1

UpdateVideoTasksFunction · 0.85

Calls 10

LogInfoFunction · 0.92
CacheGetChannelFunction · 0.92
TaskBulkUpdateByIDFunction · 0.92
LogErrorFunction · 0.92
GetBaseURLMethod · 0.80
GetOtherSettingsMethod · 0.80
DoneMethod · 0.80
updateVideoSingleTaskFunction · 0.70
InitMethod · 0.65
ErrorMethod · 0.45

Tested by

no test coverage detected