Open *url* with config-driven auth, redirect stripping, and fallthrough. 1. Find ``auth.json`` entries whose hosts match the URL. 2. For each entry, resolve the token and try the request. 3. On 401/403 move to the next matching entry. 4. After all entries exhausted (or none matched)
(
url: str,
timeout: int = 10,
extra_headers: dict[str, str] | None = None,
redirect_validator: RedirectValidator | None = None,
)
| 133 | |
| 134 | |
| 135 | def open_url( |
| 136 | url: str, |
| 137 | timeout: int = 10, |
| 138 | extra_headers: dict[str, str] | None = None, |
| 139 | redirect_validator: RedirectValidator | None = None, |
| 140 | ): |
| 141 | """Open *url* with config-driven auth, redirect stripping, and fallthrough. |
| 142 | |
| 143 | 1. Find ``auth.json`` entries whose hosts match the URL. |
| 144 | 2. For each entry, resolve the token and try the request. |
| 145 | 3. On 401/403 move to the next matching entry. |
| 146 | 4. After all entries exhausted (or none matched), try unauthenticated. |
| 147 | 5. Non-auth errors (404, 500, network) raise immediately. |
| 148 | |
| 149 | *extra_headers* (e.g. ``Accept``) are merged into every attempt. |
| 150 | *redirect_validator*, when provided, is called with ``(old_url, new_url)`` |
| 151 | before following each redirect and may raise to reject the redirect. |
| 152 | """ |
| 153 | entries = find_entries_for_url(url, _load_config()) |
| 154 | |
| 155 | def _make_req(auth_headers: dict[str, str]) -> urllib.request.Request: |
| 156 | merged = {} |
| 157 | if extra_headers: |
| 158 | # Strip Authorization from extra_headers to prevent bypass |
| 159 | merged.update({k: v for k, v in extra_headers.items() if k.lower() != "authorization"}) |
| 160 | # Auth headers applied last — cannot be overridden by extra_headers |
| 161 | merged.update(auth_headers) |
| 162 | return urllib.request.Request(url, headers=merged) |
| 163 | |
| 164 | # Try each matching entry |
| 165 | for entry in entries: |
| 166 | provider = get_provider(entry.provider) |
| 167 | if provider is None: |
| 168 | continue |
| 169 | token = provider.resolve_token(entry) |
| 170 | if not token: |
| 171 | continue |
| 172 | |
| 173 | req = _make_req(provider.auth_headers(token, entry.auth)) |
| 174 | opener = urllib.request.build_opener(_StripAuthOnRedirect(entry.hosts, redirect_validator)) |
| 175 | try: |
| 176 | return opener.open(req, timeout=timeout) |
| 177 | except urllib.error.HTTPError as exc: |
| 178 | if exc.code in (401, 403): |
| 179 | exc.close() |
| 180 | continue # try next entry |
| 181 | raise |
| 182 | |
| 183 | # No entry worked (or none matched) — unauthenticated fallback |
| 184 | req = _make_req({}) |
| 185 | if redirect_validator is not None: |
| 186 | opener = urllib.request.build_opener(_StripAuthOnRedirect((), redirect_validator)) |
| 187 | return opener.open(req, timeout=timeout) |
| 188 | return urllib.request.urlopen(req, timeout=timeout) # noqa: S310 |