Test that first register() call initializes the scheduler with queue's connection.
(self)
| 25 | self.assertEqual(scheduler._cron_jobs, []) |
| 26 | |
| 27 | def test_first_register_initializes_connection(self): |
| 28 | """Test that first register() call initializes the scheduler with queue's connection.""" |
| 29 | scheduler = DjangoCronScheduler() |
| 30 | |
| 31 | # Register a job with cron expression (run every minute) |
| 32 | cron_job = scheduler.register(say_hello, "default", cron="* * * * *") |
| 33 | |
| 34 | # Should now have connection set |
| 35 | self.assertIsNotNone(scheduler.connection) |
| 36 | self.assertIsNotNone(scheduler._connection_config) |
| 37 | self.assertIsInstance(cron_job, CronJob) |
| 38 | self.assertEqual(len(scheduler.get_jobs()), 1) |
| 39 | |
| 40 | # Verify cron expression is set correctly |
| 41 | self.assertEqual(cron_job.cron, "* * * * *") |
| 42 | self.assertIsNone(cron_job.interval) |
| 43 | # self.assertIsNotNone(cron_job.next_run_time) |
| 44 | |
| 45 | def test_connection_validation(self): |
| 46 | """Test connection validation for same, compatible, and incompatible queues.""" |
nothing calls this directly
no test coverage detected