MCPcopy
hub / github.com/micro/go-micro / stream

Method stream

client/rpc_client.go:236–348  ·  view source on GitHub ↗
(ctx context.Context, node *registry.Node, req Request, opts CallOptions)

Source from the content-addressed store, hash-verified

234}
235
236func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request, opts CallOptions) (Stream, error) {
237 address := node.Address
238 logger := r.Options().Logger
239
240 msg := &transport.Message{
241 Header: make(map[string]string),
242 }
243
244 md, ok := metadata.FromContext(ctx)
245 if ok {
246 for k, v := range md {
247 msg.Header[k] = v
248 }
249 }
250
251 // set timeout in nanoseconds
252 if opts.StreamTimeout > time.Duration(0) {
253 msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout)
254 }
255 // set the content type for the request
256 msg.Header["Content-Type"] = req.ContentType()
257 // set the accept header
258 msg.Header["Accept"] = req.ContentType()
259
260 // set old codecs
261 nCodec := setupProtocol(msg, node)
262
263 // no codec specified
264 if nCodec == nil {
265 var err error
266
267 nCodec, err = r.newCodec(req.ContentType())
268 if err != nil {
269 return nil, merrors.InternalServerError("go.micro.client", err.Error())
270 }
271 }
272
273 dOpts := []transport.DialOption{
274 transport.WithStream(),
275 }
276
277 if opts.DialTimeout >= 0 {
278 dOpts = append(dOpts, transport.WithTimeout(opts.DialTimeout))
279 }
280
281 c, err := r.opts.Transport.Dial(address, dOpts...)
282 if err != nil {
283 return nil, merrors.InternalServerError("go.micro.client", "connection error: %v", err)
284 }
285
286 // increment the sequence number
287 seq := atomic.AddUint64(&r.seq, 1) - 1
288 id := fmt.Sprintf("%v", seq)
289
290 // create codec with stream id
291 codec := newRPCCodec(msg, c, nCodec, id)
292
293 rsp := &rpcResponse{

Callers 1

StreamMethod · 0.95

Calls 15

OptionsMethod · 0.95
newCodecMethod · 0.95
SendMethod · 0.95
CloseMethod · 0.95
FromContextFunction · 0.92
WithStreamFunction · 0.92
WithTimeoutFunction · 0.92
LogfFunction · 0.92
DoneMethod · 0.80
setupProtocolFunction · 0.70
newRPCCodecFunction · 0.70
DurationMethod · 0.65

Tested by

no test coverage detected