| 53 | } |
| 54 | |
| 55 | func (dc *dataNodeQueryClientImpl) Query(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery, hll bool) (result queryCom.AQLQueryResult, err error) { |
| 56 | var bs []byte |
| 57 | bs, err = dc.queryRaw(ctx, requestID, host, query, hll) |
| 58 | if err != nil { |
| 59 | return |
| 60 | } |
| 61 | |
| 62 | if hll { |
| 63 | var results []queryCom.AQLQueryResult |
| 64 | var errs []error |
| 65 | results, errs, err = queryCom.ParseHLLQueryResults(bs, true) |
| 66 | if err != nil { |
| 67 | utils.GetLogger().With("host", host, "query", query, "error", err, "errors", errs, "hll", hll).Error("datanode query client Query failed") |
| 68 | return |
| 69 | } |
| 70 | if len(results) != 1 { |
| 71 | errors.New(fmt.Sprintf("invalid response from datanode, resp: %s", bs)) |
| 72 | } |
| 73 | result = results[0] |
| 74 | } else { |
| 75 | var respBody aqlRespBody |
| 76 | err = json.Unmarshal(bs, &respBody) |
| 77 | if err != nil || len(respBody.Results) != 1 { |
| 78 | err = errors.New(fmt.Sprintf("invalid response from datanode, resp: %s", bs)) |
| 79 | return |
| 80 | } |
| 81 | result = respBody.Results[0] |
| 82 | } |
| 83 | |
| 84 | utils.GetLogger().With("host", host, "query", query, "result", result, "hll", hll).Debug("datanode query client Query succeeded") |
| 85 | return |
| 86 | } |
| 87 | |
| 88 | func (dc *dataNodeQueryClientImpl) QueryRaw(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery) (bs []byte, err error) { |
| 89 | bs, err = dc.queryRaw(ctx, requestID, host, query, false) |