ProcessCtx will use the Pool to process a payload and synchronously return the result. If the context is cancelled before the job has finished, any worker already claimed for the job will be interrupted and the context's error (ctx.Err()) will be returned. ProcessCtx can be called safely from any goroutine.
(ctx context.Context, payload interface{})
| 220 | // be interrupted and the context's error (ctx.Err()) will be returned. |
| 221 | // ProcessCtx can be called safely from any goroutine. |
| 222 | func (p *Pool) ProcessCtx(ctx context.Context, payload interface{}) (interface{}, error) { |
| 223 | atomic.AddInt64(&p.queuedJobs, 1) |
| 224 | defer atomic.AddInt64(&p.queuedJobs, -1) |
| 225 | |
| 226 | var request workRequest |
| 227 | var open bool |
| 228 | |
| 229 | select { |
| 230 | case request, open = <-p.reqChan: |
| 231 | if !open { |
| 232 | return nil, ErrPoolNotRunning |
| 233 | } |
| 234 | case <-ctx.Done(): |
| 235 | return nil, ctx.Err() |
| 236 | } |
| 237 | |
| 238 | select { |
| 239 | case request.jobChan <- payload: |
| 240 | case <-ctx.Done(): |
| 241 | request.interruptFunc() |
| 242 | return nil, ctx.Err() |
| 243 | } |
| 244 | |
| 245 | select { |
| 246 | case payload, open = <-request.retChan: |
| 247 | if !open { |
| 248 | return nil, ErrWorkerClosed |
| 249 | } |
| 250 | case <-ctx.Done(): |
| 251 | request.interruptFunc() |
| 252 | return nil, ctx.Err() |
| 253 | } |
| 254 | |
| 255 | return payload, nil |
| 256 | } |
| 257 | |
| 258 | // QueueLength returns the current count of pending queued jobs. |
| 259 | func (p *Pool) QueueLength() int64 { |
no outgoing calls