(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{})
| 355 | } |
| 356 | |
| 357 | func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { |
| 358 | e.logger.Debug("gocron: singletonModeRunner starting", "name", name) |
| 359 | for { |
| 360 | select { |
| 361 | case jIn := <-in: |
| 362 | select { |
| 363 | case <-e.ctx.Done(): |
| 364 | e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name) |
| 365 | wg.Done() |
| 366 | return |
| 367 | default: |
| 368 | } |
| 369 | |
| 370 | ctx, cancel := context.WithCancel(e.ctx) |
| 371 | j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) |
| 372 | cancel() |
| 373 | if j != nil { |
| 374 | // need to set shouldSendOut = false here, as there is a duplicative call to sendOutForRescheduling |
| 375 | // inside the runJob function that needs to be skipped. sendOutForRescheduling is previously called |
| 376 | // when the job is sent to the singleton mode runner. |
| 377 | // Exception: for intervalFromCompletion, we want rescheduling to happen AFTER job completion |
| 378 | if !j.intervalFromCompletion { |
| 379 | jIn.shouldSendOut = false |
| 380 | } |
| 381 | e.runJob(*j, jIn) |
| 382 | } |
| 383 | |
| 384 | // remove the limiter block to allow another job to be scheduled |
| 385 | if limitMode == LimitModeReschedule { |
| 386 | <-rescheduleLimiter |
| 387 | } |
| 388 | case <-e.ctx.Done(): |
| 389 | e.logger.Debug("singletonModeRunner shutting down", "name", name) |
| 390 | wg.Done() |
| 391 | return |
| 392 | } |
| 393 | } |
| 394 | } |
| 395 | |
| 396 | func (e *executor) runJob(j internalJob, jIn jobIn) { |
| 397 | if j.ctx == nil { |
no test coverage detected