(self, buf: bytes)
| 403 | self.quic.connect(self.address, now=self.now) |
| 404 | |
| 405 | def write(self, buf: bytes): |
| 406 | self.now = self.now + 0.1 |
| 407 | if self.quic is None: |
| 408 | quic_buf = QuicBuffer(data=buf) |
| 409 | header = pull_quic_header(quic_buf, host_cid_length=8) |
| 410 | self.quic = QuicConnection( |
| 411 | configuration=self.ctx, |
| 412 | original_destination_connection_id=header.destination_cid, |
| 413 | ) |
| 414 | self.quic.receive_datagram(buf, self.address, self.now) |
| 415 | |
| 416 | def read(self) -> bytes: |
| 417 | self.now = self.now + 0.1 |
no outgoing calls
no test coverage detected