MCPcopy
hub / github.com/knadh/listmonk / worker

Method worker

internal/manager/manager.go:487–582  ·  view source on GitHub ↗

worker is a blocking function that perpetually listens to events (message) on different queues and processes them.

()

Source from the content-addressed store, hash-verified

485// worker is a blocking function that perpetually listens to events (message) on different
486// queues and processes them.
487func (m *Manager) worker() {
488 // Counter to keep track of the message / sec rate limit.
489 numMsg := 0
490 for {
491 select {
492 // Campaign message.
493 case msg, ok := <-m.campMsgQ:
494 if !ok {
495 return
496 }
497
498 // If the campaign has ended or stopped, ignore the message.
499 if msg.pipe != nil && msg.pipe.stopped.Load() {
500 // Reduce the message counter on the pipe.
501 msg.pipe.wg.Done()
502 continue
503 }
504
505 // Pause on hitting the message rate.
506 if numMsg >= m.cfg.MessageRate {
507 time.Sleep(time.Second)
508 numMsg = 0
509 }
510 numMsg++
511
512 // Outgoing message.
513 out := models.Message{
514 From: msg.from,
515 To: []string{msg.to},
516 Subject: msg.subject,
517 ContentType: msg.Campaign.ContentType,
518 Body: msg.body,
519 AltBody: msg.altBody,
520 Subscriber: msg.Subscriber,
521 Campaign: msg.Campaign,
522 Attachments: msg.Campaign.Attachments,
523 }
524
525 h := textproto.MIMEHeader{}
526 h.Set(models.EmailHeaderCampaignUUID, msg.Campaign.UUID)
527 h.Set(models.EmailHeaderSubscriberUUID, msg.Subscriber.UUID)
528
529 // Attach List-Unsubscribe headers?
530 if m.cfg.UnsubHeader {
531 h.Set("List-Unsubscribe-Post", "List-Unsubscribe=One-Click")
532 h.Set("List-Unsubscribe", `<`+msg.unsubURL+`>`)
533 }
534
535 // Attach any custom headers.
536 for _, set := range msg.headers {
537 for hdr, val := range set {
538 h.Add(hdr, val)
539 }
540 }
541
542 // Set the headers.
543 out.Headers = h
544

Callers 1

RunMethod · 0.95

Calls 3

LoadMethod · 0.80
OnErrorMethod · 0.80
PushMethod · 0.65

Tested by

no test coverage detected