| 105 | |
| 106 | |
class BucketQueue(queue.Queue):
    """Queue of bucket URLs that skips repeats and paces its consumers.

    ``put`` enqueues each URL at most once. ``get`` serialises callers behind
    a lock and, while ``rate_limited`` is set, waits until ``next_yield``
    before handing out the next item.
    """

    def __init__(self, maxsize):
        # Serialises the pacing logic in get() across calling threads.
        self.lock = Lock()
        # Every URL accepted by put(), in arrival order.
        # NOTE(review): membership tests on a list are O(n); a set would be
        # cheaper if nothing outside this class relies on it being a list.
        self.checked_buckets = []
        # Read and cleared by get(). Nothing in this class sets it to True;
        # presumably consumers flip it when throttled - confirm with callers.
        self.rate_limited = False
        # time.monotonic() value before which a rate-limited get() waits.
        self.next_yield = 0

        super().__init__(maxsize)

    def put(self, bucket_url, block=True, timeout=None):
        """Enqueue *bucket_url* unless it has already been accepted.

        ``block`` and ``timeout`` are forwarded to ``queue.Queue.put`` so the
        inherited ``put_nowait`` (which passes ``block=False``) keeps working.

        Raises:
            queue.Full: if no slot is free, subject to ``block``/``timeout``.
        """
        if bucket_url in self.checked_buckets:
            return

        self.checked_buckets.append(bucket_url)
        try:
            super().put(bucket_url, block, timeout)
        except queue.Full:
            # The URL was never queued, so forget it; a later put() can retry.
            self.checked_buckets.remove(bucket_url)
            raise

    def get(self, block=True, timeout=None):
        """Return the next bucket URL, pausing first if rate limited.

        ``block`` and ``timeout`` are forwarded to ``queue.Queue.get`` so the
        inherited ``get_nowait`` (which passes ``block=False``) keeps working.

        Raises:
            queue.Empty: if no item is available, subject to
                ``block``/``timeout``.
        """
        # The lock is held while waiting, so every other caller of get()
        # queues up behind the one that is pausing.
        with self.lock:
            now = time.monotonic()
            if self.rate_limited and now < self.next_yield:
                cprint("You have hit the AWS rate limit - slowing down... (tip: enter credentials in config.yaml)", "yellow")
                # THREAD_EVENT is defined elsewhere in the module; presumably a
                # threading.Event, so the wait ends early once it is set.
                THREAD_EVENT.wait(self.next_yield - now)
                now = time.monotonic()
                self.rate_limited = False

            # Earliest time a later rate-limited get() may proceed.
            self.next_yield = now + RATE_LIMIT_SLEEP

        return super().get(block, timeout)
| 134 | |
| 135 | |
| 136 | class BucketWorker(Thread): |