MCPcopy Index your code
hub / github.com/celery/celery / test_loop_when_socket_error

Method test_loop_when_socket_error

t/unit/worker/test_worker.py:324–343  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

322 c.loop(*c.loop_args())
323
324 def test_loop_when_socket_error(self):
325
326 class Connection(self.app.connection_for_read().__class__):
327 obj = None
328
329 def drain_events(self, **kwargs):
330 self.obj.connection = None
331 raise OSError('foo')
332
333 c = self.LoopConsumer()
334 c.blueprint.state = RUN
335 conn = c.connection = Connection(self.app.conf.broker_url)
336 c.connection.obj = c
337 c.qos = QoS(c.task_consumer.qos, 10)
338 with pytest.raises(socket.error):
339 c.loop(*c.loop_args())
340
341 c.blueprint.state = CLOSE
342 c.connection = conn
343 c.loop(*c.loop_args())
344
345 def test_loop(self):
346

Callers

nothing calls this directly

Calls 5

LoopConsumerMethod · 0.95
loopMethod · 0.80
loop_argsMethod · 0.80
ConnectionClass · 0.70
raisesMethod · 0.45

Tested by

no test coverage detected