MCPcopy
hub / github.com/micro/go-micro / Stream

Method Stream

client/rpc_client.go:554–653  ·  view source on GitHub ↗
(ctx context.Context, request Request, opts ...CallOption)

Source from the content-addressed store, hash-verified

552}
553
554func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) {
555 r.mu.RLock()
556 defer r.mu.RUnlock()
557
558 // make a copy of call opts
559 callOpts := r.opts.CallOptions
560 for _, opt := range opts {
561 opt(&callOpts)
562 }
563
564 next, err := r.next(request, callOpts)
565 if err != nil {
566 return nil, err
567 }
568
569 select {
570 case <-ctx.Done():
571 return nil, merrors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err()))
572 default:
573 }
574
575 call := func(i int) (Stream, error) {
576 // call backoff first. Someone may want an initial start delay
577 t, err := callOpts.Backoff(ctx, request, i)
578 if err != nil {
579 return nil, merrors.InternalServerError("go.micro.client", "backoff error: %v", err.Error())
580 }
581
582 // only sleep if greater than 0
583 if t.Seconds() > 0 {
584 time.Sleep(t)
585 }
586
587 node, err := next()
588 service := request.Service()
589
590 if err != nil {
591 if errors.Is(err, selector.ErrNotFound) {
592 return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error())
593 }
594
595 return nil, merrors.InternalServerError("go.micro.client",
596 "error getting next %s node: %s",
597 service,
598 err.Error())
599 }
600
601 stream, err := r.stream(ctx, node, request, callOpts)
602 r.opts.Selector.Mark(service, node, err)
603
604 return stream, err
605 }
606
607 type response struct {
608 stream Stream
609 err error
610 }
611

Callers

nothing calls this directly

Calls 8

next Method · 0.95
stream Method · 0.95
Proxy Function · 0.92
Done Method · 0.80
Error Method · 0.65
Service Method · 0.65
Mark Method · 0.65
Sleep Method · 0.45

Tested by

no test coverage detected