(self, data: bytes, metadata: Metadata)
| 87 | return "HTTP/3 Frames" |
| 88 | |
def prettify(self, data: bytes, metadata: Metadata) -> str:
    """Return the pretty-printed HTTP/3 frames for the message being viewed.

    The per-flow parser state in ``self.connections`` is advanced over every
    message of the flow that has not been processed yet. Each message's
    content is appended to the client or server buffer, complete frames are
    cut off the front of that buffer, and they are stored under the index of
    the message that completed them. The frames stored for
    ``metadata.tcp_message`` are then rendered.

    Args:
        data: Raw bytes of the message. Not read here; the content is taken
            from ``metadata.flow.messages`` instead.
        metadata: Must carry a ``tcp.TCPFlow`` in ``flow`` and a truthy
            ``tcp_message`` (both are asserted).

    Returns:
        The ``pretty()`` output of every frame recorded for the message,
        joined by blank lines, or ``""`` when no frame was recorded for it.
    """
    flow = metadata.flow
    tcp_message = metadata.tcp_message
    assert isinstance(flow, tcp.TCPFlow)
    assert tcp_message

    state = self.connections[flow]

    # Only messages beyond ``state.message_count`` are new since the last call.
    for message in flow.messages[state.message_count :]:
        pending = state.client_buf if message.from_client else state.server_buf
        pending += message.content

        is_first_message = state.message_count == 0
        if is_first_message and flow.metadata["quic_is_unidirectional"]:
            # The first message of a unidirectional stream starts with a
            # variable-length integer naming the stream type.
            reader = Buffer(data=bytes(pending[:8]))
            stream_type = reader.pull_uint_var()
            del pending[: reader.tell()]
            state.frames[0] = [StreamType(stream_type)]

        while True:
            # A frame header is two variable-length integers (type, size);
            # at most the first 16 buffered bytes are handed to the reader.
            reader = Buffer(data=bytes(pending[:16]))
            try:
                frame_type = reader.pull_uint_var()
                frame_size = reader.pull_uint_var()
            except BufferReadError:
                # Header not fully buffered yet.
                break

            header_len = reader.tell()
            frame_end = header_len + frame_size
            if frame_end > len(pending):
                # Payload not fully buffered yet; keep the bytes for later.
                break

            payload = bytes(pending[header_len:frame_end])
            frame = Frame(frame_type, payload)
            state.frames.setdefault(state.message_count, []).append(frame)
            del pending[:frame_end]

        state.message_count += 1

    message_index = flow.messages.index(tcp_message)
    recorded = state.frames.get(message_index, [])
    if recorded:
        return "\n\n".join(frame.pretty() for frame in recorded)
    return ""
| 139 | |
| 140 | def render_priority( |
| 141 | self, |
no test coverage detected