(
request: PackageCloudNextReleaseRequest,
)
| 90 | |
| 91 | @_uncacheable_rule |
| 92 | async def packagecloud_get_next_release( |
| 93 | request: PackageCloudNextReleaseRequest, |
| 94 | ) -> PackageCloudNextRelease: |
| 95 | env_vars: EnvironmentVars = await Get( |
| 96 | EnvironmentVars, EnvironmentVarsRequest(["PACKAGECLOUD_TOKEN"]) |
| 97 | ) |
| 98 | package_cloud_token = env_vars.get("PACKAGECLOUD_TOKEN") |
| 99 | if not package_cloud_token: |
| 100 | return PackageCloudNextRelease() |
| 101 | |
| 102 | client = requests.session() |
| 103 | client.auth = HTTPBasicAuth(package_cloud_token, "") |
| 104 | |
| 105 | def get(url_path: str) -> list[dict[str, Any]]: |
| 106 | response = client.get(f"https://packagecloud.io{url_path}") |
| 107 | response.raise_for_status() |
| 108 | ret: list[dict[str, Any]] = response.json() |
| 109 | next_url = response.links.get("next", {}).get("url") |
| 110 | while next_url: |
| 111 | response = client.get(f"https://packagecloud.io{next_url}") |
| 112 | response.raise_for_status() |
| 113 | ret.extend(response.json()) |
| 114 | next_url = response.links.get("next", {}).get("url") |
| 115 | return ret |
| 116 | |
| 117 | distro_id = request.distro_id |
| 118 | distro_info = DISTRO_INFO[distro_id] |
| 119 | pkg_is_unstable = "dev" in request.package_version |
| 120 | |
| 121 | # packagecloud url params: |
| 122 | org = "stackstorm" |
| 123 | repo = f"{'' if request.production else 'staging-'}{'unstable' if pkg_is_unstable else 'stable'}" |
| 124 | pkg_type = distro_info["pkg_type"] |
| 125 | distro = distro_info["distro"] |
| 126 | distro_version = distro_id if pkg_type == "deb" else distro_info["version"] |
| 127 | pkg_name = request.package_name |
| 128 | arch = ARCH_NAMES[request.nfpm_arch][pkg_type] |
| 129 | |
| 130 | # https://packagecloud.io/docs/api#resource_packages_method_index (api doc incorrectly drops /:package) |
| 131 | # /api/v1/repos/:user_id/:repo/packages/:type/:distro/:version/:package/:arch.json |
| 132 | index_url = f"/api/v1/repos/{org}/{repo}/packages/{pkg_type}/{distro}/{distro_version}/{pkg_name}/{arch}.json" |
| 133 | package_index: list[dict[str, Any]] = get(index_url) |
| 134 | if not package_index: |
| 135 | return PackageCloudNextRelease() |
| 136 | |
| 137 | versions_url: str = package_index[0]["versions_url"] |
| 138 | versions: list[dict[str, Any]] = get(versions_url) |
| 139 | releases = [ |
| 140 | version_info["release"] |
| 141 | for version_info in versions |
| 142 | if version_info["version"] == request.package_version |
| 143 | ] |
| 144 | if not releases: |
| 145 | return PackageCloudNextRelease() |
| 146 | |
| 147 | max_release = max(int(release) for release in releases) |
| 148 | next_release = max_release + 1 |
| 149 | return PackageCloudNextRelease(next_release) |
no test coverage detected