GetTickerStream streams ticker updates for the requested exchange, currency pair and asset type
(r *gctrpc.GetTickerStreamRequest, stream gctrpc.GoCryptoTraderService_GetTickerStreamServer)
| 2174 | |
| 2175 | // GetTickerStream streams ticker updates for the requested exchange, currency pair and asset type. |
| 2176 | func (s *RPCServer) GetTickerStream(r *gctrpc.GetTickerStreamRequest, stream gctrpc.GoCryptoTraderService_GetTickerStreamServer) error { |
| 2177 | if r.Exchange == "" { |
| 2178 | return common.ErrExchangeNameNotSet |
| 2179 | } |
| 2180 | |
| 2181 | if _, err := s.GetExchangeByName(r.Exchange); err != nil { |
| 2182 | return err |
| 2183 | } |
| 2184 | |
| 2185 | a, err := asset.New(r.AssetType) |
| 2186 | if err != nil { |
| 2187 | return err |
| 2188 | } |
| 2189 | |
| 2190 | if r.Pair.String() == "" { |
| 2191 | return errCurrencyPairUnset |
| 2192 | } |
| 2193 | |
| 2194 | if r.AssetType == "" { |
| 2195 | return errAssetTypeUnset |
| 2196 | } |
| 2197 | |
| 2198 | p, err := currency.NewPairFromStrings(r.Pair.Base, r.Pair.Quote) |
| 2199 | if err != nil { |
| 2200 | return err |
| 2201 | } |
| 2202 | |
| 2203 | pipe, err := ticker.SubscribeTicker(r.Exchange, p, a) |
| 2204 | if err != nil { |
| 2205 | return err |
| 2206 | } |
| 2207 | |
| 2208 | defer func() { |
| 2209 | pipeErr := pipe.Release() |
| 2210 | if pipeErr != nil { |
| 2211 | log.Errorln(log.DispatchMgr, pipeErr) |
| 2212 | } |
| 2213 | }() |
| 2214 | |
| 2215 | for { |
| 2216 | data, ok := <-pipe.Channel() |
| 2217 | if !ok { |
| 2218 | return errDispatchSystem |
| 2219 | } |
| 2220 | |
| 2221 | t, ok := data.(*ticker.Price) |
| 2222 | if !ok { |
| 2223 | return common.GetTypeAssertError("*ticker.Price", data) |
| 2224 | } |
| 2225 | |
| 2226 | err := stream.Send(&gctrpc.TickerResponse{ |
| 2227 | Pair: &gctrpc.CurrencyPair{ |
| 2228 | Base: t.Pair.Base.String(), |
| 2229 | Quote: t.Pair.Quote.String(), |
| 2230 | Delimiter: t.Pair.Delimiter, |
| 2231 | }, |
| 2232 | LastUpdated: s.unixTimestamp(t.LastUpdated), |
| 2233 | Last: t.Last, |
nothing calls this directly
no test coverage detected