This function calls the proper renderer depending on the flow type. We also want to cache the renderer output, so we extract all attributes relevant for display and call the renderer with only those. This ensures that rows are updated if the flow is changed.
(
f: flow.Flow,
*,
render_mode: RenderMode,
hostheader: bool = False, # pass options directly if we need more stuff from them
focused: bool = True,
)
| 724 | |
| 725 | |
| 726 | def format_flow( |
| 727 | f: flow.Flow, |
| 728 | *, |
| 729 | render_mode: RenderMode, |
| 730 | hostheader: bool = False, # pass options directly if we need more stuff from them |
| 731 | focused: bool = True, |
| 732 | ) -> urwid.Widget: |
| 733 | """ |
| 734 | This functions calls the proper renderer depending on the flow type. |
| 735 | We also want to cache the renderer output, so we extract all attributes |
| 736 | relevant for display and call the render with only that. This assures that rows |
| 737 | are updated if the flow is changed. |
| 738 | """ |
| 739 | duration: float | None |
| 740 | error_message: str | None |
| 741 | if f.error: |
| 742 | error_message = f.error.msg |
| 743 | else: |
| 744 | error_message = None |
| 745 | |
| 746 | if isinstance(f, (TCPFlow, UDPFlow)): |
| 747 | total_size = 0 |
| 748 | for message in f.messages: |
| 749 | total_size += len(message.content) |
| 750 | if f.messages: |
| 751 | duration = f.messages[-1].timestamp - f.client_conn.timestamp_start |
| 752 | else: |
| 753 | duration = None |
| 754 | if f.client_conn.tls_version == "QUICv1": |
| 755 | protocol = "quic" |
| 756 | else: |
| 757 | protocol = f.type |
| 758 | return format_message_flow( |
| 759 | render_mode=render_mode, |
| 760 | focused=focused, |
| 761 | timestamp_start=f.client_conn.timestamp_start, |
| 762 | marked=f.marked, |
| 763 | protocol=protocol, |
| 764 | client_address=f.client_conn.peername, |
| 765 | server_address=f.server_conn.address, |
| 766 | total_size=total_size, |
| 767 | duration=duration, |
| 768 | error_message=error_message, |
| 769 | ) |
| 770 | elif isinstance(f, DNSFlow): |
| 771 | if f.request.timestamp and f.response and f.response.timestamp: |
| 772 | duration = f.response.timestamp - f.request.timestamp |
| 773 | else: |
| 774 | duration = None |
| 775 | if f.response: |
| 776 | response_code_str: str | None = dns.response_codes.to_str( |
| 777 | f.response.response_code |
| 778 | ) |
| 779 | response_code_http_equiv = dns.response_codes.http_equiv_status_code( |
| 780 | f.response.response_code |
| 781 | ) |
| 782 | answer = ", ".join(str(x) for x in f.response.answers) |
| 783 | else: |
nothing calls this directly
no test coverage detected
searching dependent graphs…