(self)
| 105 | self.ssh_host = host |
| 106 | |
def connect(self):
    """Open the channel for this connection and store it on ``self.sock``.

    When ``self.ssh_transport`` is truthy, a session is opened on that
    transport and the ``docker system dial-stdio`` command is executed on
    it. Otherwise an ``SSHSocket`` is created for ``self.ssh_host`` and
    connected. In both cases ``self.timeout`` is applied to the channel
    before the command is executed or the connection is made.
    """
    transport = self.ssh_transport
    if not transport:
        # No transport available: fall back to an SSHSocket for the host.
        channel = SSHSocket(self.ssh_host)
        channel.settimeout(self.timeout)
        channel.connect()
    else:
        # Use a session on the existing transport and run the dial-stdio
        # command over it.
        channel = transport.open_session()
        channel.settimeout(self.timeout)
        channel.exec_command('docker system dial-stdio')

    self.sock = channel
| 118 | |
| 119 | |
| 120 | class SSHConnectionPool(urllib3.connectionpool.HTTPConnectionPool): |
nothing calls this directly
no test coverage detected