This function determines the Common Name (CN), Subject Alternative Names (SANs) and Organization Name our certificate should have and then fetches a matching cert from the certstore.
(self, conn_context: context.Context)
| 583 | return f"/mitmproxy-{self.certstore.default_ca.serial}.crl" |
| 584 | |
def get_cert(self, conn_context: context.Context) -> certs.CertStoreEntry:
    """
    Work out the identity our certificate should carry and fetch a matching entry from the certstore.

    Subject Alternative Names (SANs) are collected in this order:

    1. the upstream certificate's CN and SANs, if the ``upstream_cert`` option is set and
       the server connection has a certificate list,
    2. the client's SNI, or the first element of the client's ``sockname`` if no SNI was sent,
    3. the first element of the server address, if one is set.

    Duplicates are dropped (first occurrence wins) and the first remaining SAN is used as
    the Common Name (CN). The organization and a rewritten CRL distribution point are taken
    from the upstream certificate when it provides them; otherwise they stay ``None``.
    """
    client = conn_context.client
    server = conn_context.server

    sans: list[x509.GeneralName] = []
    org: str | None = None
    crl_url: str | None = None

    # Mirror the upstream certificate's identity if we are allowed to and have one.
    if ctx.options.upstream_cert and server.certificate_list:
        upstream: certs.Cert = server.certificate_list[0]
        if upstream.cn:
            sans.append(_ip_or_dns_name(upstream.cn))
        sans.extend(upstream.altnames)
        # A falsy organization is treated the same as a missing one.
        org = upstream.organization or None

        # Keep scheme and host of the first upstream CRL URL, but swap its path for our own
        # CRL path. That path embeds the CA cert serial number, which acts as a magic token.
        crls = upstream.crl_distribution_points
        if crls:
            first_crl = crls[0]
            try:
                parts = urllib.parse.urlsplit(first_crl)
            except ValueError:
                logger.info(f"Failed to parse CRL URL: {first_crl!r}")
            else:
                # Query and fragment are deliberately dropped.
                # noinspection PyTypeChecker
                crl_url = urllib.parse.urlunsplit(
                    (parts.scheme, parts.netloc, self.crl_path(), None, None)
                )

    # The client addressed us either by SNI or, failing that, by our local IP address.
    sans.append(_ip_or_dns_name(client.sni or client.sockname[0]))

    # A server address that is already known goes into the SANs as well.
    if server.address:
        sans.append(_ip_or_dns_name(server.address[0]))

    # Order-preserving de-duplication: only the first occurrence of each name survives.
    unique_sans = [*dict.fromkeys(sans)]

    # RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity.
    # The Common Name is therefore irrelevant; we simply reuse the first SAN for it.
    cn = str(unique_sans[0].value) if unique_sans else None

    return self.certstore.get_cert(cn, unique_sans, org, crl_url)
| 634 | |
| 635 | def request(self, flow: http.HTTPFlow): |
| 636 | if not flow.live or flow.error or flow.response: |