MCPcopy
hub / github.com/zhayujie/CowAgent / _interrupt_thread

Method _interrupt_thread

app.py:165–181  ·  view source on GitHub ↗

Raise SystemExit in target thread to break blocking loops like start_forever.

(th: threading.Thread, name: str)

Source from the content-addressed store, hash-verified

163
164 @staticmethod
165 def _interrupt_thread(th: threading.Thread, name: str):
166 """Raise SystemExit in target thread to break blocking loops like start_forever."""
167 import ctypes
168 try:
169 tid = th.ident
170 if tid is None:
171 return
172 res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
173 ctypes.c_ulong(tid), ctypes.py_object(SystemExit)
174 )
175 if res == 1:
176 logger.info(f"[ChannelManager] Interrupted thread for channel '{name}'")
177 elif res > 1:
178 ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_ulong(tid), None)
179 logger.warning(f"[ChannelManager] Failed to interrupt thread for channel '{name}'")
180 except Exception as e:
181 logger.warning(f"[ChannelManager] Thread interrupt error for '{name}': {e}")
182
183 def restart(self, new_channel_name: str):
184 """

Callers 1

stopMethod · 0.95

Calls

no outgoing calls

Tested by

no test coverage detected