(self)
| 664 | assert rcvd == b'hi' |
| 665 | |
| 666 | def test_subscribe_method(self): |
| 667 | pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB) |
| 668 | sub.subscribe('prefix') |
| 669 | sub.subscribe = 'c' |
| 670 | p = zmq.Poller() |
| 671 | p.register(sub, zmq.POLLIN) |
| 672 | # wait for subscription handshake |
| 673 | for i in range(100): |
| 674 | pub.send(b'canary') |
| 675 | events = p.poll(250) |
| 676 | if events: |
| 677 | break |
| 678 | self.recv(sub) |
| 679 | pub.send(b'prefixmessage') |
| 680 | msg = self.recv(sub) |
| 681 | assert msg == b'prefixmessage' |
| 682 | sub.unsubscribe('prefix') |
| 683 | pub.send(b'prefixmessage') |
| 684 | events = p.poll(1000) |
| 685 | assert events == [] |
| 686 | |
| 687 | # CI often can't handle how much memory PyPy uses on this test |
| 688 | @mark.skipif( |
nothing calls this directly
no test coverage detected