CancelOperation cancels an in-progress operation by its ID. In distributed mode the UI's cancel click may land on a different replica than the one running the operation. We still publish the cancel event in that case — the peer holding the cancellation func picks it up via the SubjectGalleryCancelWildcard subscriber and runs it locally.
(id string)
| 325 | // SubjectGalleryCancelWildcard subscriber and runs it locally. The caller |
| 326 | // gets a non-error reply so the UI shows the cancel as accepted. |
| 327 | func (g *GalleryService) CancelOperation(id string) error { |
| 328 | g.Lock() |
| 329 | |
| 330 | if status, ok := g.statuses[id]; ok && status.Cancelled { |
| 331 | g.Unlock() |
| 332 | return fmt.Errorf("operation %q is already cancelled", id) |
| 333 | } |
| 334 | |
| 335 | cancelFunc, localExists := g.cancellations[id] |
| 336 | if localExists { |
| 337 | delete(g.cancellations, id) |
| 338 | } |
| 339 | |
| 340 | nc := g.natsClient |
| 341 | store := g.galleryStore |
| 342 | |
| 343 | if !localExists && nc == nil { |
| 344 | g.Unlock() |
| 345 | return fmt.Errorf("operation %q not found or already completed", id) |
| 346 | } |
| 347 | |
| 348 | if status, ok := g.statuses[id]; ok { |
| 349 | status.Cancelled = true |
| 350 | status.Processed = true |
| 351 | status.Message = "cancelled" |
| 352 | } else { |
| 353 | g.statuses[id] = &OpStatus{ |
| 354 | Cancelled: true, |
| 355 | Processed: true, |
| 356 | Message: "cancelled", |
| 357 | Cancellable: false, |
| 358 | } |
| 359 | } |
| 360 | g.Unlock() |
| 361 | |
| 362 | // Persist the terminal status so the cancel survives a restart. Without |
| 363 | // this the row stays in its active state and re-hydrates straight back into |
| 364 | // processingBackends on the next replica boot — the UI spins again on an op |
| 365 | // the admin already cancelled. The peer that broadcasts wins the write; a |
| 366 | // no-op when standalone (store nil). |
| 367 | if store != nil { |
| 368 | if err := store.Cancel(id); err != nil { |
| 369 | xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err) |
| 370 | } |
| 371 | } |
| 372 | |
| 373 | // I/O and user-provided callback after Unlock — the cancel-wildcard |
| 374 | // subscriber loops back into applyCancel on this same replica, which |
| 375 | // would otherwise deadlock on g.Mutex. |
| 376 | if cancelFunc != nil { |
| 377 | cancelFunc() |
| 378 | } |
| 379 | if nc != nil { |
| 380 | if err := nc.Publish(messaging.SubjectGalleryCancel(id), GalleryCancelEvent{JobID: id}); err != nil { |
| 381 | xlog.Warn("Failed to broadcast gallery cancel", "op_id", id, "error", err) |
| 382 | } |
| 383 | } |
| 384 |
no test coverage detected