MCPcopy
hub / github.com/rq/django-rq / test_connection_validation

Method test_connection_validation

tests/test_cron.py:45–71  ·  view source on GitHub ↗

Test connection validation for same, compatible, and incompatible queues.

(self)

Source from the content-addressed store, hash-verified

43 # self.assertIsNotNone(cron_job.next_run_time)
44
def test_connection_validation(self):
    """Verify which queue combinations a single scheduler accepts.

    Exercises four cases in order: the same queue registered twice,
    a second queue that shares the first queue's Redis connection,
    a queue on a different Redis connection (expects ValueError),
    and a queue name that is not configured (expects KeyError).
    """
    # "test3" is the starting queue (secondary Redis DB configured for tests).
    cron = DjangoCronScheduler()

    # Registering against one queue repeatedly is allowed.
    first = cron.register(say_hello, "test3", interval=60)
    second = cron.register(say_hello, "test3", interval=120)

    self.assertEqual(len(cron.get_jobs()), 2)
    for job in (first, second):
        self.assertEqual(job.queue_name, "test3")

    # "async" shares the same Redis connection settings as "test3",
    # so it may be added to the same scheduler.
    third = cron.register(say_hello, "async", interval=180)
    self.assertEqual(len(cron.get_jobs()), 3)
    self.assertEqual(third.queue_name, "async")

    # A queue backed by a different Redis connection must be rejected.
    self.assertRaises(
        ValueError, cron.register, say_hello, "default", interval=240
    )

    # An unknown queue name must be rejected; a fresh scheduler is used
    # for this check, so it holds none of the jobs registered above.
    fresh_cron = DjangoCronScheduler()
    self.assertRaises(
        KeyError,
        fresh_cron.register,
        say_hello,
        "nonexistent_queue",
        interval=300,
    )
72
73 def test_connection_index_property(self):
74 """Test connection_index property returns correct index or raises appropriate exceptions."""

Callers

nothing calls this directly

Calls 2

register (Method) · 0.95
DjangoCronScheduler (Class) · 0.90

Tested by

no test coverage detected