(self)
| 141 | other_ioloop.close() |
| 142 | |
def test_add_callback_while_closing(self):
    # add_callback should not fail if it races with another thread
    # closing the IOLoop. The callbacks are dropped silently
    # without executing.
    closing = threading.Event()

    def target():
        # Run the loop only until the queued stop callback fires, then
        # signal the main thread immediately before closing so that its
        # add_callback calls overlap with close().
        other_ioloop.add_callback(other_ioloop.stop)
        other_ioloop.start()
        closing.set()
        other_ioloop.close(all_fds=True)

    other_ioloop = IOLoop(make_current=False)
    thread = threading.Thread(target=target)
    thread.start()
    closing.wait()
    try:
        for _ in range(1000):
            other_ioloop.add_callback(lambda: None)
    finally:
        # Join the closer thread so it cannot outlive this test, even
        # if one of the add_callback calls above raises.
        thread.join()
| 161 | |
| 162 | @skipIfNonUnix # just because socketpair is so convenient |
| 163 | def test_read_while_writeable(self): |
nothing calls this directly
no test coverage detected