MCPcopy
hub / github.com/Bogdanp/dramatiq / run

Method run

dramatiq/worker.py:296–344  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

294 self.delay_queue: PriorityQueue[_WorkQueueItem] = PriorityQueue()
295
296 def run(self) -> None:
297 self.logger.debug("Running consumer thread...")
298 self.running = True
299 self.broker.emit_after("consumer_thread_boot", self)
300 while self.running:
301 if self.paused:
302 self.logger.debug("Consumer is paused. Sleeping for %.02fms...", self.worker_timeout)
303 self.paused_event.set()
304 time.sleep(self.worker_timeout / 1000)
305 continue
306
307 try:
308 self.consumer = self.broker.consume(
309 queue_name=self.queue_name,
310 prefetch=self.prefetch,
311 timeout=self.worker_timeout,
312 )
313
314 for message in self.consumer:
315 if message is not None:
316 self.handle_message(message)
317
318 elif self.paused:
319 break
320
321 self.handle_delayed_messages()
322 if not self.running:
323 break
324
325 except BrokerConnectionError as e:
326 self.logger.critical("Consumer encountered a connection error: %s", e)
327 self.delay_queue = PriorityQueue()
328
329 except Exception:
330 self.logger.critical("Consumer encountered an unexpected error.", exc_info=True)
331 # Avoid leaving any open file descriptors around when
332 # an exception occurs.
333 self.close()
334
335 # While the consumer is running (i.e. hasn't been shut down),
336 # try to restart it once a second.
337 if self.running:
338 self.logger.info("Restarting consumer in %0.2f seconds.", CONSUMER_RESTART_DELAY_SECS)
339 self.close()
340 time.sleep(CONSUMER_RESTART_DELAY_SECS)
341
342 # If it's no longer running, then shut it down gracefully.
343 self.broker.emit_before("consumer_thread_shutdown", self)
344 self.logger.debug("Consumer thread stopped.")
345
346 def handle_delayed_messages(self) -> None:
347 """Enqueue any delayed messages whose eta has passed."""

Callers

nothing calls this directly

Calls 6

handle_messageMethod · 0.95
closeMethod · 0.95
emit_afterMethod · 0.80
emit_beforeMethod · 0.80
consumeMethod · 0.45

Tested by

no test coverage detected