(self, base_url, timeout=60,
pool_connections=constants.DEFAULT_NUM_POOLS,
max_pool_size=constants.DEFAULT_MAX_POOL_SIZE,
shell_out=False)
| 163 | ] |
| 164 | |
def __init__(self, base_url, timeout=60,
             pool_connections=constants.DEFAULT_NUM_POOLS,
             max_pool_size=constants.DEFAULT_MAX_POOL_SIZE,
             shell_out=False):
    """Set up the adapter's SSH state and its connection-pool container.

    Args:
        base_url: Target address; a leading ``ssh://`` is stripped before
            the value is stored as ``self.ssh_host``.
        timeout: Stored unchanged on ``self.timeout``.
        pool_connections: Passed as the first argument to
            ``RecentlyUsedContainer`` when building ``self.pools``.
        max_pool_size: Stored unchanged on ``self.max_pool_size``.
        shell_out: When falsy, ``_create_paramiko_client(base_url)`` and
            ``_connect()`` are called during construction; when truthy
            both calls are skipped and ``self.ssh_client`` stays ``None``
            as far as this method is concerned.
    """
    # Start with no client; the paramiko path below is optional.
    self.ssh_client = None
    if not shell_out:
        self._create_paramiko_client(base_url)
        self._connect()

    # Keep only the part after the scheme when the URL is ssh://-prefixed.
    scheme = 'ssh://'
    self.ssh_host = (
        base_url[len(scheme):] if base_url.startswith(scheme) else base_url
    )

    self.timeout = timeout
    self.max_pool_size = max_pool_size

    def _dispose_pool(pool):
        # Invoked by the container with a pool it is disposing of.
        return pool.close()

    self.pools = RecentlyUsedContainer(
        pool_connections, dispose_func=_dispose_pool
    )

    # Parent initialisation runs last, after every attribute above is set.
    super().__init__()
| 184 | |
| 185 | def _create_paramiko_client(self, base_url): |
| 186 | logging.getLogger("paramiko").setLevel(logging.WARNING) |
nothing calls this directly
no test coverage detected