| 375 | ) |
| 376 | |
| 377 | async def server_event(self, event: events.Event) -> None: |
| 378 | # server_event is supposed to be completely sync without any `await` that could pause execution. |
| 379 | # However, create_task with an [eager task factory] will schedule tasks immediately, |
| 380 | # which causes [reentrancy issues]. So we put the entire thing behind a lock. |
| 381 | # |
| 382 | # [eager task factory]: https://docs.python.org/3/library/asyncio-task.html#eager-task-factory |
| 383 | # [reentrancy issues]: https://github.com/mitmproxy/mitmproxy/issues/7027 |
| 384 | async with self._server_event_lock: |
| 385 | # No `await` beyond this point. |
| 386 | |
| 387 | self.timeout_watchdog.register_activity() |
| 388 | try: |
| 389 | layer_commands = self.layer.handle_event(event) |
| 390 | for command in layer_commands: |
| 391 | if isinstance(command, commands.OpenConnection): |
| 392 | assert command.connection not in self.transports |
| 393 | handler = asyncio_utils.create_task( |
| 394 | self.open_connection(command), |
| 395 | name=f"server connection handler {command.connection.address}", |
| 396 | keep_ref=False, |
| 397 | client=self.client.peername, |
| 398 | ) |
| 399 | self.transports[command.connection] = ConnectionIO( |
| 400 | handler=handler |
| 401 | ) |
| 402 | elif isinstance(command, commands.RequestWakeup): |
| 403 | task = asyncio_utils.create_task( |
| 404 | self.wakeup(command), |
| 405 | name=f"wakeup timer ({command.delay:.1f}s)", |
| 406 | keep_ref=False, |
| 407 | client=self.client.peername, |
| 408 | ) |
| 409 | assert task is not None |
| 410 | self.wakeup_timer.add(task) |
| 411 | elif ( |
| 412 | isinstance(command, commands.ConnectionCommand) |
| 413 | and command.connection not in self.transports |
| 414 | ): |
| 415 | pass # The connection has already been closed. |
| 416 | elif isinstance(command, commands.SendData): |
| 417 | writer = self.transports[command.connection].writer |
| 418 | assert writer |
| 419 | if not writer.is_closing(): |
| 420 | writer.write(command.data) |
| 421 | elif isinstance(command, commands.CloseTcpConnection): |
| 422 | self.close_connection(command.connection, command.half_close) |
| 423 | elif isinstance(command, commands.CloseConnection): |
| 424 | self.close_connection(command.connection, False) |
| 425 | elif isinstance(command, commands.StartHook): |
| 426 | asyncio_utils.create_task( |
| 427 | self.hook_task(command), |
| 428 | name=f"handle_hook({command.name})", |
| 429 | keep_ref=True, |
| 430 | client=self.client.peername, |
| 431 | ) |
| 432 | elif isinstance(command, commands.Log): |
| 433 | self.log(command.message, command.level) |
| 434 | else: |