| 103 | self.consumer.consume() |
| 104 | |
| 105 | def loop(self, c): |
| 106 | resets = [self._resets] |
| 107 | shutdown = self._node_shutdown = threading.Event() |
| 108 | stopped = self._node_stopped = threading.Event() |
| 109 | try: |
| 110 | with c.connection_for_read() as connection: |
| 111 | info('pidbox: Connected to %s.', connection.as_uri()) |
| 112 | self._do_reset(c, connection) |
| 113 | while not shutdown.is_set() and c.connection: |
| 114 | if resets[0] < self._resets: |
| 115 | resets[0] += 1 |
| 116 | self._do_reset(c, connection) |
| 117 | try: |
| 118 | connection.drain_events(timeout=1.0) |
| 119 | except socket.timeout: |
| 120 | pass |
| 121 | finally: |
| 122 | stopped.set() |