Stream wrapper for real-time events, logs, etc. from the server. Example: >>> events = client.events() >>> for event in events: ... print(event) >>> # and cancel from another thread >>> events.close()
| 6 | |
| 7 | |
| 8 | class CancellableStream: |
| 9 | """ |
| 10 | Stream wrapper for real-time events, logs, etc. from the server. |
| 11 | |
| 12 | Example: |
| 13 | >>> events = client.events() |
| 14 | >>> for event in events: |
| 15 | ... print(event) |
| 16 | >>> # and cancel from another thread |
| 17 | >>> events.close() |
| 18 | """ |
| 19 | |
| 20 | def __init__(self, stream, response): |
| 21 | self._stream = stream |
| 22 | self._response = response |
| 23 | |
| 24 | def __iter__(self): |
| 25 | return self |
| 26 | |
| 27 | def __next__(self): |
| 28 | try: |
| 29 | return next(self._stream) |
| 30 | except urllib3.exceptions.ProtocolError: |
| 31 | raise StopIteration from None |
| 32 | except OSError: |
| 33 | raise StopIteration from None |
| 34 | |
| 35 | next = __next__ |
| 36 | |
| 37 | def close(self): |
| 38 | """ |
| 39 | Closes the event streaming. |
| 40 | """ |
| 41 | |
| 42 | if not self._response.raw.closed: |
| 43 | # find the underlying socket object |
| 44 | # based on api.client._get_raw_response_socket |
| 45 | |
| 46 | sock_fp = self._response.raw._fp.fp |
| 47 | |
| 48 | if hasattr(sock_fp, 'raw'): |
| 49 | sock_raw = sock_fp.raw |
| 50 | |
| 51 | if hasattr(sock_raw, 'sock'): |
| 52 | sock = sock_raw.sock |
| 53 | |
| 54 | elif hasattr(sock_raw, '_sock'): |
| 55 | sock = sock_raw._sock |
| 56 | |
| 57 | elif hasattr(sock_fp, 'channel'): |
| 58 | # We're working with a paramiko (SSH) channel, which doesn't |
| 59 | # support cancelable streams with the current implementation |
| 60 | raise DockerException( |
| 61 | 'Cancellable streams not supported for the SSH protocol' |
| 62 | ) |
| 63 | else: |
| 64 | sock = sock_fp._sock |
| 65 |
no outgoing calls
no test coverage detected