(ctx context.Context, event cloudevents.Event, task *PushQueueTask, timeout time.Duration)
| 86 | } |
| 87 | |
| 88 | func (dispatcher *EventDispatcher) enqueue(ctx context.Context, event cloudevents.Event, task *PushQueueTask, timeout time.Duration) (string, error) { |
| 89 | ctx, span, ctxLogger := dispatcher.tracer.StartWithLogger(ctx, dispatcher.logger) |
| 90 | defer span.End() |
| 91 | |
| 92 | queueID, err := dispatcher.queue.Enqueue(ctx, task, timeout) |
| 93 | if errors.Is(err, context.DeadlineExceeded) { |
| 94 | msg := fmt.Sprintf("cannot enqueue event with ID [%s] and type [%s] to [%T]", event.ID(), event.Type(), dispatcher.queue) |
| 95 | ctxLogger.Warn(stacktrace.Propagate(err, msg)) |
| 96 | queueID, err = fmt.Sprintf("local-%s", event.ID()), nil |
| 97 | time.AfterFunc(timeout, func() { |
| 98 | dispatcher.Publish(ctx, event) |
| 99 | }) |
| 100 | } |
| 101 | return queueID, err |
| 102 | } |
| 103 | |
| 104 | // Dispatch a new event by adding it to the queue to be processed async |
| 105 | func (dispatcher *EventDispatcher) Dispatch(ctx context.Context, event cloudevents.Event) error { |
no test coverage detected