MCPcopy
hub / github.com/StackStorm/st2 / packagecloud_get_next_release

Function packagecloud_get_next_release

pants-plugins/release/packagecloud_rules.py:92–149  ·  view source on GitHub ↗
(
    request: PackageCloudNextReleaseRequest,
)

Source from the content-addressed store, hash-verified

90
91@_uncacheable_rule
92async def packagecloud_get_next_release(
93 request: PackageCloudNextReleaseRequest,
94) -> PackageCloudNextRelease:
95 env_vars: EnvironmentVars = await Get(
96 EnvironmentVars, EnvironmentVarsRequest(["PACKAGECLOUD_TOKEN"])
97 )
98 package_cloud_token = env_vars.get("PACKAGECLOUD_TOKEN")
99 if not package_cloud_token:
100 return PackageCloudNextRelease()
101
102 client = requests.session()
103 client.auth = HTTPBasicAuth(package_cloud_token, "")
104
105 def get(url_path: str) -> list[dict[str, Any]]:
106 response = client.get(f"https://packagecloud.io{url_path}")
107 response.raise_for_status()
108 ret: list[dict[str, Any]] = response.json()
109 next_url = response.links.get("next", {}).get("url")
110 while next_url:
111 response = client.get(f"https://packagecloud.io{next_url}")
112 response.raise_for_status()
113 ret.extend(response.json())
114 next_url = response.links.get("next", {}).get("url")
115 return ret
116
117 distro_id = request.distro_id
118 distro_info = DISTRO_INFO[distro_id]
119 pkg_is_unstable = "dev" in request.package_version
120
121 # packagecloud url params:
122 org = "stackstorm"
123 repo = f"{'' if request.production else 'staging-'}{'unstable' if pkg_is_unstable else 'stable'}"
124 pkg_type = distro_info["pkg_type"]
125 distro = distro_info["distro"]
126 distro_version = distro_id if pkg_type == "deb" else distro_info["version"]
127 pkg_name = request.package_name
128 arch = ARCH_NAMES[request.nfpm_arch][pkg_type]
129
130 # https://packagecloud.io/docs/api#resource_packages_method_index (api doc incorrectly drops /:package)
131 # /api/v1/repos/:user_id/:repo/packages/:type/:distro/:version/:package/:arch.json
132 index_url = f"/api/v1/repos/{org}/{repo}/packages/{pkg_type}/{distro}/{distro_version}/{pkg_name}/{arch}.json"
133 package_index: list[dict[str, Any]] = get(index_url)
134 if not package_index:
135 return PackageCloudNextRelease()
136
137 versions_url: str = package_index[0]["versions_url"]
138 versions: list[dict[str, Any]] = get(versions_url)
139 releases = [
140 version_info["release"]
141 for version_info in versions
142 if version_info["version"] == request.package_version
143 ]
144 if not releases:
145 return PackageCloudNextRelease()
146
147 max_release = max(int(release) for release in releases)
148 next_release = max_release + 1
149 return PackageCloudNextRelease(next_release)

Callers 1

inject_package_fieldsFunction · 0.85

Calls 3

getFunction · 0.85
getMethod · 0.45

Tested by

no test coverage detected