(sockets)
| 301 | |
| 302 | |
async def test_poll_base_socket(sockets):
    """Poll a socket created from ``zmq.Context`` through ``zaio.Poller``.

    A PUSH/PULL pair is connected over an inproc endpoint and the PULL
    side is registered for POLLIN.  The future returned by ``poll`` must
    still be pending before anything is sent, and must resolve to a
    single ``(socket, zmq.POLLIN)`` entry once a message has been pushed.

    ``sockets`` is a fixture-provided collection that both sockets are
    appended to (presumably so the fixture can close them at teardown --
    confirm against the fixture definition).
    """
    context = zmq.Context()
    endpoint = "inproc://test"
    frames = [b"hi", b"there"]

    push = context.socket(zmq.PUSH)
    pull = context.socket(zmq.PULL)
    sockets.extend([push, pull])
    push.bind(endpoint)
    pull.connect(endpoint)

    poller = zaio.Poller()
    poller.register(pull, zmq.POLLIN)

    # Start polling before sending: nothing is queued yet, so the future
    # must not be complete at this point.
    pending = poller.poll(timeout=1000)
    assert not pending.done()

    push.send_multipart(frames)
    events = await pending
    assert events == [(pull, zmq.POLLIN)]

    # The poll reported readability, so the message is read back with a
    # direct (non-awaited) recv on the PULL socket.
    assert pull.recv_multipart() == frames
| 322 | |
| 323 | |
| 324 | async def test_poll_on_closed_socket(push_pull): |
nothing calls this directly
no test coverage detected
searching dependent graphs…