enqueue adds a new task to the queue. This must be of type: - *pb.BackupRequest - *pb.ExportRequest
(req interface{})
| 162 | // - *pb.BackupRequest |
| 163 | // - *pb.ExportRequest |
| 164 | func (t *tasks) enqueue(req interface{}) (uint64, error) { |
| 165 | var kind TaskKind |
| 166 | switch req.(type) { |
| 167 | case *pb.BackupRequest: |
| 168 | kind = TaskKindBackup |
| 169 | case *pb.ExportRequest: |
| 170 | kind = TaskKindExport |
| 171 | default: |
| 172 | panic(fmt.Sprintf("invalid TaskKind: %d", kind)) |
| 173 | } |
| 174 | |
| 175 | t.logMu.Lock() |
| 176 | defer t.logMu.Unlock() |
| 177 | |
| 178 | task := taskRequest{ |
| 179 | id: t.newId(), |
| 180 | req: req, |
| 181 | } |
| 182 | select { |
| 183 | // t.logMu must be acquired before pushing to t.queue, otherwise the worker might start the |
| 184 | // task, and won't be able to find it in t.log. |
| 185 | case t.queue <- task: |
| 186 | t.log.Set(task.id, newTaskMeta(kind, TaskStatusQueued).uint64()) |
| 187 | return task.id, nil |
| 188 | default: |
| 189 | return 0, fmt.Errorf("too many pending tasks, please try again later") |
| 190 | } |
| 191 | } |
| 192 | |
| 193 | // get retrieves metadata for a given task ID. |
| 194 | func (t *tasks) get(id uint64) (TaskMeta, error) { |