MCPcopy
hub / github.com/eth0izzle/bucket-stream / CertStreamThread

Class CertStreamThread

bucket-stream.py:68–104  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

66
67
class CertStreamThread(Thread):
    """Thread that listens to Certstream and queues hostnames built from new certificates.

    Each certificate update is reduced to its set of domains; domains that pass
    the noise filter are expanded with ``get_permutations`` and every result is
    formatted with ``BUCKET_HOST`` and put on the queue given at construction.
    """

    def __init__(self, q, *args, **kwargs):
        """Store the output queue and build the Certstream client.

        Args:
            q: Queue-like object; only its ``put`` method is used here.
            *args: Forwarded unchanged to ``Thread.__init__``.
            **kwargs: Forwarded unchanged to ``Thread.__init__``.
        """
        self.q = q
        # The client is created before Thread.__init__ runs and is handed the
        # bound ``process`` method as its message callback.
        self.c = CertStreamClient(
            self.process,
            skip_heartbeats=True,
            on_open=None,
            on_error=None,
        )

        super().__init__(*args, **kwargs)

    def run(self):
        """Run the Certstream client repeatedly until THREAD_EVENT is set.

        Whenever ``run_forever()`` returns, wait on THREAD_EVENT with a
        10 second timeout before checking the flag and starting again.
        """
        while not THREAD_EVENT.is_set():
            cprint(
                "Waiting for Certstream events - this could take a few minutes to queue up...",
                "yellow",
                attrs=["bold"],
            )
            self.c.run_forever()
            THREAD_EVENT.wait(10)

    def process(self, message, context):
        """Handle one Certstream message and enqueue candidate hostnames.

        Args:
            message: Mapping with a ``"message_type"`` key; certificate updates
                are also read at ``data.leaf_cert.all_domains`` and
                ``data.chain[0].subject.aggregated``.
            context: Supplied by the client callback; not used here.

        Returns:
            None. Results are delivered through ``self.q.put``.
        """
        # Heartbeats and any other non-certificate message carry nothing to do.
        if message["message_type"] != "certificate_update":
            return

        payload = message["data"]
        all_domains = payload["leaf_cert"]["all_domains"]

        # Optionally drop certificates whose first chain entry names Let's Encrypt.
        if ARGS.skip_lets_encrypt and "Let's Encrypt" in payload["chain"][0]["subject"]["aggregated"]:
            return

        # set() collapses duplicate names listed on the same certificate.
        for domain in set(all_domains):
            # Noise filter: wildcards, cloudflaressl hosts, punycode labels, and
            # names with four or more hyphens or dots are skipped.
            if (
                domain.startswith("*.")
                or "cloudflaressl" in domain
                or "xn--" in domain
                or domain.count("-") >= 4
                or domain.count(".") >= 4
            ):
                continue

            parts = tldextract.extract(domain)
            for candidate in get_permutations(parts.domain, parts.subdomain):
                self.q.put(BUCKET_HOST % candidate)
105
106
107class BucketQueue(queue.Queue):

Callers 1

mainFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected