| 552 | } |
| 553 | |
| 554 | func (r *rpcClient) Stream(ctx context.Context, request Request, opts ...CallOption) (Stream, error) { |
| 555 | r.mu.RLock() |
| 556 | defer r.mu.RUnlock() |
| 557 | |
| 558 | // make a copy of call opts |
| 559 | callOpts := r.opts.CallOptions |
| 560 | for _, opt := range opts { |
| 561 | opt(&callOpts) |
| 562 | } |
| 563 | |
| 564 | next, err := r.next(request, callOpts) |
| 565 | if err != nil { |
| 566 | return nil, err |
| 567 | } |
| 568 | |
| 569 | select { |
| 570 | case <-ctx.Done(): |
| 571 | return nil, merrors.Timeout("go.micro.client", fmt.Sprintf("%v", ctx.Err())) |
| 572 | default: |
| 573 | } |
| 574 | |
| 575 | call := func(i int) (Stream, error) { |
| 576 | // call backoff first. Someone may want an initial start delay |
| 577 | t, err := callOpts.Backoff(ctx, request, i) |
| 578 | if err != nil { |
| 579 | return nil, merrors.InternalServerError("go.micro.client", "backoff error: %v", err.Error()) |
| 580 | } |
| 581 | |
| 582 | // only sleep if greater than 0 |
| 583 | if t.Seconds() > 0 { |
| 584 | time.Sleep(t) |
| 585 | } |
| 586 | |
| 587 | node, err := next() |
| 588 | service := request.Service() |
| 589 | |
| 590 | if err != nil { |
| 591 | if errors.Is(err, selector.ErrNotFound) { |
| 592 | return nil, merrors.InternalServerError("go.micro.client", "service %s: %s", service, err.Error()) |
| 593 | } |
| 594 | |
| 595 | return nil, merrors.InternalServerError("go.micro.client", |
| 596 | "error getting next %s node: %s", |
| 597 | service, |
| 598 | err.Error()) |
| 599 | } |
| 600 | |
| 601 | stream, err := r.stream(ctx, node, request, callOpts) |
| 602 | r.opts.Selector.Mark(service, node, err) |
| 603 | |
| 604 | return stream, err |
| 605 | } |
| 606 | |
| 607 | type response struct { |
| 608 | stream Stream |
| 609 | err error |
| 610 | } |
| 611 | |