(stub_broker, stub_worker, result_backend)
| 115 | |
| 116 | |
| 117 | def test_pipelines_expose_completion_stats(stub_broker, stub_worker, result_backend): |
| 118 | # Given a result backend |
| 119 | # And a broker with the results middleware |
| 120 | stub_broker.add_middleware(Results(backend=result_backend)) |
| 121 | |
| 122 | # And an actor that waits some amount of time |
| 123 | condition = Condition() |
| 124 | |
| 125 | @dramatiq.actor(store_results=True) |
| 126 | def wait(n): |
| 127 | time.sleep(n) |
| 128 | with condition: |
| 129 | condition.notify_all() |
| 130 | return n |
| 131 | |
| 132 | # When I pipe some messages intended for that actor together and run the pipeline |
| 133 | pipe = wait.message(1) | wait.message() |
| 134 | pipe.run() |
| 135 | |
| 136 | # Then every time a job in the pipeline completes, the completed_count should increase |
| 137 | for count in range(1, len(pipe) + 1): |
| 138 | with condition: |
| 139 | condition.wait(2) |
| 140 | time.sleep(0.1) # give the worker time to set the result |
| 141 | assert pipe.completed_count == count |
| 142 | |
| 143 | # Finally, completed should be true |
| 144 | assert pipe.completed |
| 145 | |
| 146 | |
| 147 | def test_pipelines_can_be_incomplete(stub_broker, result_backend): |
nothing calls this directly
no test coverage detected