MCPcopy
hub / github.com/dgraph-io/dgraph / startTaskAtTs

Method startTaskAtTs

worker/draft.go:116–204  ·  view source on GitHub ↗

startTaskAtTs is used to check whether an op is already running. If a rollup is running, it is canceled and this method waits until it completes before returning. If the same task is already running, this method returns an error. Restore operations have preference and cancel all other operations, not just rollups.

(id op, ts uint64)

Source from the content-addressed store, hash-verified

114// You should only call Done() on the returned closer. Calling other functions (such as
115// SignalAndWait) for closer could result in panics. For more details, see GitHub issue #5034.
116func (n *node) startTaskAtTs(id op, ts uint64) (*z.Closer, error) {
117 n.opsLock.Lock()
118 defer n.opsLock.Unlock()
119
120 stopTask := func(id op) {
121 n.opsLock.Lock()
122 delete(n.ops, id)
123 n.opsLock.Unlock()
124 glog.Infof("Operation completed with id: %s", id)
125
126 // Resume rollups if another operation is being stopped.
127 if id != opRollup {
128 time.Sleep(10 * time.Second) // Wait for 10s to start rollup operation.
129 // If any other operation is running, this would error out. This error can
130 // be safely ignored because rollups will resume once that other task is done.
131 _, _ = n.startTask(opRollup)
132 }
133 }
134
135 closer := z.NewCloser(1)
136 switch id {
137 case opRollup:
138 if len(n.ops) > 0 {
139 return nil, errors.Errorf("another operation is already running")
140 }
141 go posting.IncrRollup.Process(closer, State.GetTimestamp)
142 case opRestore:
143 // Restores cancel all other operations, except for other restores since
144 // only one restore operation should be active any given moment.
145 for otherId, otherOp := range n.ops {
146 if otherId == opRestore {
147 return nil, errors.Errorf("another restore operation is already running")
148 }
149 // Remove from map and signal the closer to cancel the operation.
150 delete(n.ops, otherId)
151 otherOp.SignalAndWait()
152 }
153 case opBackup:
154 // Backup cancels all other operations, except for other backups since
155 // only one backup operation should be active any given moment. Also, indexing at higher
156 // timestamp can also run concurrently with backup.
157 for otherId, otherOp := range n.ops {
158 if otherId == opBackup {
159 return nil, errors.Errorf("another backup operation is already running")
160 }
161 // Remove from map and signal the closer to cancel the operation.
162 delete(n.ops, otherId)
163 otherOp.SignalAndWait()
164 }
165 case opIndexing:
166 for otherId, otherOp := range n.ops {
167 switch otherId {
168 case opBackup:
169 if otherOp.ts < ts {
170 // If backup is running at higher timestamp, then indexing can't be executed.
171 continue
172 }
173 return nil, errors.Errorf("operation %s is already running", otherId)

Callers 3

startTaskMethod · 0.95
backupCurrentGroupFunction · 0.80
runSchemaMutationFunction · 0.80

Calls 7

startTaskMethod · 0.95
InfofMethod · 0.80
WaitMethod · 0.80
LockMethod · 0.45
UnlockMethod · 0.45
ErrorfMethod · 0.45
ProcessMethod · 0.45

Tested by

no test coverage detected