MCPcopy
hub / github.com/mudler/LocalAI / QueueSubscribeReply

Method QueueSubscribeReply

core/services/messaging/client.go:252–264  ·  view source on GitHub ↗

QueueSubscribeReply creates a queue subscription that supports replying to requests. Load-balanced across subscribers in the same queue group, with request-reply support.

(subject, queue string, handler func(data []byte, reply func([]byte)))

Source from the content-addressed store, hash-verified

250// QueueSubscribeReply creates a queue subscription that supports replying to requests.
251// Load-balanced across subscribers in the same queue group, with request-reply support.
252func (c *Client) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
253 return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
254 return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
255 handler(msg.Data, func(replyData []byte) {
256 if msg.Reply != "" {
257 if err := msg.Respond(replyData); err != nil {
258 xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
259 }
260 }
261 })
262 })
263 })
264}
265
266// SubscribeJSON creates a subscription that automatically unmarshals JSON messages.
267// Invalid JSON messages are logged and skipped.

Callers

nothing calls this directly

Calls 3

confirmSubscriptionMethod · 0.95
QueueSubscribeMethod · 0.65
handlerFunction · 0.50

Tested by

no test coverage detected