MCPcopy
hub / github.com/zeromq/pyzmq / test_poll_base_socket

Function test_poll_base_socket

tests/test_asyncio.py:303–321  ·  view source on GitHub ↗
(sockets)

Source from the content-addressed store, hash-verified

301
302
303async def test_poll_base_socket(sockets):
304 ctx = zmq.Context()
305 url = "inproc://test"
306 a = ctx.socket(zmq.PUSH)
307 b = ctx.socket(zmq.PULL)
308 sockets.extend([a, b])
309 a.bind(url)
310 b.connect(url)
311
312 poller = zaio.Poller()
313 poller.register(b, zmq.POLLIN)
314
315 f = poller.poll(timeout=1000)
316 assert not f.done()
317 a.send_multipart([b"hi", b"there"])
318 evt = await f
319 assert evt == [(b, zmq.POLLIN)]
320 recvd = b.recv_multipart()
321 assert recvd == [b"hi", b"there"]
322
323
324async def test_poll_on_closed_socket(push_pull):

Callers

nothing calls this directly

Calls 9

socketMethod · 0.95
registerMethod · 0.95
pollMethod · 0.95
ContextMethod · 0.45
bindMethod · 0.45
connectMethod · 0.45
doneMethod · 0.45
send_multipartMethod · 0.45
recv_multipartMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…