(
self,
reader: asyncio.StreamReader | mitmproxy_rs.Stream,
writer: asyncio.StreamWriter | mitmproxy_rs.Stream,
options: moptions.Options,
mode: mode_specs.ProxyMode,
)
| 464 | |
| 465 | class LiveConnectionHandler(ConnectionHandler, metaclass=abc.ABCMeta): |
| 466 | def __init__( |
| 467 | self, |
| 468 | reader: asyncio.StreamReader | mitmproxy_rs.Stream, |
| 469 | writer: asyncio.StreamWriter | mitmproxy_rs.Stream, |
| 470 | options: moptions.Options, |
| 471 | mode: mode_specs.ProxyMode, |
| 472 | ) -> None: |
| 473 | client = Client( |
| 474 | transport_protocol=writer.get_extra_info("transport_protocol", "tcp"), |
| 475 | peername=writer.get_extra_info("peername"), |
| 476 | sockname=writer.get_extra_info("sockname"), |
| 477 | timestamp_start=time.time(), |
| 478 | proxy_mode=mode, |
| 479 | state=ConnectionState.OPEN, |
| 480 | ) |
| 481 | context = Context(client, options) |
| 482 | super().__init__(context) |
| 483 | self.transports[client] = ConnectionIO( |
| 484 | handler=None, reader=reader, writer=writer |
| 485 | ) |
| 486 | |
| 487 | |
| 488 | class SimpleConnectionHandler(LiveConnectionHandler): # pragma: no cover |
nothing calls this directly
no test coverage detected