MCPcopy Index your code
hub / github.com/codeaashu/claude-code / parseStream

Function parseStream

web/lib/api/stream.ts:18–81  ·  view source on GitHub ↗
(
  response: Response,
  opts?: ParseStreamOptions
)

Source from the content-addressed store, hash-verified

16 * events are never dropped regardless of how fast they arrive.
17 */
18 export async function* parseStream(
19 response: Response,
20 opts?: ParseStreamOptions
21): AsyncGenerator<StreamEvent> {
22 const reader = response.body?.getReader();
23 if (!reader) throw new ApiError(0, "No response body", "network");
24
25 const decoder = new TextDecoder();
26 let buffer = "";
27 const startTime = Date.now();
28 let tokensReceived = 0;
29
30 try {
31 while (true) {
32 if (opts?.signal?.aborted) break;
33
34 const { done, value } = await reader.read();
35 if (done) break;
36
37 buffer += decoder.decode(value, { stream: true });
38
39 // Process all complete lines. The last (potentially incomplete) line
40 // stays in the buffer.
41 const lines = buffer.split("\n");
42 buffer = lines.pop() ?? "";
43
44 for (const line of lines) {
45 if (!line.trim()) continue; // blank line = SSE event separator
46 if (!line.startsWith("data: ")) continue;
47
48 const data = line.slice(6).trim();
49 if (data === "[DONE]") return;
50
51 try {
52 const event = JSON.parse(data) as StreamEvent;
53
54 if (
55 event.type === "content_block_delta" &&
56 event.delta.type === "text_delta"
57 ) {
58 tokensReceived += event.delta.text.length;
59 }
60
61 opts?.onProgress?.({
62 tokensReceived,
63 elapsedMs: Date.now() - startTime,
64 isComplete: false,
65 });
66
67 yield event;
68 } catch {
69 // skip malformed JSON — keep going
70 }
71 }
72 }
73 } finally {
74 reader.releaseLock();
75 opts?.onProgress?.({

Callers 2

streamRequest (Function) · 0.90
fetchSSE (Function) · 0.85

Calls 4

decode (Method) · 0.80
pop (Method) · 0.80
read (Method) · 0.65
parse (Method) · 0.45

Tested by

no test coverage detected