MCPcopy
hub / github.com/Bogdanp/dramatiq / test_pipelines_expose_completion_stats

Function test_pipelines_expose_completion_stats

tests/test_composition.py:117–144  ·  view source on GitHub ↗
(stub_broker, stub_worker, result_backend)

Source from the content-addressed store, hash-verified

115
116
117def test_pipelines_expose_completion_stats(stub_broker, stub_worker, result_backend):
118 # Given a result backend
119 # And a broker with the results middleware
120 stub_broker.add_middleware(Results(backend=result_backend))
121
122 # And an actor that waits some amount of time
123 condition = Condition()
124
125 @dramatiq.actor(store_results=True)
126 def wait(n):
127 time.sleep(n)
128 with condition:
129 condition.notify_all()
130 return n
131
132 # When I pipe some messages intended for that actor together and run the pipeline
133 pipe = wait.message(1) | wait.message()
134 pipe.run()
135
136 # Then every time a job in the pipeline completes, the completed_count should increase
137 for count in range(1, len(pipe) + 1):
138 with condition:
139 condition.wait(2)
140 time.sleep(0.1) # give the worker time to set the result
141 assert pipe.completed_count == count
142
143 # Finally, completed should be true
144 assert pipe.completed
145
146
147def test_pipelines_can_be_incomplete(stub_broker, result_backend):

Callers

nothing calls this directly

Calls 5

ResultsClass · 0.90
add_middlewareMethod · 0.80
messageMethod · 0.80
runMethod · 0.45
waitMethod · 0.45

Tested by

no test coverage detected