| 94 | } |
| 95 | |
| 96 | func (dc *dataNodeQueryClientImpl) queryRaw(ctx context.Context, requestID string, host topology.Host, query queryCom.AQLQuery, hll bool) (bs []byte, err error) { |
| 97 | var u *url.URL |
| 98 | if err = ctx.Err(); err != nil { |
| 99 | return |
| 100 | } |
| 101 | if host == nil { |
| 102 | err = utils.StackError(nil, "host is nil") |
| 103 | return |
| 104 | } |
| 105 | u, err = url.Parse(host.Address()) |
| 106 | if err != nil { |
| 107 | return |
| 108 | } |
| 109 | u.Scheme = "http" |
| 110 | u.Path = "/query/aql" |
| 111 | q := u.Query() |
| 112 | q.Set("dataonly", "1") |
| 113 | u.RawQuery = q.Encode() |
| 114 | |
| 115 | aqlRequestBody := aqlRequestBody{ |
| 116 | []queryCom.AQLQuery{query}, |
| 117 | } |
| 118 | var bodyBytes []byte |
| 119 | bodyBytes, err = json.Marshal(aqlRequestBody) |
| 120 | if err != nil { |
| 121 | return |
| 122 | } |
| 123 | var req *http.Request |
| 124 | req, err = http.NewRequest(http.MethodPost, u.String(), bytes.NewBuffer(bodyBytes)) |
| 125 | if err != nil { |
| 126 | return |
| 127 | } |
| 128 | |
| 129 | req.Header.Add(requestIDHeaderKey, requestID) |
| 130 | if hll { |
| 131 | req.Header.Add(utils.HTTPAcceptTypeHeaderKey, utils.HTTPContentTypeHyperLogLog) |
| 132 | } |
| 133 | |
| 134 | req = req.WithContext(ctx) |
| 135 | var res *http.Response |
| 136 | res, err = dc.client.Do(req) |
| 137 | if res != nil { |
| 138 | defer res.Body.Close() |
| 139 | } |
| 140 | if err != nil { |
| 141 | utils.GetLogger().With("err", err).Error("error connecting to datanode") |
| 142 | err = ErrFailedToConnect |
| 143 | return |
| 144 | } |
| 145 | if res.StatusCode != http.StatusOK { |
| 146 | err = errors.New(fmt.Sprintf("got status code %d from datanode", res.StatusCode)) |
| 147 | return |
| 148 | } |
| 149 | bs, err = ReadAll(res.Body) |
| 150 | if err != nil { |
| 151 | bs = nil |
| 152 | } |
| 153 | |