Pop items off a queue and create an event on the Salt event bus to be processed by a Reactor.

CLI Example:

.. code-block:: bash

    salt-run queue.process_queue myqueue
    salt-run queue.process_queue myqueue 6
    salt-run queue.process_queue myqueue all backend=sqlite
(queue, quantity=1, backend="sqlite", is_runner=False)
| 187 | |
| 188 | |
def process_queue(queue, quantity=1, backend="sqlite", is_runner=False):
    """
    Pop items from a queue and publish them on the Salt event bus so that a
    Reactor can process them.

    The items are fetched with ``pop`` using the supplied ``quantity``,
    ``backend`` and ``is_runner`` values. On success, one event tagged with
    ``tagify([queue, "process"], prefix="queue")`` is fired; its payload holds
    the popped items, the backend and the queue name, and that same payload
    is returned. If ``pop`` raises ``SaltInvocationError``, the error text is
    fired as a ``progress`` event through ``__jid_event__`` and ``False`` is
    returned instead.

    CLI Example:

    .. code-block:: bash

        salt-run queue.process_queue myqueue
        salt-run queue.process_queue myqueue 6
        salt-run queue.process_queue myqueue all backend=sqlite
    """
    pop_kwargs = {
        "queue": queue,
        "quantity": quantity,
        "backend": backend,
        "is_runner": is_runner,
    }

    # The master event bus is opened (send-only) before popping, and is
    # released by the context manager on every exit path.
    with get_event(
        "master", __opts__["sock_dir"], opts=__opts__, listen=False
    ) as bus:
        try:
            popped = pop(**pop_kwargs)
        except SaltInvocationError as exc:
            # Report the failure as job progress rather than raising.
            __jid_event__.fire_event({"errors": f"{exc}"}, "progress")
            return False
        else:
            payload = {"items": popped, "backend": backend, "queue": queue}
            bus.fire_event(payload, tagify([queue, "process"], prefix="queue"))
            return payload
| 225 | |
| 226 | |
| 227 | def __get_queue_opts(queue=None, backend=None): |
no test coverage detected