Clear the singleton cache for the channel class so that a new instance can be created with updated config.
(channel_name: str)
| 222 | |
| 223 | |
def _clear_singleton_cache(channel_name: str):
    """
    Reset the cached instance store of the channel class registered
    under ``channel_name`` so a later construction builds a fresh object.

    The channel name is mapped to a dotted ``module.ClassName`` path, the
    module is imported, and the attribute found under the class name is
    inspected. If that attribute carries closure cells, the first cell
    holding a ``dict`` is emptied in place (presumably the instance cache
    of a singleton decorator -- confirm against the decorator itself).

    Unknown channel names are ignored. Any failure while importing or
    inspecting is logged as a warning and never propagated.
    """
    # Channel name -> dotted path of the (decorated) channel class.
    class_paths = {
        "web": "channel.web.web_channel.WebChannel",
        "wechatmp": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
        "wechatmp_service": "channel.wechatmp.wechatmp_channel.WechatMPChannel",
        "wechatcom_app": "channel.wechatcom.wechatcomapp_channel.WechatComAppChannel",
        const.WECHAT_KF: "channel.wechat_kf.wechat_kf_channel.WechatKfChannel",
        const.FEISHU: "channel.feishu.feishu_channel.FeiShuChanel",
        const.DINGTALK: "channel.dingtalk.dingtalk_channel.DingTalkChanel",
        const.WECOM_BOT: "channel.wecom_bot.wecom_bot_channel.WecomBotChannel",
        const.QQ: "channel.qq.qq_channel.QQChannel",
        const.TELEGRAM: "channel.telegram.telegram_channel.TelegramChannel",
        const.SLACK: "channel.slack.slack_channel.SlackChannel",
        const.DISCORD: "channel.discord.discord_channel.DiscordChannel",
        const.WEIXIN: "channel.weixin.weixin_channel.WeixinChannel",
        "wx": "channel.weixin.weixin_channel.WeixinChannel",
    }
    dotted_path = class_paths.get(channel_name)
    if not dotted_path:
        # Not a channel this helper knows about: nothing to reset.
        return

    try:
        import importlib

        module_name, _, class_name = dotted_path.rpartition(".")
        channel_module = importlib.import_module(module_name)

        wrapper = getattr(channel_module, class_name, None)
        if not wrapper:
            return

        # Only objects with a non-empty closure can hold a cache cell.
        closure_cells = getattr(wrapper, "__closure__", None)
        if not closure_cells:
            return

        for cell in closure_cells:
            try:
                held = cell.cell_contents
                if not isinstance(held, dict):
                    continue
                held.clear()
                logger.debug(f"[ChannelManager] Cleared singleton cache for {class_name}")
                # Only the first dict-valued cell is cleared.
                break
            except ValueError:
                # Reading an empty (unbound) cell raises ValueError; skip it.
                continue
    except Exception as e:
        logger.warning(f"[ChannelManager] Failed to clear singleton cache: {e}")
| 266 | |
| 267 | |
| 268 | def sigterm_handler_wrap(_signo): |
no test coverage detected