ClickHouse/tests/ci/get_previous_release_tag.py
2024-06-06 07:24:58 +02:00

115 lines
3.4 KiB
Python
Executable File

#!/usr/bin/env python3
import logging
import re
from typing import Dict, List, Optional, Tuple
from build_download_helper import get_gh_api
from git_helper import TAG_REGEXP
from version_helper import (
ClickHouseVersion,
get_version_from_string,
get_version_from_tag,
)
CLICKHOUSE_TAGS_URL = "https://api.github.com/repos/ClickHouse/ClickHouse/releases"
PACKAGE_REGEXP = r"\Aclickhouse-common-static_.+[.]deb"
logger = logging.getLogger(__name__)
class ReleaseInfo:
def __init__(self, release_tag: str, assets: Dict[str, str]):
self.version = get_version_from_tag(release_tag)
self.type = self.version.description
self.assets = assets
def __str__(self):
return self.version.describe
def __repr__(self):
return f"ReleaseInfo: {self.version.describe}"
def find_previous_release(
server_version: Optional[ClickHouseVersion], releases: List[ReleaseInfo]
) -> Tuple[bool, Optional[ReleaseInfo]]:
releases.sort(key=lambda x: x.version, reverse=True)
if server_version is None:
return True, releases[0]
for release in releases:
if release.version < server_version:
# Check if the artifact exists on GitHub.
# It can be not true for a short period of time
# after creating a tag for a new release before uploading the packages.
if any(re.match(PACKAGE_REGEXP, name) for name in release.assets.keys()):
return True, release
logger.debug(
"The tag v%s-%s exists but the package is not yet available on GitHub",
release.version,
release.type,
)
return False, None
def get_previous_release(
server_version: Optional[ClickHouseVersion],
) -> Optional[ReleaseInfo]:
page = 1
found = False
while not found:
response = get_gh_api(
CLICKHOUSE_TAGS_URL, params={"page": page, "per_page": 100}, timeout=10
)
if not response.ok:
logger.error(
"Cannot load the list of tags from github: %s", response.reason
)
response.raise_for_status()
releases = response.json()
release_infos = [] # type: List[ReleaseInfo]
for r in releases:
if re.match(TAG_REGEXP, r["tag_name"]):
assets = {
a["name"]: a["browser_download_url"]
for a in r["assets"]
if a["state"] == "uploaded"
}
release_infos.append(ReleaseInfo(r["tag_name"], assets))
found, previous_release = find_previous_release(server_version, release_infos)
page += 1
return previous_release
def get_release_by_tag(tag: str) -> ReleaseInfo:
response = get_gh_api(f"{CLICKHOUSE_TAGS_URL}/tags/{tag}", timeout=10)
release = response.json()
assets = {
a["name"]: a["browser_download_url"]
for a in release["assets"]
if a["state"] == "uploaded"
}
return ReleaseInfo(release["tag_name"], assets)
def main():
logging.basicConfig(level=logging.INFO)
version_string = input()
version_string = version_string.split("+", maxsplit=1)[0]
try:
server_version = get_version_from_string(version_string)
except ValueError:
server_version = get_version_from_tag(version_string)
print(get_previous_release(server_version))
if __name__ == "__main__":
main()