MCPcopy
hub / github.com/eth0izzle/bucket-stream / process

Method process

bucket-stream.py:84–104  ·  view source on GitHub ↗
(self, message, context)

Source from the content-addressed store, hash-verified

82 THREAD_EVENT.wait(10)
83
84 def process(self, message, context):
85 if message["message_type"] == "heartbeat":
86 return
87
88 if message["message_type"] == "certificate_update":
89 all_domains = message["data"]["leaf_cert"]["all_domains"]
90
91 if ARGS.skip_lets_encrypt and "Let's Encrypt" in message["data"]["chain"][0]["subject"]["aggregated"]:
92 return
93
94 for domain in set(all_domains):
95 # cut the crap
96 if not domain.startswith("*.")\
97 and "cloudflaressl" not in domain\
98 and "xn--" not in domain\
99 and domain.count("-") < 4\
100 and domain.count(".") < 4:
101
102 parts = tldextract.extract(domain)
103 for permutation in get_permutations(parts.domain, parts.subdomain):
104 self.q.put(BUCKET_HOST % permutation)
105
106
107class BucketQueue(queue.Queue):

Callers

nothing calls this directly

Calls 2

get_permutationsFunction · 0.85
putMethod · 0.80

Tested by

no test coverage detected