| 234 | } |
| 235 | |
| 236 | func (t *tasks) run(task taskRequest) error { |
| 237 | // Fetch the task from the log. If the task isn't found, this means it has expired (older than |
| 238 | // taskTtl). |
| 239 | t.logMu.Lock() |
| 240 | meta := TaskMeta(t.log.Get(task.id)) |
| 241 | t.logMu.Unlock() |
| 242 | if meta == 0 { |
| 243 | return fmt.Errorf("is expired, skipping") |
| 244 | } |
| 245 | |
| 246 | // Only proceed if the task is still queued. It's possible that the task got canceled before we |
| 247 | // were able to run it. |
| 248 | if status := meta.Status(); status != TaskStatusQueued { |
| 249 | return fmt.Errorf("status is set to %s, skipping", status) |
| 250 | } |
| 251 | |
| 252 | // Change the task status to Running. |
| 253 | t.logMu.Lock() |
| 254 | t.log.Set(task.id, newTaskMeta(meta.Kind(), TaskStatusRunning).uint64()) |
| 255 | t.logMu.Unlock() |
| 256 | |
| 257 | // Run the task. |
| 258 | var status TaskStatus |
| 259 | err := task.run() |
| 260 | if err != nil { |
| 261 | status = TaskStatusFailed |
| 262 | } else { |
| 263 | status = TaskStatusSuccess |
| 264 | } |
| 265 | |
| 266 | // Change the task status to Success / Failed. |
| 267 | t.logMu.Lock() |
| 268 | t.log.Set(task.id, newTaskMeta(meta.Kind(), status).uint64()) |
| 269 | t.logMu.Unlock() |
| 270 | |
| 271 | // Return the error from the task. |
| 272 | return err |
| 273 | } |
| 274 | |
| 275 | // cleanup deletes all expired tasks. |
| 276 | func (t *tasks) cleanup() { |