MCPcopy
hub / github.com/zhayujie/CowAgent / startup

Method startup

channel/telegram/telegram_channel.py:74–120  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

72 # ------------------------------------------------------------------
73
74 def startup(self):
75 self.bot_token = conf().get("telegram_token", "")
76 if not self.bot_token:
77 err = "[Telegram] telegram_token is required"
78 logger.error(err)
79 self.report_startup_error(err)
80 return
81
82 try:
83 from telegram.ext import (
84 Application,
85 MessageHandler,
86 CommandHandler,
87 filters,
88 )
89 except ImportError:
90 err = (
91 "[Telegram] python-telegram-bot is not installed. "
92 "Run: pip install python-telegram-bot"
93 )
94 logger.error(err)
95 self.report_startup_error(err)
96 return
97
98 # Run the asyncio event loop in a dedicated thread so the sync cow body
99 # is untouched.
100 self._loop = asyncio.new_event_loop()
101
102 def _run_loop():
103 asyncio.set_event_loop(self._loop)
104 try:
105 self._loop.run_until_complete(self._async_main(Application, MessageHandler, CommandHandler, filters))
106 except Exception as e:
107 logger.error(f"[Telegram] event loop crashed: {e}", exc_info=True)
108 self.report_startup_error(str(e))
109 finally:
110 try:
111 self._loop.close()
112 except Exception:
113 pass
114 logger.info("[Telegram] event loop exited")
115
116 self._loop_thread = threading.Thread(target=_run_loop, daemon=True, name="telegram-loop")
117 self._loop_thread.start()
118 # Block startup() until the loop thread exits, matching other channels'
119 # behaviour (startup is a blocking call).
120 self._loop_thread.join()
121
122 async def _async_main(self, Application, MessageHandler, CommandHandler, filters):
123 """Build Application, register handlers, and run polling."""

Callers

nothing calls this directly

Calls 5

confFunction · 0.90
errorMethod · 0.80
report_startup_errorMethod · 0.80
getMethod · 0.45
startMethod · 0.45

Tested by

no test coverage detected