(read_fn: Callable[[int], bytes])
| 663 | |
| 664 | |
| 665 | def _read_loop(read_fn: Callable[[int], bytes]) -> bytes: |
| 666 | chunks = [] |
| 667 | while True: |
| 668 | try: |
| 669 | chunk = read_fn(2**14) # max TLS record size |
| 670 | except SSL.WantReadError: |
| 671 | break |
| 672 | chunks.append(chunk) |
| 673 | return b"".join(chunks) |
| 674 | |
| 675 | |
| 676 | async def handle_client_hello_untrusted( |
no outgoing calls
no test coverage detected
searching dependent graphs…