MCPcopy Index your code
hub / github.com/1Panel-dev/MaxKB / SendEmailSerializer

Class SendEmailSerializer

apps/users/serializers/user.py:1082–1155  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

1080
1081
1082class SendEmailSerializer(serializers.Serializer):
1083 email = serializers.EmailField(
1084 required=True
1085 , label=_("Email"),
1086 validators=[validators.EmailValidator(message=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.message,
1087 code=ExceptionCodeConstants.EMAIL_FORMAT_ERROR.value.code)])
1088
1089 type = serializers.CharField(required=True, label=_("Type"), validators=[
1090 validators.RegexValidator(regex=re.compile("^register|reset_password$"),
1091 message=_("The type only supports register|reset_password"), code=500)
1092 ])
1093
1094 class Meta:
1095 model = User
1096 fields = '__all__'
1097
1098 def is_valid(self, *, raise_exception=False):
1099 super().is_valid(raise_exception=raise_exception)
1100 code_cache_key = self.data.get('email') + ":" + self.data.get("type")
1101 code_cache_key_lock = code_cache_key + "_lock"
1102 ttl = cache.ttl(code_cache_key_lock, version=version)
1103 if ttl is not None and ttl > 0:
1104 raise AppApiException(500, _("Do not send emails again within {seconds} seconds").format(
1105 seconds=int(ttl.total_seconds())))
1106 return True
1107
1108 def send(self):
1109 """
1110 发送邮件
1111 :return: 是否发送成功
1112 :exception 发送失败异常
1113 """
1114 email = self.data.get("email")
1115 state = self.data.get("type")
1116 # 生成随机验证码
1117 code = "".join(list(map(lambda i: random.choice(['1', '2', '3', '4', '5', '6', '7', '8', '9', '0'
1118 ]), range(6))))
1119 # 获取邮件模板
1120 language = get_language()
1121 file = open(
1122 os.path.join(PROJECT_DIR, "apps", "common", 'template', f'email_template_{language}.html'), "r",
1123 encoding='utf-8')
1124 content = file.read()
1125 file.close()
1126 code_cache_key = email + ":" + state
1127 code_cache_key_lock = code_cache_key + "_lock"
1128 # 设置缓存
1129 cache.set(get_key(code_cache_key_lock), code, timeout=60, version=version)
1130 system_setting = QuerySet(SystemSetting).filter(type=SettingType.EMAIL.value).first()
1131 if system_setting is None:
1132 cache.delete(get_key(code_cache_key_lock), version=version)
1133 raise AppApiException(1004,
1134 _("The email service has not been set up. Please contact the administrator to set up the email service in [Email Settings]."))
1135 try:
1136 connection = EmailBackend(system_setting.meta.get("email_host"),
1137 system_setting.meta.get('email_port'),
1138 system_setting.meta.get('email_host_user'),
1139 system_setting.meta.get('email_host_password'),

Callers 2

postMethod · 0.90
postMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected