(self, message: ASGISendEvent)
| 460 | |
| 461 | # ASGI interface |
| 462 | async def send(self, message: ASGISendEvent) -> None: |
| 463 | if self.flow.write_paused and not self.disconnected: |
| 464 | await self.flow.drain() # pragma: full coverage |
| 465 | |
| 466 | if self.disconnected: |
| 467 | return # pragma: full coverage |
| 468 | |
| 469 | if not self.response_started: |
| 470 | # Sending response status line and headers |
| 471 | if message["type"] != "http.response.start": |
| 472 | raise RuntimeError(f"Expected ASGI message 'http.response.start', but got '{message['type']}'.") |
| 473 | |
| 474 | self.response_started = True |
| 475 | self.waiting_for_100_continue = False |
| 476 | |
| 477 | status_code = message["status"] |
| 478 | headers = self.default_headers + list(message.get("headers", [])) |
| 479 | |
| 480 | if CLOSE_HEADER in self.scope["headers"] and CLOSE_HEADER not in headers: |
| 481 | headers = headers + [CLOSE_HEADER] |
| 482 | |
| 483 | if self.access_log: |
| 484 | self.access_logger.info( |
| 485 | '%s - "%s %s HTTP/%s" %d', |
| 486 | get_client_addr(self.scope), |
| 487 | self.scope["method"], |
| 488 | get_path_with_query_string(self.scope), |
| 489 | self.scope["http_version"], |
| 490 | status_code, |
| 491 | ) |
| 492 | |
| 493 | # Write response status line and headers |
| 494 | content = [STATUS_LINE[status_code]] |
| 495 | |
| 496 | for name, value in headers: |
| 497 | if HEADER_RE.search(name): |
| 498 | raise RuntimeError("Invalid HTTP header name.") # pragma: full coverage |
| 499 | if HEADER_VALUE_RE.search(value): |
| 500 | raise RuntimeError("Invalid HTTP header value.") |
| 501 | |
| 502 | name = name.lower() |
| 503 | if name == b"content-length" and self.chunked_encoding is None: |
| 504 | self.expected_content_length = int(value.decode()) |
| 505 | self.chunked_encoding = False |
| 506 | elif name == b"transfer-encoding" and value.lower() == b"chunked": |
| 507 | self.expected_content_length = 0 |
| 508 | self.chunked_encoding = True |
| 509 | elif name == b"connection" and value.lower() == b"close": |
| 510 | self.keep_alive = False |
| 511 | content.extend([name, b": ", value, b"\r\n"]) |
| 512 | |
| 513 | if self.chunked_encoding is None and self.scope["method"] != "HEAD" and status_code not in (204, 304): |
| 514 | # Neither content-length nor transfer-encoding specified |
| 515 | self.chunked_encoding = True |
| 516 | content.append(b"transfer-encoding: chunked\r\n") |
| 517 | |
| 518 | content.append(b"\r\n") |
| 519 | self.transport.write(b"".join(content)) |
no test coverage detected