(self, fd, mode, flags)
| 66 | self._fds = {} |
| 67 | |
def _control(self, fd, mode, flags):
    """Apply ``flags`` to the kqueue filters selected by ``mode`` for ``fd``.

    A read-filter kevent is built when ``mode`` has ``POLL_IN`` set and a
    write-filter kevent when it has ``POLL_OUT`` set. Each kevent is then
    handed to ``self._kqueue.control`` in its own call, read before write.

    Args:
        fd: Descriptor the kevents are created for.
        mode: Bitmask tested against ``POLL_IN`` and ``POLL_OUT``.
        flags: Value passed through unchanged as the kevent ``flags``.
    """
    # Both kevents are constructed before anything is submitted, so a
    # failure while building either one leaves the kqueue untouched.
    read_changes = (
        [select.kevent(fd, select.KQ_FILTER_READ, flags)]
        if mode & POLL_IN
        else []
    )
    write_changes = (
        [select.kevent(fd, select.KQ_FILTER_WRITE, flags)]
        if mode & POLL_OUT
        else []
    )
    # One control() call per change; the 0 is the max_events argument.
    for change in read_changes + write_changes:
        self._kqueue.control([change], 0)
| 76 | |
| 77 | def poll(self, timeout): |
| 78 | if timeout < 0: |
no outgoing calls
no test coverage detected