Call LE to get a new signed CA.
(zappa_instance, log=LOGGER, CA=DEFAULT_CA)
| 246 | def get_cert(zappa_instance, log=LOGGER, CA=DEFAULT_CA): |
| 247 | """ |
| 248 | Call LE to get a new signed CA. |
| 249 | """ |
| 250 | out = parse_account_key() |
| 251 | header = get_boulder_header(out) |
| 252 | accountkey_json = json.dumps(header["jwk"], sort_keys=True, separators=(",", ":")) |
| 253 | thumbprint = _b64(hashlib.sha256(accountkey_json.encode("utf8")).digest()) |
| 254 | |
| 255 | # find domains |
| 256 | domains = parse_csr() |
| 257 | |
| 258 | # get the certificate domains and expiration |
| 259 | register_account() |
| 260 | |
| 261 | # verify each domain |
| 262 | for domain in domains: |
| 263 | log.info("Verifying {0}...".format(domain)) |
| 264 | |
| 265 | # get new challenge |
| 266 | code, result = _send_signed_request( |
| 267 | CA + "/acme/new-authz", |
| 268 | { |
| 269 | "resource": "new-authz", |
| 270 | "identifier": {"type": "dns", "value": domain}, |
| 271 | }, |
| 272 | ) |
| 273 | if code != 201: |
| 274 | raise ValueError("Error requesting challenges: {0} {1}".format(code, result)) |
| 275 | |
| 276 | challenge = [ch for ch in json.loads(result.decode("utf8"))["challenges"] if ch["type"] == "dns-01"][0] |
| 277 | token = re.sub(r"[^A-Za-z0-9_\-]", "_", challenge["token"]) |
| 278 | keyauthorization = "{0}.{1}".format(token, thumbprint).encode("utf-8") |
| 279 | |
| 280 | # sha256_b64 |
| 281 | digest = _b64(hashlib.sha256(keyauthorization).digest()) |
| 282 | |
| 283 | zone_id = zappa_instance.get_hosted_zone_id_for_domain(domain) |
| 284 | if not zone_id: |
| 285 | raise ValueError("Could not find Zone ID for: " + domain) |
| 286 | zappa_instance.set_dns_challenge_txt(zone_id, domain, digest) # resp is unused |
| 287 | |
| 288 | print("Waiting for DNS to propagate..") |
| 289 | |
| 290 | # What's optimal here? |
| 291 | # import time # double import; import in loop; shadowed import |
| 292 | time.sleep(45) |
| 293 | |
| 294 | # notify challenge are met |
| 295 | code, result = _send_signed_request( |
| 296 | challenge["uri"], |
| 297 | { |
| 298 | "resource": "challenge", |
| 299 | "keyAuthorization": keyauthorization.decode("utf-8"), |
| 300 | }, |
| 301 | ) |
| 302 | if code != 202: |
| 303 | raise ValueError("Error triggering challenge: {0} {1}".format(code, result)) |