MCPcopy
hub / github.com/crowdsecurity/crowdsec / poll

Method poll

pkg/longpollclient/client.go:83–158  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

81}
82
83func (c *LongPollClient) poll(ctx context.Context) error {
84 logger := c.logger.WithField("method", "poll")
85
86 resp, err := c.doQuery(ctx)
87 if err != nil {
88 return err
89 }
90
91 defer resp.Body.Close()
92
93 requestId := resp.Header.Get("X-Amzn-Trace-Id")
94 logger = logger.WithField("request-id", requestId)
95 if resp.StatusCode != http.StatusOK {
96 c.logger.Errorf("unexpected status code: %d", resp.StatusCode)
97 if resp.StatusCode == http.StatusPaymentRequired {
98 bodyContent, err := io.ReadAll(resp.Body)
99 if err != nil {
100 logger.Errorf("failed to read response body: %s", err)
101 return err
102 }
103 logger.Error(string(bodyContent))
104 return errUnauthorized
105 }
106 return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
107 }
108
109 decoder := json.NewDecoder(resp.Body)
110
111 for {
112 select {
113 case <-c.t.Dying():
114 logger.Debugf("dying")
115 close(c.c)
116 return nil
117 case <-ctx.Done():
118 logger.Debugf("context canceled")
119 close(c.c)
120 return ctx.Err()
121 default:
122 var pollResp pollResponse
123 err = decoder.Decode(&pollResp)
124 if err != nil {
125 if errors.Is(err, io.EOF) {
126 logger.Debugf("server closed connection")
127 return nil
128 }
129 return fmt.Errorf("error decoding poll response: %v", err)
130 }
131
132 logger.Tracef("got response: %+v", pollResp)
133
134 if pollResp.ErrorMessage != "" {
135 if pollResp.ErrorMessage == timeoutMessage {
136 logger.Debugf("got timeout message")
137 return nil
138 }
139 return fmt.Errorf("longpoll API error message: %s", pollResp.ErrorMessage)
140 }

Callers 1

pollEventsMethod · 0.95

Calls 6

doQueryMethod · 0.95
ErrMethod · 0.80
TracefMethod · 0.80
ErrorMethod · 0.65
CloseMethod · 0.45
GetMethod · 0.45

Tested by

no test coverage detected