GetAccountBalancesStream streams an account balance for a specific exchange
(r *gctrpc.GetAccountBalancesRequest, stream gctrpc.GoCryptoTraderService_GetAccountBalancesStreamServer)
| 663 | |
| 664 | // GetAccountBalancesStream streams an account balance for a specific exchange |
| 665 | func (s *RPCServer) GetAccountBalancesStream(r *gctrpc.GetAccountBalancesRequest, stream gctrpc.GoCryptoTraderService_GetAccountBalancesStreamServer) error { |
| 666 | assetType, err := asset.New(r.AssetType) |
| 667 | if err != nil { |
| 668 | return err |
| 669 | } |
| 670 | |
| 671 | exch, err := s.GetExchangeByName(r.Exchange) |
| 672 | if err != nil { |
| 673 | return err |
| 674 | } |
| 675 | |
| 676 | err = checkParams(r.Exchange, exch, assetType, currency.EMPTYPAIR) |
| 677 | if err != nil { |
| 678 | return err |
| 679 | } |
| 680 | |
| 681 | pipe, err := exch.SubscribeAccountBalances() |
| 682 | if err != nil { |
| 683 | return err |
| 684 | } |
| 685 | |
| 686 | defer func() { |
| 687 | pipeErr := pipe.Release() |
| 688 | if pipeErr != nil { |
| 689 | log.Errorln(log.DispatchMgr, pipeErr) |
| 690 | } |
| 691 | }() |
| 692 | init := make(chan struct{}, 1) |
| 693 | init <- struct{}{} |
| 694 | |
| 695 | for { |
| 696 | select { |
| 697 | case <-stream.Context().Done(): |
| 698 | return stream.Context().Err() |
| 699 | case _, ok := <-pipe.Channel(): |
| 700 | if !ok { |
| 701 | return errDispatchSystem |
| 702 | } |
| 703 | case <-init: |
| 704 | } |
| 705 | |
| 706 | subAccts, err := exch.GetCachedSubAccounts(stream.Context(), assetType) |
| 707 | if err != nil { |
| 708 | return err |
| 709 | } |
| 710 | |
| 711 | if err := stream.Send(accountBalanceResp(r.Exchange, subAccts)); err != nil { |
| 712 | return err |
| 713 | } |
| 714 | } |
| 715 | } |
| 716 | |
| 717 | // GetConfig returns the bots config |
| 718 | func (s *RPCServer) GetConfig(_ context.Context, _ *gctrpc.GetConfigRequest) (*gctrpc.GetConfigResponse, error) { |
nothing calls this directly
no test coverage detected