(sock)
| 206 | auth = requests.auth.HTTPDigestAuth('user', 'pass') |
| 207 | |
def digest_failed_response_handler(sock):
    """Challenge the client twice and check that it stops retrying.

    Reads the first request from ``sock``, answers with ``text_401``,
    checks that the follow-up request contains ``expected_digest``,
    answers with ``text_401`` a second time, and then asserts that no
    further bytes arrive on the socket.

    Returns the bytes read after the second challenge, which is ``b''``
    whenever the final assertion holds.
    """
    # The opening request must be a plain GET; answer it with a challenge.
    initial_request = consume_socket_content(sock, timeout=0.5)
    assert initial_request.startswith(b"GET / HTTP/1.1")
    sock.send(text_401)

    # The retry has to carry the expected Authorization data; reject it
    # anyway by sending the same challenge again.
    authorized_request = consume_socket_content(sock, timeout=0.5)
    assert expected_digest in authorized_request
    sock.send(text_401)

    # After the second challenge the client should send nothing further.
    trailing_bytes = consume_socket_content(sock, timeout=0.5)
    assert trailing_bytes == b''

    return trailing_bytes
| 225 | |
| 226 | close_server = threading.Event() |
| 227 | server = Server(digest_failed_response_handler, wait_to_close_event=close_server) |
nothing calls this directly
no test coverage detected
searching dependent graphs…