MCPcopy
hub / github.com/Bogdanp/dramatiq / post_process_message

Method post_process_message

dramatiq/worker.py:400–452  ·  view source on GitHub ↗

Called by worker threads whenever they're done processing individual messages, signaling that each message is ready to be acked or rejected.

(self, message: MessageProxy)

Source from the content-addressed store, hash-verified

398 self.post_process_message(message)
399
400 def post_process_message(self, message: MessageProxy) -> None:
401 """Called by worker threads whenever they're done processing
402 individual messages, signaling that each message is ready to
403 be acked or rejected.
404 """
405 if TYPE_CHECKING:
406 assert self.consumer
407 while True:
408 try:
409 if message.failed:
410 self.logger.debug("Rejecting message %r.", message.message_id)
411 self.broker.emit_before("nack", message)
412 self.consumer.nack(message)
413 self.broker.emit_after("nack", message)
414
415 else:
416 self.logger.debug("Acknowledging message %r.", message.message_id)
417 self.broker.emit_before("ack", message)
418 self.consumer.ack(message)
419 self.broker.emit_after("ack", message)
420
421 return
422
423 # This applies to the Redis broker. The alternative to
424 # constantly retrying would be to give up here and let the
425 # message be re-processed after the worker is eventually
426 # stopped or restarted, but we'd be doing the same work
427 # twice in that case and the behaviour would surprise
428 # users who don't deploy frequently.
429 except BrokerConnectionError as e:
430 self.logger.warning(
431 "Failed to post_process_message(%s) due to a connection error: %s\n"
432 "The operation will be retried in %s seconds until the connection recovers.\n"
433 "If you restart this worker before this operation succeeds, the message will be re-processed later.",
434 message,
435 e,
436 POST_PROCESS_MESSAGE_RETRY_DELAY_SECS,
437 )
438
439 time.sleep(POST_PROCESS_MESSAGE_RETRY_DELAY_SECS)
440 continue
441
442 # Not much point retrying here so we bail. Most likely,
443 # the message will be re-run after the worker is stopped
444 # or restarted (because its ack lease will have expired).
445 except Exception: # pragma: no cover
446 self.logger.exception(
447 "Unhandled error during post_process_message(%s). You've found a bug in Dramatiq. Please report it!\n"
448 "Although your message has been processed, it will be processed again once this worker is restarted.",
449 message,
450 )
451
452 return
453
454 def requeue_messages(self, messages: Iterable[MessageProxy]) -> None:
455 """Called on worker shutdown and whenever there is a

Callers 3

handle_messageMethod · 0.95
process_messageMethod · 0.80

Calls 4

emit_beforeMethod · 0.80
emit_afterMethod · 0.80
nackMethod · 0.45
ackMethod · 0.45

Tested by

no test coverage detected