()
| 70 | |
| 71 | @pytest.fixture |
| 72 | def event_loop(): |
| 73 | try: |
| 74 | old_loop = asyncio.get_event_loop() |
| 75 | except RuntimeError: |
| 76 | old_loop = None |
| 77 | new_loop = asyncio.new_event_loop() |
| 78 | try: |
| 79 | asyncio.set_event_loop(new_loop) |
| 80 | yield new_loop |
| 81 | finally: |
| 82 | pending = asyncio.all_tasks(new_loop) |
| 83 | for task in pending: |
| 84 | task.cancel() |
| 85 | if pending: |
| 86 | new_loop.run_until_complete( |
| 87 | asyncio.gather(*pending, return_exceptions=True)) |
| 88 | new_loop.run_until_complete(new_loop.shutdown_asyncgens()) |
| 89 | new_loop.stop() |
| 90 | new_loop.close() |
| 91 | asyncio.set_event_loop(old_loop) |
| 92 | |
| 93 | |
| 94 | def _build_sleeping_test_engine(event_loop): |
nothing calls this directly
no test coverage detected