Enqueue a message. Parameters: message(Message): The message to enqueue. delay(int): The minimum amount of time, in milliseconds, to delay the message by. Raises: ConnectionClosed: If the underlying channel or connection has bee
(self, message: Message, *, delay: Optional[int] = None)
| 353 | ) |
| 354 | |
| 355 | def enqueue(self, message: Message, *, delay: Optional[int] = None) -> Message: |
| 356 | """Enqueue a message. |
| 357 | |
| 358 | Parameters: |
| 359 | message(Message): The message to enqueue. |
| 360 | delay(int): The minimum amount of time, in milliseconds, to |
| 361 | delay the message by. |
| 362 | |
| 363 | Raises: |
| 364 | ConnectionClosed: If the underlying channel or connection |
| 365 | has been closed. |
| 366 | """ |
| 367 | canonical_queue_name = message.queue_name |
| 368 | queue_name = canonical_queue_name |
| 369 | |
| 370 | if delay is not None: |
| 371 | queue_name = dq_name(queue_name) |
| 372 | message_eta = current_millis() + delay |
| 373 | message = message.copy( |
| 374 | queue_name=queue_name, |
| 375 | options={ |
| 376 | "eta": message_eta, |
| 377 | }, |
| 378 | ) |
| 379 | |
| 380 | attempts = 0 |
| 381 | while True: |
| 382 | try: |
| 383 | self.declare_queue(canonical_queue_name, ensure=True) |
| 384 | self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) |
| 385 | self.emit_before("enqueue", message, delay) |
| 386 | self.channel.basic_publish( |
| 387 | exchange="", |
| 388 | routing_key=queue_name, |
| 389 | body=message.encode(), |
| 390 | properties=pika.BasicProperties( |
| 391 | delivery_mode=2, |
| 392 | priority=message.options.get("broker_priority"), |
| 393 | ), |
| 394 | # mandatory flag ensures UnroutableError is raised if message could not be routed to a queue, |
| 395 | # but it only works when confirm_delivery is turned on, so only set it when that is the case. |
| 396 | # https://www.rabbitmq.com/docs/publishers#unroutable |
| 397 | mandatory=self.confirm_delivery, |
| 398 | ) |
| 399 | self.emit_after("enqueue", message, delay) |
| 400 | return message |
| 401 | |
| 402 | except ( |
| 403 | pika.exceptions.AMQPConnectionError, |
| 404 | pika.exceptions.AMQPChannelError, |
| 405 | ) as e: |
| 406 | # Delete the channel and the connection so that the |
| 407 | # next caller/attempt may initiate new ones of each. |
| 408 | del self.connection |
| 409 | |
| 410 | # If the queue disappears, add it to the set of pending queues |
| 411 | # so that it can be redeclared on retry or the next time |
| 412 | # a message is enqueued. |
nothing calls this directly
no test coverage detected