hub / github.com/Koed00/django-q / test_timeout

Function test_timeout

django_q/tests/test_cluster.py:313–333
(broker, cluster_config_timeout, async_task_kwargs)

Source from the content-addressed store, hash-verified

    ),
)
def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
    # set up the Sentinel
    broker.list_key = "timeout_test:q"
    broker.purge_queue()
    async_task("time.sleep", 5, broker=broker, **async_task_kwargs)
    start_event = Event()
    stop_event = Event()
    cluster_id = uuidlib.uuid4()
    # Set a timer to stop the Sentinel
    threading.Timer(3, stop_event.set).start()
    s = Sentinel(
        stop_event,
        start_event,
        cluster_id=cluster_id,
        broker=broker,
        timeout=cluster_config_timeout,
    )
    assert start_event.is_set()
    assert s.status() == Conf.STOPPED
    assert s.reincarnations == 1
    broker.delete_queue()


@pytest.mark.django_db
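The test relies on a timer-driven stop: a `threading.Timer` sets `stop_event` after 3 seconds. Because the assertions come straight after the `Sentinel(...)` call and expect `Conf.STOPPED`, that call presumably does not return until the stop event is set. The sketch below illustrates only that pattern with the standard library. `run_until_stopped` is a hypothetical stand-in, not django-q's `Sentinel`, and the 0.2-second delay is chosen just to keep the example fast.

```python
import threading
import time


def run_until_stopped(stop_event, start_event, poll_interval=0.01):
    """Hypothetical supervisor loop; an assumption, not django-q's Sentinel."""
    start_event.set()  # signal that the loop is running
    cycles = 0
    while not stop_event.is_set():
        cycles += 1
        time.sleep(poll_interval)
    return cycles


start_event = threading.Event()
stop_event = threading.Event()

# Schedule the stop signal from a background timer thread,
# as the test does with threading.Timer(3, stop_event.set).
threading.Timer(0.2, stop_event.set).start()

started_at = time.monotonic()
cycles = run_until_stopped(stop_event, start_event)  # blocks until the timer fires
elapsed = time.monotonic() - started_at
```

Once the call returns, both events are set, which mirrors the `start_event.is_set()` check in the test above.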

Callers

nothing calls this directly

Calls 6

status · Method · 0.95
async_task · Function · 0.90
Sentinel · Class · 0.90
purge_queue · Method · 0.45
start · Method · 0.45
delete_queue · Method · 0.45

Tested by

no test coverage detected