Publish an event to subscribers
(ctx context.Context, event cloudevents.Event)
| 120 | |
| 121 | // Publish an event to subscribers |
| 122 | func (dispatcher *EventDispatcher) Publish(ctx context.Context, event cloudevents.Event) { |
| 123 | ctx, span, ctxLogger := dispatcher.tracer.StartWithLogger(ctx, dispatcher.logger) |
| 124 | defer span.End() |
| 125 | |
| 126 | dispatcher.addCloudEventAttributes(span, event) |
| 127 | |
| 128 | start := time.Now() |
| 129 | |
| 130 | subscribers, ok := dispatcher.listeners[event.Type()] |
| 131 | if !ok { |
| 132 | ctxLogger.Info(fmt.Sprintf("no listener is configured for event type [%s] with id [%s]", event.Type(), event.ID())) |
| 133 | return |
| 134 | } |
| 135 | |
| 136 | var wg sync.WaitGroup |
| 137 | for _, sub := range subscribers { |
| 138 | wg.Add(1) |
| 139 | go func(ctx context.Context, sub events.EventListener) { |
| 140 | if err := sub(ctx, event); err != nil { |
| 141 | msg := fmt.Sprintf("subscriber [%T] cannot handle event [%s]", sub, event.Type()) |
| 142 | ctxLogger.Error(stacktrace.Propagate(err, msg)) |
| 143 | } |
| 144 | wg.Done() |
| 145 | }(ctx, sub) |
| 146 | } |
| 147 | |
| 148 | wg.Wait() |
| 149 | |
| 150 | dispatcher.meter.Record( |
| 151 | ctx, |
| 152 | float64(time.Since(start).Milliseconds()), |
| 153 | metric.WithAttributes( |
| 154 | semconv.CloudeventsEventType(event.Type()), |
| 155 | semconv.CloudeventsEventSpecVersion(event.SpecVersion()), |
| 156 | ), |
| 157 | ) |
| 158 | } |
| 159 | |
| 160 | func (dispatcher *EventDispatcher) addCloudEventAttributes(span trace.Span, event cloudevents.Event) { |
| 161 | span.SetAttributes( |
no test coverage detected