MCP · copy · Index your code
hub / github.com/the-open-agent/openagent / readSSEStream

Method readSSEStream

model/opencode.go:238–400  ·  view source on GitHub ↗
(ctx context.Context, sessionID string, question string, writer io.Writer, flusher http.Flusher, ready chan<- struct{})

Source from the content-addressed store, hash-verified

236}
237
238func (p *OpenCodeProvider) readSSEStream(ctx context.Context, sessionID string, question string, writer io.Writer, flusher http.Flusher, ready chan<- struct{}) (*ModelResult, error) {
239 // OpenCode only exposes global SSE endpoints (/event and /global/event).
240 // There is no session-scoped SSE endpoint, so we filter by sessionID
241 // client-side. This is the intended design — events include a sessionID
242 // field for this purpose.
243 req, err := http.NewRequest("GET", p.serverUrl+"/event", nil)
244 if err != nil {
245 return nil, err
246 }
247
248 p.setAuth(req)
249 req.Header.Set("Accept", "text/event-stream")
250 req.Header.Set("Cache-Control", "no-cache")
251
252 sseCtx, cancel := context.WithTimeout(ctx, 1200*time.Second)
253 defer cancel()
254 req = req.WithContext(sseCtx)
255
256 resp, err := p.client.Do(req)
257 if err != nil {
258 close(ready) // unblock caller even on failure
259 return nil, fmt.Errorf("OpenCode: failed to connect to SSE stream: %v", err)
260 }
261 defer resp.Body.Close()
262
263 // Signal caller that SSE connection is established
264 close(ready)
265
266 if resp.StatusCode != http.StatusOK {
267 body, _ := io.ReadAll(resp.Body)
268 return nil, fmt.Errorf("OpenCode: SSE HTTP %d: %s", resp.StatusCode, string(body))
269 }
270
271 var fullText strings.Builder
272 var promptTokens, completionTokens int
273 done := false
274 partTypes := make(map[string]string) // partID -> type ("reasoning" or "text")
275
276 scanner := bufio.NewScanner(resp.Body)
277 scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
278
279 for scanner.Scan() {
280 line := scanner.Text()
281
282 if !strings.HasPrefix(line, "data: ") {
283 continue
284 }
285
286 data := strings.TrimPrefix(line, "data: ")
287 if data == "" || data == "[DONE]" {
288 continue
289 }
290
291 event := p.parseSSEEvent(data)
292
293 if event.Properties == nil {
294 continue
295 }

Callers 1

QueryText · Method · 0.95

Calls 6

setAuth · Method · 0.95
parseSSEEvent · Method · 0.95
GetTokenSize · Function · 0.85
Close · Method · 0.45
Flush · Method · 0.45
String · Method · 0.45

Tested by

no test coverage detected