A thread that will put exceptions into an external queue if the task fails. There are two ways to stop the thread: (1) set `stop_event` to stop the loop, or (2) let `task` return False. Args: task (Callable[..., bool]): The task to run repeatedly in the thread; it should return False to break the loop.
| 281 | |
| 282 | |
class ManagedThread(threading.Thread):
    """Daemon thread that calls a task in a loop and reports its failures.

    The task is invoked repeatedly with the keyword arguments given at
    construction time. An exception raised by the task is logged and put
    into ``error_queue``; the loop then carries on with the next iteration.

    The loop ends in one of these ways:
        1. ``stop_event`` is set (for example through :meth:`stop`).
        2. The task returns a falsy value.
        3. The task is a ``weakref.WeakMethod`` whose referent is gone.

    Args:
        task (Callable[..., bool]): Callable run on every iteration; it
            should return False to break the loop. A ``weakref.WeakMethod``
            is also accepted and is dereferenced on every iteration.
        error_queue (Queue): Queue that receives exceptions raised by the task.
        name (str): The name of the thread.
        stop_event (threading.Event): Event used to request termination; a
            new event is created when none is supplied.
        **kwargs: Keyword arguments forwarded to the task on every call.
    """

    def __init__(self,
                 task: Callable[..., bool],
                 error_queue: Queue,
                 name: Optional[str] = None,
                 stop_event: Optional[threading.Event] = None,
                 **kwargs):
        # Thread.__init__ has to run before the ``daemon`` property is set.
        super().__init__(name=name)
        self.daemon = True

        if not stop_event:
            stop_event = threading.Event()
        self.stop_event = stop_event

        self.task = task
        self.kwargs = kwargs
        self.error_queue = error_queue

    def run(self):
        """Call the task until a stop is requested or the task asks to end."""
        while not self.stop_event.is_set():
            target = self.task
            if isinstance(target, weakref.WeakMethod):
                # Resolve the weak reference into a bound method, if possible.
                target = target()
                if target is None:
                    # Normally, this should not happen.
                    logger.warning("WeakMethod is expired.")
                    break

            try:
                # The truthiness test stays inside the try block so that a
                # failure while evaluating the result is reported as well.
                finished = not target(**self.kwargs)
            except Exception as e:
                logger.error(
                    f"Error in thread {self.name}: {e}\n{traceback.format_exc()}"
                )
                self.error_queue.put(e)
            else:
                if finished:
                    break

        logger.info(f"Thread {self.name} stopped.")

    def stop(self):
        """Request termination of the loop by setting ``stop_event``."""
        self.stop_event.set()
| 334 | |
| 335 | |
# Module-level flag initialised to None.
# NOTE(review): presumably a lazily computed cache for an "LLM debug" switch,
# with None meaning "not determined yet" - confirm against the code that reads it.
_enable_llm_debug_ = None
no outgoing calls
no test coverage detected