MCPcopy
hub / github.com/Bogdanp/dramatiq / test_raise_thread_exception

Function test_raise_thread_exception

tests/middleware/test_threading.py:16–38  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

14@pytest.mark.skipif(not_supported, reason="Threading not supported on this platform.")
15@pytest.mark.skipif(threading.is_gevent_active(), reason="Thread exceptions not supported with gevent.")
16def test_raise_thread_exception():
17 # Given that I have a database
18 caught = []
19
20 # And a function that waits for an interrupt
21 def work():
22 try:
23 for _ in range(10):
24 time.sleep(0.1)
25 except threading.Interrupt:
26 caught.append(1)
27
28 # When I start the thread
29 t = Thread(target=work)
30 t.start()
31 time.sleep(0.1)
32
33 # And raise the interrupt and join on the thread
34 threading.raise_thread_exception(t.ident, threading.Interrupt)
35 t.join()
36
37 # I expect the interrupt to have been caught
38 assert sum(caught) == 1
39
40
41@pytest.mark.skipif(not_supported, reason="Threading not supported on this platform.")

Callers

nothing calls this directly

Calls 2

startMethod · 0.45
joinMethod · 0.45

Tested by

no test coverage detected