MCPcopy Index your code
hub / github.com/plotly/dash / KillerThread

Class KillerThread

dash/testing/application_runners.py:118–138  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

116
117
118class KillerThread(threading.Thread):
119 def __init__(self, **kwargs):
120 super().__init__(**kwargs)
121 self._old_threads = list(threading._active.keys()) # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212
122
123 def kill(self):
124 # Kill all the new threads.
125 for thread_id in list(threading._active): # type: ignore[reportAttributeAccessIssue]; pylint: disable=W0212
126 if thread_id in self._old_threads:
127 continue
128
129 res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
130 ctypes.c_long(thread_id), ctypes.py_object(SystemExit)
131 )
132 if res == 0:
133 raise ValueError(f"Invalid thread id: {thread_id}")
134 if res > 1:
135 ctypes.pythonapi.PyThreadState_SetAsyncExc(
136 ctypes.c_long(thread_id), None
137 )
138 raise SystemExit("Stopping thread failure")
139
140
141class ThreadedRunner(BaseDashRunner):

Callers 1

startMethod · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…