| 136 | |
| 137 | |
| 138 | class Fingerprint: |
| 139 | HASHFUNC_BY_DIGESTLEN = { |
| 140 | 16: md5, |
| 141 | 20: sha1, |
| 142 | 32: sha256, |
| 143 | } |
| 144 | |
| 145 | def __init__(self, fingerprint: bytes) -> None: |
| 146 | digestlen = len(fingerprint) |
| 147 | hashfunc = self.HASHFUNC_BY_DIGESTLEN.get(digestlen) |
| 148 | if not hashfunc: |
| 149 | raise ValueError("fingerprint has invalid length") |
| 150 | elif hashfunc is md5 or hashfunc is sha1: |
| 151 | raise ValueError("md5 and sha1 are insecure and not supported. Use sha256.") |
| 152 | self._hashfunc = hashfunc |
| 153 | self._fingerprint = fingerprint |
| 154 | |
| 155 | @property |
| 156 | def fingerprint(self) -> bytes: |
| 157 | return self._fingerprint |
| 158 | |
| 159 | def check(self, transport: asyncio.Transport) -> None: |
| 160 | if not transport.get_extra_info("sslcontext"): |
| 161 | return |
| 162 | sslobj = transport.get_extra_info("ssl_object") |
| 163 | cert = sslobj.getpeercert(binary_form=True) |
| 164 | got = self._hashfunc(cert).digest() |
| 165 | if got != self._fingerprint: |
| 166 | host, port, *_ = transport.get_extra_info("peername") |
| 167 | raise ServerFingerprintMismatch(self._fingerprint, got, host, port) |
| 168 | |
| 169 | |
| 170 | if ssl is not None: |
no outgoing calls
searching dependent graphs…