startTaskAtTs is used to check whether an operation is already running before starting a new one. If a rollup is running, it is canceled, and startTask will wait until it completes before returning. If the same task is already running, this method returns an error. Restore operations take precedence and cancel all other operations, except for other restores, since only one restore operation should be active at any given moment.
(id op, ts uint64)
| 114 | // You should only call Done() on the returned closer. Calling other functions (such as |
| 115 | // SignalAndWait) for closer could result in panics. For more details, see GitHub issue #5034. |
| 116 | func (n *node) startTaskAtTs(id op, ts uint64) (*z.Closer, error) { |
| 117 | n.opsLock.Lock() |
| 118 | defer n.opsLock.Unlock() |
| 119 | |
| 120 | stopTask := func(id op) { |
| 121 | n.opsLock.Lock() |
| 122 | delete(n.ops, id) |
| 123 | n.opsLock.Unlock() |
| 124 | glog.Infof("Operation completed with id: %s", id) |
| 125 | |
| 126 | // Resume rollups if another operation is being stopped. |
| 127 | if id != opRollup { |
| 128 | time.Sleep(10 * time.Second) // Wait for 10s to start rollup operation. |
| 129 | // If any other operation is running, this would error out. This error can |
| 130 | // be safely ignored because rollups will resume once that other task is done. |
| 131 | _, _ = n.startTask(opRollup) |
| 132 | } |
| 133 | } |
| 134 | |
| 135 | closer := z.NewCloser(1) |
| 136 | switch id { |
| 137 | case opRollup: |
| 138 | if len(n.ops) > 0 { |
| 139 | return nil, errors.Errorf("another operation is already running") |
| 140 | } |
| 141 | go posting.IncrRollup.Process(closer, State.GetTimestamp) |
| 142 | case opRestore: |
| 143 | // Restores cancel all other operations, except for other restores since |
| 144 | // only one restore operation should be active any given moment. |
| 145 | for otherId, otherOp := range n.ops { |
| 146 | if otherId == opRestore { |
| 147 | return nil, errors.Errorf("another restore operation is already running") |
| 148 | } |
| 149 | // Remove from map and signal the closer to cancel the operation. |
| 150 | delete(n.ops, otherId) |
| 151 | otherOp.SignalAndWait() |
| 152 | } |
| 153 | case opBackup: |
| 154 | // Backup cancels all other operations, except for other backups since |
| 155 | // only one backup operation should be active any given moment. Also, indexing at higher |
| 156 | // timestamp can also run concurrently with backup. |
| 157 | for otherId, otherOp := range n.ops { |
| 158 | if otherId == opBackup { |
| 159 | return nil, errors.Errorf("another backup operation is already running") |
| 160 | } |
| 161 | // Remove from map and signal the closer to cancel the operation. |
| 162 | delete(n.ops, otherId) |
| 163 | otherOp.SignalAndWait() |
| 164 | } |
| 165 | case opIndexing: |
| 166 | for otherId, otherOp := range n.ops { |
| 167 | switch otherId { |
| 168 | case opBackup: |
| 169 | if otherOp.ts < ts { |
| 170 | // If backup is running at higher timestamp, then indexing can't be executed. |
| 171 | continue |
| 172 | } |
| 173 | return nil, errors.Errorf("operation %s is already running", otherId) |