MCPcopy
hub / github.com/SurgeDM/Surge / worker

Method worker

internal/engine/concurrent/worker.go:17–183  ·  view source on GitHub ↗

worker downloads tasks from the queue

(ctx context.Context, id int, mirrors []string, file *os.File, queue *TaskQueue, totalSize int64, client *http.Client)

Source from the content-addressed store, hash-verified

15
16// worker downloads tasks from the queue
17func (d *ConcurrentDownloader) worker(ctx context.Context, id int, mirrors []string, file *os.File, queue *TaskQueue, totalSize int64, client *http.Client) error {
18 // Get pooled buffer
19 bufPtr := d.bufPool.Get().(*[]byte)
20 defer d.bufPool.Put(bufPtr)
21 buf := *bufPtr
22
23 utils.Debug("Worker %d started", id)
24 defer utils.Debug("Worker %d finished", id)
25
26 // Initial mirror assignment: Round Robin based on ID
27 currentMirrorIdx := id % len(mirrors)
28
29 for {
30 // Get next task
31 task, ok := queue.Pop()
32
33 if !ok {
34 return nil // Queue closed, no more work
35 }
36
37 // Update active workers
38 if d.State != nil {
39 d.State.ActiveWorkers.Add(1)
40 }
41
42 var lastErr error
43 maxRetries := d.Runtime.GetMaxTaskRetries()
44 for attempt := 0; attempt < maxRetries; attempt++ {
45 if attempt > 0 {
46
47 if len(mirrors) == 1 {
48 time.Sleep(time.Duration(1<<attempt) * types.RetryBaseDelay) // Exponential backoff incase of failure
49 }
50
51 // FAILOVER: Switch mirror on retry
52 // Report error for the previous mirror
53 d.ReportMirrorError(mirrors[currentMirrorIdx])
54
55 currentMirrorIdx = (currentMirrorIdx + 1) % len(mirrors)
56 utils.Debug("Worker %d: switching to mirror %s (attempt %d)", id, mirrors[currentMirrorIdx], attempt+1)
57 }
58
59 // Use current mirror
60 currentURL := mirrors[currentMirrorIdx]
61
62 // Register active task with per-task cancellable context
63 taskCtx, taskCancel := context.WithCancel(ctx)
64 now := time.Now()
65 activeTask := &ActiveTask{
66 Task: task,
67 StartTime: now,
68 Cancel: taskCancel,
69 WindowStart: now, // Initialize sliding window
70 }
71 // If the incoming Task carried a shared pointer, copy it into the active task
72 if task.SharedMaxOffset != nil {
73 // activeTask is newly allocated and not yet visible to other goroutines,
74 // so this assignment does not need mutex protection.

Callers 1

executeWorkers Method · 0.95

Calls 9

ReportMirrorError Method · 0.95
downloadTask Method · 0.95
RemainingTask Method · 0.95
Debug Function · 0.92
Pop Method · 0.80
GetMaxTaskRetries Method · 0.80
UpdateChunkStatus Method · 0.80
Push Method · 0.80
Add Method · 0.65

Tested by

no test coverage detected