(self)
| 294 | self.delay_queue: PriorityQueue[_WorkQueueItem] = PriorityQueue() |
| 295 | |
| 296 | def run(self) -> None: |
| 297 | self.logger.debug("Running consumer thread...") |
| 298 | self.running = True |
| 299 | self.broker.emit_after("consumer_thread_boot", self) |
| 300 | while self.running: |
| 301 | if self.paused: |
| 302 | self.logger.debug("Consumer is paused. Sleeping for %.02fms...", self.worker_timeout) |
| 303 | self.paused_event.set() |
| 304 | time.sleep(self.worker_timeout / 1000) |
| 305 | continue |
| 306 | |
| 307 | try: |
| 308 | self.consumer = self.broker.consume( |
| 309 | queue_name=self.queue_name, |
| 310 | prefetch=self.prefetch, |
| 311 | timeout=self.worker_timeout, |
| 312 | ) |
| 313 | |
| 314 | for message in self.consumer: |
| 315 | if message is not None: |
| 316 | self.handle_message(message) |
| 317 | |
| 318 | elif self.paused: |
| 319 | break |
| 320 | |
| 321 | self.handle_delayed_messages() |
| 322 | if not self.running: |
| 323 | break |
| 324 | |
| 325 | except BrokerConnectionError as e: |
| 326 | self.logger.critical("Consumer encountered a connection error: %s", e) |
| 327 | self.delay_queue = PriorityQueue() |
| 328 | |
| 329 | except Exception: |
| 330 | self.logger.critical("Consumer encountered an unexpected error.", exc_info=True) |
| 331 | # Avoid leaving any open file descriptors around when |
| 332 | # an exception occurs. |
| 333 | self.close() |
| 334 | |
| 335 | # While the consumer is running (i.e. hasn't been shut down), |
| 336 | # try to restart it once a second. |
| 337 | if self.running: |
| 338 | self.logger.info("Restarting consumer in %0.2f seconds.", CONSUMER_RESTART_DELAY_SECS) |
| 339 | self.close() |
| 340 | time.sleep(CONSUMER_RESTART_DELAY_SECS) |
| 341 | |
| 342 | # If it's no longer running, then shut it down gracefully. |
| 343 | self.broker.emit_before("consumer_thread_shutdown", self) |
| 344 | self.logger.debug("Consumer thread stopped.") |
| 345 | |
| 346 | def handle_delayed_messages(self) -> None: |
| 347 | """Enqueue any delayed messages whose eta has passed.""" |
nothing calls this directly
no test coverage detected