MCPcopy
hub / github.com/pytest-dev/pytest-xdist / loop_once

Method loop_once

src/xdist/dsession.py:146–166  ·  view source on GitHub ↗

Process one callback from one of the workers.

(self)

Source from the content-addressed store, hash-verified

144 return True
145
def loop_once(self) -> None:
    """Wait for the next worker event and dispatch it to its handler.

    Polls ``self.queue`` with a two-second timeout until an event arrives.
    Before every poll the set of active nodes is checked, so a run whose
    workers have all gone away does not wait forever.

    An event is a ``(callname, kwargs)`` pair; it is delivered to the
    method named ``"worker_" + callname`` with ``kwargs`` expanded as
    keyword arguments.  After the handler returns, shutdown is triggered
    if the scheduler reports that all tests are finished.

    Raises:
        RuntimeError: if ``self._active_nodes`` is empty while waiting
            for an event (shutdown is triggered first).
    """
    while True:
        if not self._active_nodes:
            # Nothing left that could ever post an event: shut down and bail.
            self.triggershutdown()
            raise RuntimeError("Unexpectedly no active workers available")
        try:
            event = self.queue.get(timeout=2.0)
        except Empty:
            # Timed out; go around again so the liveness check is repeated.
            continue
        else:
            break

    name, payload = event
    # The payload doubles as the assertion message to aid debugging.
    assert name, payload

    handler_name = "worker_" + name
    handler = getattr(self, handler_name)
    self.log("calling method", handler_name, payload)
    handler(**payload)

    scheduler = self.sched
    assert scheduler is not None
    if scheduler.tests_finished:
        self.triggershutdown()
167
168 #
169 # callbacks for processing events from workers

Callers 1

pytest_runtestloopMethod · 0.95

Calls 2

triggershutdownMethod · 0.95
getMethod · 0.80

Tested by

no test coverage detected