Test connection validation for same, compatible, and incompatible queues.
(self)
| 43 | # self.assertIsNotNone(cron_job.next_run_time) |
| 44 | |
| 45 | def test_connection_validation(self): |
| 46 | """Test connection validation for same, compatible, and incompatible queues.""" |
| 47 | # Start with test3 queue (secondary Redis DB configured for tests) |
| 48 | scheduler = DjangoCronScheduler() |
| 49 | |
| 50 | # Same queue multiple times should work |
| 51 | job1 = scheduler.register(say_hello, "test3", interval=60) |
| 52 | job2 = scheduler.register(say_hello, "test3", interval=120) |
| 53 | |
| 54 | self.assertEqual(len(scheduler.get_jobs()), 2) |
| 55 | self.assertEqual(job1.queue_name, "test3") |
| 56 | self.assertEqual(job2.queue_name, "test3") |
| 57 | |
| 58 | # Compatible queues (same Redis connection) should work |
| 59 | # Both 'test3' and 'async' share the same Redis connection settings |
| 60 | job3 = scheduler.register(say_hello, "async", interval=180) |
| 61 | self.assertEqual(len(scheduler.get_jobs()), 3) |
| 62 | self.assertEqual(job3.queue_name, "async") |
| 63 | |
| 64 | # Queues having different Redis connections should fail |
| 65 | with self.assertRaises(ValueError): |
| 66 | scheduler.register(say_hello, "default", interval=240) |
| 67 | |
| 68 | # Undefined queue_name should fail |
| 69 | scheduler = DjangoCronScheduler() |
| 70 | with self.assertRaises(KeyError): |
| 71 | scheduler.register(say_hello, "nonexistent_queue", interval=300) |
| 72 | |
| 73 | def test_connection_index_property(self): |
| 74 | """Test connection_index property returns correct index or raises appropriate exceptions.""" |
nothing calls this directly
no test coverage detected