(self)
| 149 | super().__init__(*args, **kwargs) |
| 150 | |
def run(self):
    """Process bucket URLs from ``self.q`` until ``THREAD_EVENT`` is set.

    Each dequeued URL is passed to ``__check_boto`` when ``self.use_aws``
    is truthy, otherwise to ``__check_http``.  An exception raised by a
    check is printed and the worker continues with the next URL.  Every
    dequeued item is acknowledged with ``self.q.task_done()``.
    """
    # Local import keeps this method independent of how the module itself
    # imports the queue machinery.
    # NOTE(review): assumes self.q is queue.Queue-compatible (get() accepts
    # ``timeout`` and raises queue.Empty) -- confirm against the constructor.
    from queue import Empty

    while not THREAD_EVENT.is_set():
        # Dequeue outside the try/finally below: task_done() may only be
        # called for an item that was actually fetched, otherwise it raises
        # ValueError.  The timeout lets the loop re-check THREAD_EVENT
        # instead of blocking forever on an empty queue.
        try:
            bucket_url = self.q.get(timeout=1)
        except Empty:
            continue

        try:
            if self.use_aws:
                self.__check_boto(bucket_url)
            else:
                self.__check_http(bucket_url)
        except Exception as e:
            # Best-effort worker: report the failure and keep consuming.
            print(e)
        finally:
            self.q.task_done()
| 163 | |
| 164 | def __check_http(self, bucket_url): |
| 165 | check_response = self.session.head( |
nothing calls this directly
no test coverage detected