MCPcopy
hub / github.com/zeromq/pyzmq / test_subscribe_method

Method test_subscribe_method

tests/test_socket.py:666–685  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

664 assert rcvd == b'hi'
665
666 def test_subscribe_method(self):
667 pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
668 sub.subscribe('prefix')
669 sub.subscribe = 'c'
670 p = zmq.Poller()
671 p.register(sub, zmq.POLLIN)
672 # wait for subscription handshake
673 for i in range(100):
674 pub.send(b'canary')
675 events = p.poll(250)
676 if events:
677 break
678 self.recv(sub)
679 pub.send(b'prefixmessage')
680 msg = self.recv(sub)
681 assert msg == b'prefixmessage'
682 sub.unsubscribe('prefix')
683 pub.send(b'prefixmessage')
684 events = p.poll(1000)
685 assert events == []
686
687 # CI often can't handle how much memory PyPy uses on this test
688 @mark.skipif(

Callers

nothing calls this directly

Calls 7

registerMethod · 0.95
pollMethod · 0.95
subscribeMethod · 0.80
unsubscribeMethod · 0.80
create_bound_pairMethod · 0.45
sendMethod · 0.45
recvMethod · 0.45

Tested by

no test coverage detected