()
| 65 | |
| 66 | |
def test_AsyncQueue():
    """Exercise AsyncQueue with a producer thread and an asyncio consumer.

    A worker thread puts ten integers into the queue synchronously (one
    every 0.1 s) while the calling thread runs an event loop via
    ``asyncio.run`` that reads ten items back and prints each one.

    NOTE(review): this test only prints what it receives; consider
    asserting on the received sequence once AsyncQueue's ordering
    guarantees are confirmed.
    """
    import inspect  # local import: only this test needs it

    # Single source of truth so producer and consumer cannot drift apart.
    item_count = 10
    queue = AsyncQueue()

    # put data to queue sync in a thread
    # async get data from queue in the current event loop
    # NOTE: the producer runs in a separate thread, outside the event loop
    # that asyncio.run() creates for the consumer

    def put_data_to_queue():
        for i in range(item_count):
            time.sleep(0.1)
            queue.put(i)

    async def get_data_from_queue():
        for _ in range(item_count):
            item = queue.get()
            # NOTE(review): get() is presumably a coroutine (see the
            # "async get" comment above) - confirm against AsyncQueue.
            # Without awaiting it, the f-string would print a coroutine
            # object and nothing would be consumed. Awaiting only when
            # the result is awaitable keeps a synchronous get() working.
            if inspect.isawaitable(item):
                item = await item
            print(f"get: {item}")

    thread = threading.Thread(target=put_data_to_queue)
    thread.start()
    try:
        asyncio.run(get_data_from_queue())
    finally:
        # Always reap the producer, even if the consumer raised.
        thread.join()
nothing calls this directly
no test coverage detected