QueueSubscribeReply creates a queue subscription that supports replying to requests. Load-balanced across subscribers in the same queue group, with request-reply support.
(subject, queue string, handler func(data []byte, reply func([]byte)))
| 250 | // QueueSubscribeReply creates a queue subscription that supports replying to requests. |
| 251 | // Load-balanced across subscribers in the same queue group, with request-reply support. |
| 252 | func (c *Client) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (Subscription, error) { |
| 253 | return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) { |
| 254 | return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) { |
| 255 | handler(msg.Data, func(replyData []byte) { |
| 256 | if msg.Reply != "" { |
| 257 | if err := msg.Respond(replyData); err != nil { |
| 258 | xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err) |
| 259 | } |
| 260 | } |
| 261 | }) |
| 262 | }) |
| 263 | }) |
| 264 | } |
| 265 | |
| 266 | // SubscribeJSON creates a subscription that automatically unmarshals JSON messages. |
| 267 | // Invalid JSON messages are logged and skipped. |
nothing calls this directly
no test coverage detected