Sends a built email message asynchronously.
(
campaign_key: str,
to: List[Dict[str, str]],
subject: str,
headers: Dict,
txt_body: str = "",
html_body: str = "",
reply_to: Optional[str] = None,
)
| 43 | |
| 44 | @app.task(ignore_result=True, max_retries=3) |
| 45 | def _send_email( |
| 46 | campaign_key: str, |
| 47 | to: List[Dict[str, str]], |
| 48 | subject: str, |
| 49 | headers: Dict, |
| 50 | txt_body: str = "", |
| 51 | html_body: str = "", |
| 52 | reply_to: Optional[str] = None, |
| 53 | ) -> None: |
| 54 | """ |
| 55 | Sends built email message asynchronously. |
| 56 | """ |
| 57 | |
| 58 | messages: List = [] |
| 59 | records: List = [] |
| 60 | |
| 61 | with transaction.atomic(): |
| 62 | |
| 63 | for dest in to: |
| 64 | record, _ = MessagingRecord.objects.get_or_create(raw_email=dest["raw_email"], campaign_key=campaign_key) |
| 65 | |
| 66 | # Lock object (database-level) while the message is sent |
| 67 | record = MessagingRecord.objects.select_for_update().get(pk=record.pk) |
| 68 | # If an email for this campaign was already sent to this user, skip recipient |
| 69 | if record.sent_at: |
| 70 | record.save() # release DB lock |
| 71 | continue |
| 72 | |
| 73 | records.append(record) |
| 74 | reply_to = reply_to or get_instance_setting("EMAIL_REPLY_TO") |
| 75 | |
| 76 | email_message = mail.EmailMultiAlternatives( |
| 77 | subject=subject, |
| 78 | body=txt_body, |
| 79 | from_email=get_instance_setting("EMAIL_DEFAULT_FROM"), |
| 80 | to=[dest["recipient"]], |
| 81 | headers=headers, |
| 82 | reply_to=[reply_to] if reply_to else None, |
| 83 | ) |
| 84 | |
| 85 | email_message.attach_alternative(html_body, "text/html") |
| 86 | messages.append(email_message) |
| 87 | |
| 88 | connection = None |
| 89 | try: |
| 90 | klass = import_string(settings.EMAIL_BACKEND) if settings.EMAIL_BACKEND else EmailBackend |
| 91 | connection = klass( |
| 92 | host=get_instance_setting("EMAIL_HOST"), |
| 93 | port=get_instance_setting("EMAIL_PORT"), |
| 94 | username=get_instance_setting("EMAIL_HOST_USER"), |
| 95 | password=get_instance_setting("EMAIL_HOST_PASSWORD"), |
| 96 | use_tls=get_instance_setting("EMAIL_USE_TLS"), |
| 97 | use_ssl=get_instance_setting("EMAIL_USE_SSL"), |
| 98 | ) |
| 99 | connection.open() |
| 100 | connection.send_messages(messages) |
| 101 | |
| 102 | for record in records: |
searching dependent graphs…