MCPcopy Index your code
hub / github.com/celery/celery / test_loop_ignores_socket_timeout

Method test_loop_ignores_socket_timeout

t/unit/worker/test_worker.py:309–322  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

307 c.timer and c.timer.stop()
308
309 def test_loop_ignores_socket_timeout(self):
310
311 class Connection(self.app.connection_for_read().__class__):
312 obj = None
313
314 def drain_events(self, **kwargs):
315 self.obj.connection = None
316 raise socket.timeout(10)
317
318 c = self.NoopConsumer()
319 c.connection = Connection(self.app.conf.broker_url)
320 c.connection.obj = c
321 c.qos = QoS(c.task_consumer.qos, 10)
322 c.loop(*c.loop_args())
323
324 def test_loop_when_socket_error(self):
325

Callers

nothing calls this directly

Calls 4

NoopConsumerMethod · 0.95
loopMethod · 0.80
loop_argsMethod · 0.80
ConnectionClass · 0.70

Tested by

no test coverage detected