MCPcopy
hub / github.com/Bogdanp/dramatiq / enqueue

Method enqueue

dramatiq/brokers/rabbitmq.py:355–425  ·  view source on GitHub ↗

Enqueue a message. Parameters: message(Message): The message to enqueue. delay(int): The minimum amount of time, in milliseconds, to delay the message by. Raises: ConnectionClosed: If the underlying channel or connection has bee

(self, message: Message, *, delay: Optional[int] = None)

Source from the content-addressed store, hash-verified

353 )
354
355 def enqueue(self, message: Message, *, delay: Optional[int] = None) -> Message:
356 """Enqueue a message.
357
358 Parameters:
359 message(Message): The message to enqueue.
360 delay(int): The minimum amount of time, in milliseconds, to
361 delay the message by.
362
363 Raises:
364 ConnectionClosed: If the underlying channel or connection
365 has been closed.
366 """
367 canonical_queue_name = message.queue_name
368 queue_name = canonical_queue_name
369
370 if delay is not None:
371 queue_name = dq_name(queue_name)
372 message_eta = current_millis() + delay
373 message = message.copy(
374 queue_name=queue_name,
375 options={
376 "eta": message_eta,
377 },
378 )
379
380 attempts = 0
381 while True:
382 try:
383 self.declare_queue(canonical_queue_name, ensure=True)
384 self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
385 self.emit_before("enqueue", message, delay)
386 self.channel.basic_publish(
387 exchange="",
388 routing_key=queue_name,
389 body=message.encode(),
390 properties=pika.BasicProperties(
391 delivery_mode=2,
392 priority=message.options.get("broker_priority"),
393 ),
394 # mandatory flag ensures UnroutableError is raised if message could not be routed to a queue,
395 # but it only works when confirm_delivery is turned on, so only set it when that is the case.
396 # https://www.rabbitmq.com/docs/publishers#unroutable
397 mandatory=self.confirm_delivery,
398 )
399 self.emit_after("enqueue", message, delay)
400 return message
401
402 except (
403 pika.exceptions.AMQPConnectionError,
404 pika.exceptions.AMQPChannelError,
405 ) as e:
406 # Delete the channel and the connection so that the
407 # next caller/attempt may initiate new ones of each.
408 del self.connection
409
410 # If the queue disappears, add it to the set of pending queues
411 # so that it can be redeclared on retry or the next time
412 # a message is enqueued.

Callers

nothing calls this directly

Calls 10

declare_queueMethod · 0.95
dq_nameFunction · 0.85
current_millisFunction · 0.85
q_nameFunction · 0.85
ConnectionClosedClass · 0.85
copyMethod · 0.80
emit_beforeMethod · 0.80
emit_afterMethod · 0.80
encodeMethod · 0.45
addMethod · 0.45

Tested by

no test coverage detected