Subscribe implements the [Client.Subscribe] interface method. Empty subscriptions (aka. "") are ignored.
(subs ...string)
| 152 | // |
| 153 | // Empty subscriptions (aka. "") are ignored. |
| 154 | func (c *DefaultClient) Subscribe(subs ...string) { |
| 155 | c.mu.Lock() |
| 156 | defer c.mu.Unlock() |
| 157 | |
| 158 | for _, s := range subs { |
| 159 | if s == "" { |
| 160 | continue // skip empty |
| 161 | } |
| 162 | |
| 163 | // extract subscription options (if any) |
| 164 | rawOptions := struct { |
| 165 | // note: any instead of string to minimize the breaking changes with earlier versions |
| 166 | Query map[string]any `json:"query"` |
| 167 | Headers map[string]any `json:"headers"` |
| 168 | }{} |
| 169 | u, err := url.Parse(s) |
| 170 | if err == nil { |
| 171 | raw := u.Query().Get(optionsParam) |
| 172 | if raw != "" { |
| 173 | json.Unmarshal([]byte(raw), &rawOptions) |
| 174 | } |
| 175 | } |
| 176 | |
| 177 | options := SubscriptionOptions{ |
| 178 | Query: make(map[string]string, len(rawOptions.Query)), |
| 179 | Headers: make(map[string]string, len(rawOptions.Headers)), |
| 180 | } |
| 181 | |
| 182 | // normalize query |
| 183 | // (currently only single string values are supported for consistency with the default routes handling) |
| 184 | for k, v := range rawOptions.Query { |
| 185 | options.Query[k] = cast.ToString(v) |
| 186 | } |
| 187 | |
| 188 | // normalize headers name and values, eg. "X-Token" is converted to "x_token" |
| 189 | // (currently only single string values are supported for consistency with the default routes handling) |
| 190 | for k, v := range rawOptions.Headers { |
| 191 | options.Headers[inflector.Snakecase(k)] = cast.ToString(v) |
| 192 | } |
| 193 | |
| 194 | c.subscriptions[s] = options |
| 195 | } |
| 196 | } |
| 197 | |
| 198 | // Unsubscribe implements the [Client.Unsubscribe] interface method. |
| 199 | // |