Stop channel(s). If channel_name is given, stop only that channel; otherwise stop all channels.
(self, channel_name: str = None)
| 122 | logger.exception(e) |
| 123 | |
def stop(self, channel_name: str = None):
    """
    Stop channel(s). If channel_name is given, stop only that channel;
    otherwise stop all channels.

    The selected channels and their threads are removed from the manager's
    registries while holding the lock; the actual shutdown runs after the
    lock has been released.

    Args:
        channel_name: Name of the channel to stop. A falsy value (None or
            an empty string) selects every managed channel.
    """
    # Pop under lock, then stop outside lock to avoid deadlock
    with self._lock:
        names = [channel_name] if channel_name else list(self._channels)
        to_stop = []
        for name in names:
            ch = self._channels.pop(name, None)
            th = self._threads.pop(name, None)
            to_stop.append((name, ch, th))
            # Compare against the popped instance: the registry no longer
            # holds it, so looking it up again would always yield None and
            # the primary channel would never be cleared.
            if channel_name and ch is not None and self._primary_channel is ch:
                self._primary_channel = None

    for name, ch, th in to_stop:
        if ch is None:
            logger.warning(f"[ChannelManager] Channel '{name}' not found in managed channels")
            # A thread may still be registered under this name even though
            # no channel object is; hand it to the interrupt helper.
            if th and th.is_alive():
                self._interrupt_thread(th, name)
            continue

        logger.info(f"[ChannelManager] Stopping channel '{name}'...")
        # graceful is True only when ch.stop() exists and returned normally.
        graceful = False
        if hasattr(ch, 'stop'):
            try:
                ch.stop()
                graceful = True
            except Exception as e:
                # Best effort: one failing channel must not prevent the
                # remaining channels from being stopped.
                logger.warning(f"[ChannelManager] Error during channel '{name}' stop: {e}")

        if th and th.is_alive():
            th.join(timeout=5)
            if th.is_alive():
                if graceful:
                    logger.info(f"[ChannelManager] Channel '{name}' thread still alive after stop(), "
                                "leaving daemon thread to finish on its own")
                else:
                    # stop() was missing or raised, so the thread is
                    # interrupted instead of being left running.
                    logger.warning(f"[ChannelManager] Channel '{name}' thread did not exit in 5s, forcing interrupt")
                    self._interrupt_thread(th, name)
| 163 | |
| 164 | @staticmethod |
| 165 | def _interrupt_thread(th: threading.Thread, name: str): |
no test coverage detected