MCPcopy Index your code
hub / github.com/NdoleStudio/httpsms / Publish

Method Publish

api/pkg/services/event_dispatcher_service.go:122–158  ·  view source on GitHub ↗

Publish an event to subscribers

(ctx context.Context, event cloudevents.Event)

Source from the content-addressed store, hash-verified

120
121// Publish an event to subscribers
122func (dispatcher *EventDispatcher) Publish(ctx context.Context, event cloudevents.Event) {
123 ctx, span, ctxLogger := dispatcher.tracer.StartWithLogger(ctx, dispatcher.logger)
124 defer span.End()
125
126 dispatcher.addCloudEventAttributes(span, event)
127
128 start := time.Now()
129
130 subscribers, ok := dispatcher.listeners[event.Type()]
131 if !ok {
132 ctxLogger.Info(fmt.Sprintf("no listener is configured for event type [%s] with id [%s]", event.Type(), event.ID()))
133 return
134 }
135
136 var wg sync.WaitGroup
137 for _, sub := range subscribers {
138 wg.Add(1)
139 go func(ctx context.Context, sub events.EventListener) {
140 if err := sub(ctx, event); err != nil {
141 msg := fmt.Sprintf("subscriber [%T] cannot handle event [%s]", sub, event.Type())
142 ctxLogger.Error(stacktrace.Propagate(err, msg))
143 }
144 wg.Done()
145 }(ctx, sub)
146 }
147
148 wg.Wait()
149
150 dispatcher.meter.Record(
151 ctx,
152 float64(time.Since(start).Milliseconds()),
153 metric.WithAttributes(
154 semconv.CloudeventsEventType(event.Type()),
155 semconv.CloudeventsEventSpecVersion(event.SpecVersion()),
156 ),
157 )
158}
159
160func (dispatcher *EventDispatcher) addCloudEventAttributes(span trace.Span, event cloudevents.Event) {
161 span.SetAttributes(

Callers 2

DispatchSyncMethod · 0.95
enqueueMethod · 0.95

Calls 4

StartWithLoggerMethod · 0.65
InfoMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected