(ctx context.Context, quitCh chan struct{})
| 154 | } |
| 155 | |
| 156 | func (m *Manager) run(ctx context.Context, quitCh chan struct{}) { |
| 157 | parser := newInputParser() |
| 158 | m.scheduler = newKeyScheduler(m.queueSize) |
| 159 | m.observeSchedulerPending() |
| 160 | var workersWG sync.WaitGroup |
| 161 | |
| 162 | m.startWorkers(&workersWG) |
| 163 | |
| 164 | // Wake any enqueue() blocked on a full scheduler when the parent context |
| 165 | // is canceled. Necessary because the reader loop below may itself be |
| 166 | // blocked inside dispatchInvocation -> scheduler.enqueue waiting for |
| 167 | // space, and would otherwise miss the ctx.Done signal. |
| 168 | stopWatcher := make(chan struct{}) |
| 169 | defer close(stopWatcher) |
| 170 | go func() { |
| 171 | select { |
| 172 | case <-ctx.Done(): |
| 173 | if m.scheduler != nil { |
| 174 | m.scheduler.stop() |
| 175 | } |
| 176 | case <-stopWatcher: |
| 177 | } |
| 178 | }() |
| 179 | |
| 180 | for { |
| 181 | select { |
| 182 | case <-ctx.Done(): |
| 183 | m.shutdown(false, true, quitCh, &workersWG) |
| 184 | return |
| 185 | case line, ok := <-m.input.lines(): |
| 186 | if !ok { |
| 187 | m.shutdown(false, true, quitCh, &workersWG) |
| 188 | return |
| 189 | } |
| 190 | event, err := parser.parseEvent(line) |
| 191 | if err != nil { |
| 192 | m.Warningf("parse function: %v ('%s')", err, line) |
| 193 | continue |
| 194 | } |
| 195 | |
| 196 | switch event.kind { |
| 197 | case inputEventNone, inputEventProgress: |
| 198 | continue |
| 199 | case inputEventQuit: |
| 200 | m.shutdown(true, true, quitCh, &workersWG) |
| 201 | return |
| 202 | case inputEventCancel: |
| 203 | m.handleCancelEvent(event) |
| 204 | continue |
| 205 | case inputEventCall: |
| 206 | m.observeFunctionCall() |
| 207 | m.dispatchInvocation(ctx, event.fn) |
| 208 | } |
| 209 | } |
| 210 | } |
| 211 | } |
| 212 | |
| 213 | func (m *Manager) startWorkers(workersWG *sync.WaitGroup) { |
no test coverage detected