Merge chunks from Anthropic streaming into a single message dict. Returns a dict with keys: - role: "assistant" - content_text: full content text (concatenated) - tool_calls: list of dicts { name, input } if any
(chunks: Iterator[Dict[str, Any]])
| 69 | |
| 70 | |
| 71 | def merge_anthropic_streaming(chunks: Iterator[Dict[str, Any]]) -> Dict[str, Any]: |
| 72 | """ |
| 73 | Merge chunks from Anthropic streaming into a single message dict. |
| 74 | |
| 75 | Returns a dict with keys: |
| 76 | - role: "assistant" |
| 77 | - content_text: full content text (concatenated) |
| 78 | - tool_calls: list of dicts { name, input } if any |
| 79 | """ |
| 80 | role: Optional[str] = None |
| 81 | content_text_parts: List[str] = [] |
| 82 | tool_calls: List[Dict[str, Any]] = [] |
| 83 | current_tool: Optional[Dict[str, Any]] = None |
| 84 | current_tool_input_str: Optional[str] = None |
| 85 | |
| 86 | for chunk in chunks: |
| 87 | # role |
| 88 | if role is None and "role" in chunk: |
| 89 | role = chunk["role"] |
| 90 | |
| 91 | # handle content_block style (fine-grained) |
| 92 | typ = chunk.get("type") |
| 93 | if typ == "content_block_start": |
| 94 | block = chunk.get("content_block", {}) |
| 95 | if block.get("type") == "tool_use": |
| 96 | # finish previous tool if exists |
| 97 | if current_tool is not None: |
| 98 | try: |
| 99 | input_obj = json.loads(current_tool_input_str or "") |
| 100 | except Exception: |
| 101 | input_obj = current_tool_input_str |
| 102 | current_tool["input"] = input_obj |
| 103 | tool_calls.append(current_tool) |
| 104 | current_tool = {"name": block.get("name"), "id": block.get("id"), "input": None} |
| 105 | current_tool_input_str = "" |
| 106 | continue |
| 107 | |
| 108 | if typ == "content_block_delta": |
| 109 | delta = chunk.get("delta", {}) |
| 110 | dtyp = delta.get("type") |
| 111 | if dtyp == "input_json_delta": |
| 112 | current_tool_input_str = (current_tool_input_str or "") + delta.get("partial_json", "") |
| 113 | elif dtyp == "text_delta": |
| 114 | content_text_parts.append(delta.get("text", "")) |
| 115 | continue |
| 116 | |
| 117 | if typ == "content_block_stop": |
| 118 | if current_tool is not None: |
| 119 | try: |
| 120 | input_obj = json.loads(current_tool_input_str or "") |
| 121 | except Exception: |
| 122 | input_obj = current_tool_input_str |
| 123 | current_tool["input"] = input_obj |
| 124 | tool_calls.append(current_tool) |
| 125 | current_tool = None |
| 126 | current_tool_input_str = None |
| 127 | continue |
| 128 |
no test coverage detected