Handle a connection for its entire lifetime. This means we read until EOF, but then possibly also keep on waiting for our side of the connection to be closed.
(self, connection: Connection)
| 272 | await self.server_event(events.Wakeup(request)) |
| 273 | |
| 274 | async def handle_connection(self, connection: Connection) -> None: |
| 275 | """ |
| 276 | Handle a connection for its entire lifetime. |
| 277 | This means we read until EOF, |
| 278 | but then possibly also keep on waiting for our side of the connection to be closed. |
| 279 | """ |
| 280 | cancelled = None |
| 281 | reader = self.transports[connection].reader |
| 282 | assert reader |
| 283 | while True: |
| 284 | try: |
| 285 | data = await reader.read(65535) |
| 286 | if not data: |
| 287 | raise OSError("Connection closed by peer.") |
| 288 | except OSError: |
| 289 | break |
| 290 | except asyncio.CancelledError as e: |
| 291 | cancelled = e |
| 292 | break |
| 293 | |
| 294 | await self.server_event(events.DataReceived(connection, data)) |
| 295 | |
| 296 | try: |
| 297 | await self.drain_writers() |
| 298 | except asyncio.CancelledError as e: |
| 299 | cancelled = e |
| 300 | break |
| 301 | |
| 302 | if cancelled is None and connection.transport_protocol == "tcp": |
| 303 | # TCP connections can be half-closed. |
| 304 | connection.state &= ~ConnectionState.CAN_READ |
| 305 | else: |
| 306 | connection.state = ConnectionState.CLOSED |
| 307 | |
| 308 | await self.server_event(events.ConnectionClosed(connection)) |
| 309 | |
| 310 | if connection.state is ConnectionState.CAN_WRITE: |
| 311 | # we may still use this connection to *send* stuff, |
| 312 | # even though the remote has closed their side of the connection. |
| 313 | # to make this work we keep this task running and wait for cancellation. |
| 314 | try: |
| 315 | await asyncio.Event().wait() |
| 316 | except asyncio.CancelledError as e: |
| 317 | cancelled = e |
| 318 | |
| 319 | try: |
| 320 | writer = self.transports[connection].writer |
| 321 | assert writer |
| 322 | writer.close() |
| 323 | except OSError: |
| 324 | pass |
| 325 | self.transports.pop(connection) |
| 326 | |
| 327 | if cancelled: |
| 328 | raise cancelled |
| 329 | |
| 330 | async def drain_writers(self): |
| 331 | """ |
no test coverage detected