(self)
| 307 | c.timer and c.timer.stop() |
| 308 | |
| 309 | def test_loop_ignores_socket_timeout(self): |
| 310 | |
| 311 | class Connection(self.app.connection_for_read().__class__): |
| 312 | obj = None |
| 313 | |
| 314 | def drain_events(self, **kwargs): |
| 315 | self.obj.connection = None |
| 316 | raise socket.timeout(10) |
| 317 | |
| 318 | c = self.NoopConsumer() |
| 319 | c.connection = Connection(self.app.conf.broker_url) |
| 320 | c.connection.obj = c |
| 321 | c.qos = QoS(c.task_consumer.qos, 10) |
| 322 | c.loop(*c.loop_args()) |
| 323 | |
| 324 | def test_loop_when_socket_error(self): |
| 325 |
nothing calls this directly
no test coverage detected