(self)
| 39 | ctx.term() |
| 40 | |
def test_context_manager(self):
    """Check that a context and its sockets close when their ``with`` blocks exit.

    A PUSH socket is bound and a PULL socket is connected to the same
    inproc endpoint, one message is sent across, and each object's
    ``closed`` flag is asserted immediately after its block is left.
    """
    endpoint = 'inproc://a'
    with self.Context() as ctx:
        with ctx.socket(zmq.PUSH) as sender:
            # Bind before the receiving side connects to the endpoint.
            sender.bind(endpoint)
            with ctx.socket(zmq.PULL) as receiver:
                receiver.connect(endpoint)
                payload = b'hi'
                sender.send(payload)
                received = self.recv(receiver)
                assert received == payload
            # Innermost block has exited: the PULL socket must be closed.
            assert receiver.closed is True
        # PUSH socket's block has exited.
        assert sender.closed is True
    # Context's block has exited.
    assert ctx.closed is True
| 55 | |
| 56 | def test_connectbind_context_managers(self): |
| 57 | url = 'inproc://a' |