| 167 | |
| 168 | # tg通知 |
| 169 | def telegram_bot(title, content): |
| 170 | try: |
| 171 | print("\n") |
| 172 | bot_token = TG_BOT_TOKEN |
| 173 | user_id = TG_USER_ID |
| 174 | if not bot_token or not user_id: |
| 175 | print("tg服务的bot_token或者user_id未设置!!\n取消推送") |
| 176 | return |
| 177 | print("tg服务启动") |
| 178 | if TG_API_HOST: |
| 179 | if 'http' in TG_API_HOST: |
| 180 | url = f"{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage" |
| 181 | else: |
| 182 | url = f"https://{TG_API_HOST}/bot{TG_BOT_TOKEN}/sendMessage" |
| 183 | else: |
| 184 | url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage" |
| 185 | |
| 186 | headers = {'Content-Type': 'application/x-www-form-urlencoded'} |
| 187 | payload = {'chat_id': str(TG_USER_ID), 'text': f'{title}\n\n{content}', 'disable_web_page_preview': 'true'} |
| 188 | proxies = None |
| 189 | if TG_PROXY_IP and TG_PROXY_PORT: |
| 190 | proxyStr = "http://{}:{}".format(TG_PROXY_IP, TG_PROXY_PORT) |
| 191 | proxies = {"http": proxyStr, "https": proxyStr} |
| 192 | try: |
| 193 | response = requests.post(url=url, headers=headers, params=payload, proxies=proxies).json() |
| 194 | except: |
| 195 | print('推送失败!') |
| 196 | if response['ok']: |
| 197 | print('推送成功!') |
| 198 | else: |
| 199 | print('推送失败!') |
| 200 | except Exception as e: |
| 201 | print(e) |
| 202 | |
| 203 | def dingding_bot(title, content): |
| 204 | timestamp = str(round(time.time() * 1000)) # 时间戳 |