MCPcopy
hub / github.com/microsoft/agent-lightning / merge_anthropic_streaming

Function merge_anthropic_streaming

tests/llm_proxy/test_stream.py:71–178  ·  view source on GitHub ↗

Merge chunks from Anthropic streaming into a single message dict. Returns a dict with three keys: `role` ("assistant"); `content_text` (the full content text, concatenated); and `tool_calls` (a list of dicts { name, input }, if any).

(chunks: Iterator[Dict[str, Any]])

Source from the content-addressed store, hash-verified

69
70
71def merge_anthropic_streaming(chunks: Iterator[Dict[str, Any]]) -> Dict[str, Any]:
72 """
73 Merge chunks from Anthropic streaming into a single message dict.
74
75 Returns a dict with keys:
76 - role: "assistant"
77 - content_text: full content text (concatenated)
78 - tool_calls: list of dicts { name, input } if any
79 """
80 role: Optional[str] = None
81 content_text_parts: List[str] = []
82 tool_calls: List[Dict[str, Any]] = []
83 current_tool: Optional[Dict[str, Any]] = None
84 current_tool_input_str: Optional[str] = None
85
86 for chunk in chunks:
87 # role
88 if role is None and "role" in chunk:
89 role = chunk["role"]
90
91 # handle content_block style (fine-grained)
92 typ = chunk.get("type")
93 if typ == "content_block_start":
94 block = chunk.get("content_block", {})
95 if block.get("type") == "tool_use":
96 # finish previous tool if exists
97 if current_tool is not None:
98 try:
99 input_obj = json.loads(current_tool_input_str or "")
100 except Exception:
101 input_obj = current_tool_input_str
102 current_tool["input"] = input_obj
103 tool_calls.append(current_tool)
104 current_tool = {"name": block.get("name"), "id": block.get("id"), "input": None}
105 current_tool_input_str = ""
106 continue
107
108 if typ == "content_block_delta":
109 delta = chunk.get("delta", {})
110 dtyp = delta.get("type")
111 if dtyp == "input_json_delta":
112 current_tool_input_str = (current_tool_input_str or "") + delta.get("partial_json", "")
113 elif dtyp == "text_delta":
114 content_text_parts.append(delta.get("text", ""))
115 continue
116
117 if typ == "content_block_stop":
118 if current_tool is not None:
119 try:
120 input_obj = json.loads(current_tool_input_str or "")
121 except Exception:
122 input_obj = current_tool_input_str
123 current_tool["input"] = input_obj
124 tool_calls.append(current_tool)
125 current_tool = None
126 current_tool_input_str = None
127 continue
128

Calls 2

getMethod · 0.45
updateMethod · 0.45

Tested by

no test coverage detected