Called by worker threads whenever they're done processing individual messages, signaling that each message is ready to be acked or rejected.
(self, message: MessageProxy)
| 398 | self.post_process_message(message) |
| 399 | |
| 400 | def post_process_message(self, message: MessageProxy) -> None: |
| 401 | """Called by worker threads whenever they're done processing |
| 402 | individual messages, signaling that each message is ready to |
| 403 | be acked or rejected. |
| 404 | """ |
| 405 | if TYPE_CHECKING: |
| 406 | assert self.consumer |
| 407 | while True: |
| 408 | try: |
| 409 | if message.failed: |
| 410 | self.logger.debug("Rejecting message %r.", message.message_id) |
| 411 | self.broker.emit_before("nack", message) |
| 412 | self.consumer.nack(message) |
| 413 | self.broker.emit_after("nack", message) |
| 414 | |
| 415 | else: |
| 416 | self.logger.debug("Acknowledging message %r.", message.message_id) |
| 417 | self.broker.emit_before("ack", message) |
| 418 | self.consumer.ack(message) |
| 419 | self.broker.emit_after("ack", message) |
| 420 | |
| 421 | return |
| 422 | |
| 423 | # This applies to the Redis broker. The alternative to |
| 424 | # constantly retrying would be to give up here and let the |
| 425 | # message be re-processed after the worker is eventually |
| 426 | # stopped or restarted, but we'd be doing the same work |
| 427 | # twice in that case and the behaviour would surprise |
| 428 | # users who don't deploy frequently. |
| 429 | except BrokerConnectionError as e: |
| 430 | self.logger.warning( |
| 431 | "Failed to post_process_message(%s) due to a connection error: %s\n" |
| 432 | "The operation will be retried in %s seconds until the connection recovers.\n" |
| 433 | "If you restart this worker before this operation succeeds, the message will be re-processed later.", |
| 434 | message, |
| 435 | e, |
| 436 | POST_PROCESS_MESSAGE_RETRY_DELAY_SECS, |
| 437 | ) |
| 438 | |
| 439 | time.sleep(POST_PROCESS_MESSAGE_RETRY_DELAY_SECS) |
| 440 | continue |
| 441 | |
| 442 | # Not much point retrying here so we bail. Most likely, |
| 443 | # the message will be re-run after the worker is stopped |
| 444 | # or restarted (because its ack lease will have expired). |
| 445 | except Exception: # pragma: no cover |
| 446 | self.logger.exception( |
| 447 | "Unhandled error during post_process_message(%s). You've found a bug in Dramatiq. Please report it!\n" |
| 448 | "Although your message has been processed, it will be processed again once this worker is restarted.", |
| 449 | message, |
| 450 | ) |
| 451 | |
| 452 | return |
| 453 | |
| 454 | def requeue_messages(self, messages: Iterable[MessageProxy]) -> None: |
| 455 | """Called on worker shutdown and whenever there is a |
no test coverage detected