Create a Celery app configured for native delayed delivery testing. Args: unique_id: Unique identifier that ensures queue/exchange names don't conflict. Returns: Tuple of (app, exchange_name, queue_a_name, queue_b_name).
(unique_id)
| 80 | |
| 81 | |
def create_test_app(unique_id):
    """Build a Celery app wired up for the native delayed delivery tests.

    The broker URL is obtained from ``get_rabbitmq_url()``. The result
    backend is read from the ``TEST_BACKEND`` environment variable; when it
    is unset, a Redis URL is assembled from ``REDIS_HOST`` / ``REDIS_PORT``
    (falling back to ``localhost`` / ``6379``) pointing at database 0.

    Two quorum queues are declared on a single topic exchange, each one
    using its own queue name as the routing key. Every name is suffixed
    with ``_{unique_id}``.

    Args:
        unique_id: Unique identifier appended to the exchange and queue
            names so that they don't conflict with other names.

    Returns:
        Tuple of (app, exchange_name, queue_a_name, queue_b_name).
    """
    broker_url = get_rabbitmq_url()

    # Result backend: an explicit TEST_BACKEND wins, otherwise Redis built
    # from the host/port environment variables.
    redis_host = os.environ.get("REDIS_HOST", "localhost")
    redis_port = os.environ.get("REDIS_PORT", "6379")
    fallback_backend = f"redis://{redis_host}:{redis_port}/0"
    backend_url = os.environ.get("TEST_BACKEND", fallback_backend)

    app = Celery(
        "test_native_delayed_delivery_binding",
        broker=broker_url,
        backend=backend_url,
    )

    # Topic exchange shared by both queues, uniquely named.
    exchange_name = f'celery.topic_{unique_id}'
    topic_exchange = Exchange(exchange_name, type='topic')

    # Order matters to callers reading the config: queue-a first, queue-b
    # second.
    queue_a_name = f'queue-a_{unique_id}'
    queue_b_name = f'queue-b_{unique_id}'
    app.conf.task_queues = [
        Queue(
            name,
            exchange=topic_exchange,
            routing_key=name,
            queue_arguments={'x-queue-type': 'quorum'},
        )
        for name in (queue_a_name, queue_b_name)
    ]

    # Recommended setting for using celery with quorum queues.
    app.conf.broker_transport_options = {"confirm_publish": True}

    # Enable quorum queue detection to disable global QoS.
    app.conf.worker_detect_quorum_queues = True

    return app, exchange_name, queue_a_name, queue_b_name
| 129 | |
| 130 | |
| 131 | @pytest.mark.amqp |
no test coverage detected
searching dependent graphs…