(ctx context.Context, node *registry.Node, req Request, opts CallOptions)
| 234 | } |
| 235 | |
| 236 | func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) { |
| 237 | address := node.Address |
| 238 | logger := r.Options().Logger |
| 239 | |
| 240 | msg := &transport.Message{ |
| 241 | Header: make(map[string]string), |
| 242 | } |
| 243 | |
| 244 | md, ok := metadata.FromContext(ctx) |
| 245 | if ok { |
| 246 | for k, v := range md { |
| 247 | msg.Header[k] = v |
| 248 | } |
| 249 | } |
| 250 | |
| 251 | // set timeout in nanoseconds |
| 252 | if opts.StreamTimeout > time.Duration(0) { |
| 253 | msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) |
| 254 | } |
| 255 | // set the content type for the request |
| 256 | msg.Header["Content-Type"] = req.ContentType() |
| 257 | // set the accept header |
| 258 | msg.Header["Accept"] = req.ContentType() |
| 259 | |
| 260 | // set old codecs |
| 261 | nCodec := setupProtocol(msg, node) |
| 262 | |
| 263 | // no codec specified |
| 264 | if nCodec == nil { |
| 265 | var err error |
| 266 | |
| 267 | nCodec, err = r.newCodec(req.ContentType()) |
| 268 | if err != nil { |
| 269 | return nil, merrors.InternalServerError("go.micro.client", err.Error()) |
| 270 | } |
| 271 | } |
| 272 | |
| 273 | dOpts := []transport.DialOption{ |
| 274 | transport.WithStream(), |
| 275 | } |
| 276 | |
| 277 | if opts.DialTimeout >= 0 { |
| 278 | dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout)) |
| 279 | } |
| 280 | |
| 281 | c, err := r.opts.Transport.Dial(address, dOpts...) |
| 282 | if err != nil { |
| 283 | return nil, merrors.InternalServerError("go.micro.client", "connection error: %v", err) |
| 284 | } |
| 285 | |
| 286 | // atomically advance the sequence counter; seq keeps the value from before the increment |
| 287 | seq := atomic.AddUint64(&r.seq, 1) - 1 |
| 288 | id := fmt.Sprintf("%v", seq) |
| 289 | |
| 290 | // create codec with stream id |
| 291 | codec := newRPCCodec(msg, c, nCodec, id) |
| 292 | |
| 293 | rsp := &rpcResponse{ |
no test coverage detected