| 215 | |
| 216 | |
| 217 | class URLFetcher(object): |
| 218 | USER_AGENT = "pex/{version}".format(version=__version__) |
| 219 | |
def __init__(
    self,
    network_configuration=None,  # type: Optional[NetworkConfiguration]
    handle_file_urls=False,  # type: bool
    password_entries=(),  # type: Iterable[PasswordEntry]
    netrc_file="~/.netrc",  # type: Optional[str]
):
    # type: (...) -> None
    """Configure the fetcher's network settings, URL handlers and credentials.

    :param network_configuration: Source of the timeout, retry count, proxy and cert
        settings; a default ``NetworkConfiguration()`` is used when this is falsy.
    :param handle_file_urls: When true, a ``FileHandler`` is added to the handlers.
    :param password_entries: Extra entries appended to the password database that is
        loaded from ``netrc_file``.
    :param netrc_file: Path handed to ``PasswordDatabase.from_netrc``.
    """
    config = network_configuration or NetworkConfiguration()

    self._timeout = config.timeout
    self._max_retries = config.retries
    self._proxy = config.proxy  # type: Optional[str]
    self._cert = config.cert  # type: Optional[str]

    # A configured proxy is used for both http and https; otherwise ``None`` is
    # handed to ``ProxyHandler``.
    proxy_map = None  # type: Optional[Dict[str, str]]
    if config.proxy:
        proxy_map = dict.fromkeys(("http", "https"), config.proxy)

    # Handlers are created one at a time, in the order they end up in the tuple.
    proxy_handler = ProxyHandler(proxy_map)
    https_handler = HTTPSHandler(context=get_ssl_context(network_configuration=config))
    url_handlers = [proxy_handler, https_handler, UnixHTTPHandler()]
    if handle_file_urls:
        url_handlers.append(FileHandler())

    netrc_database = PasswordDatabase.from_netrc(netrc_file=netrc_file)
    self._password_database = netrc_database.append(password_entries)
    self._handlers = tuple(url_handlers)
| 251 | |
def network_env(self):
    # type: () -> Dict[str, str]
    """Return environment variables describing this fetcher's proxy and cert settings.

    When a proxy is set, both ``http_proxy`` and ``https_proxy`` map to it. When a
    cert path is set, it is stored under ``SSL_CERT_DIR`` if the path is an existing
    directory and under ``SSL_CERT_FILE`` otherwise. With neither set, the result is
    an empty dict.
    """
    variables = {}  # type: Dict[str, str]

    if self._proxy:
        for scheme in ("http", "https"):
            variables["{protocol}_proxy".format(protocol=scheme)] = self._proxy

    if self._cert:
        if os.path.isdir(self._cert):
            cert_variable = "SSL_CERT_DIR"
        else:
            cert_variable = "SSL_CERT_FILE"
        variables[cert_variable] = self._cert

    return variables
| 263 | |
| 264 | @contextmanager |
| 265 | def get_body_stream( |
| 266 | self, |
| 267 | url, # type: Text |
| 268 | extra_headers=None, # type: Optional[Mapping[str, str]] |
| 269 | ): |
| 270 | # type: (...) -> Iterator[BinaryIO] |
| 271 | |
| 272 | handlers = list(self._handlers) |
| 273 | if self._password_database.entries: |
| 274 | password_manager = HTTPPasswordMgrWithDefaultRealm() |
no outgoing calls