A worker run with with_scheduler=False does not act as scheduler, so get_scheduler_pid returns None for its queue
(self)
| 647 | } |
| 648 | ) |
| 649 | def test_worker_scheduler_pid_inactive(self): |
| 650 | '''A worker run with with_scheduler=False does not act as scheduler, so get_scheduler_pid returns None for its queue''' |
| 651 | test_queue = 'worker_scheduler_inactive_test' |
| 652 | worker = get_worker(test_queue, name=uuid4().hex) |
| 653 | worker.work( |
| 654 | with_scheduler=False, burst=True |
| 655 | ) # worker will not acquire lock, scheduler_pid should return None |
| 656 | self.assertIsNone(get_scheduler_pid(worker.queues[0])) |
nothing calls this directly
no test coverage detected