ClickHouse/docker/test/stress/download_previous_release

111 lines
4.2 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python3
import requests
import re
import os
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
CLICKHOUSE_TAGS_URL = "https://api.github.com/repos/ClickHouse/ClickHouse/tags"
CLICKHOUSE_COMMON_STATIC_DOWNLOAD_URL = "https://github.com/ClickHouse/ClickHouse/releases/download/v{version}-{type}/clickhouse-common-static_{version}_amd64.deb"
CLICKHOUSE_COMMON_STATIC_DBG_DOWNLOAD_URL = "https://github.com/ClickHouse/ClickHouse/releases/download/v{version}-{type}/clickhouse-common-static-dbg_{version}_amd64.deb"
CLICKHOUSE_SERVER_DOWNLOAD_URL = "https://github.com/ClickHouse/ClickHouse/releases/download/v{version}-{type}/clickhouse-server_{version}_all.deb"
CLICKHOUSE_CLIENT_DOWNLOAD_URL = "https://github.com/ClickHouse/ClickHouse/releases/download/v{version}-{type}/clickhouse-client_{version}_amd64.deb"
CLICKHOUSE_COMMON_STATIC_PACKET_NAME = "clickhouse-common-static_{version}_amd64.deb"
CLICKHOUSE_COMMON_STATIC_DBG_PACKET_NAME = "clickhouse-common-static-dbg_{version}_amd64.deb"
CLICKHOUSE_SERVER_PACKET_NAME = "clickhouse-server_{version}_all.deb"
CLICKHOUSE_CLIENT_PACKET_NAME = "clickhouse-client_{version}_all.deb"
PACKETS_DIR = "previous_release_package_folder/"
2021-11-15 20:16:35 +00:00
VERSION_PATTERN = r"((?:\d+\.)?(?:\d+\.)?(?:\d+\.)?\d+-[a-zA-Z]*)"
class Version:
def __init__(self, version):
self.version = version
def __lt__(self, other):
return list(map(int, self.version.split('.'))) < list(map(int, other.version.split('.')))
def __str__(self):
return self.version
class ReleaseInfo:
def __init__(self, version, release_type):
self.version = version
self.type = release_type
def find_previous_release(server_version, releases):
releases.sort(key=lambda x: x.version, reverse=True)
for release in releases:
if release.version < server_version:
return True, release
return False, None
def get_previous_release(server_version):
page = 1
found = False
while not found:
response = requests.get(CLICKHOUSE_TAGS_URL, {'page': page, 'per_page': 100})
if not response.ok:
raise Exception('Cannot load the list of tags from github: ' + response.reason)
releases_str = set(re.findall(VERSION_PATTERN, response.text))
if len(releases_str) == 0:
raise Exception('Cannot find previous release for ' + str(server_version) + ' server version')
releases = list(map(lambda x: ReleaseInfo(Version(x.split('-')[0]), x.split('-')[1]), releases_str))
found, previous_release = find_previous_release(server_version, releases)
page += 1
return previous_release
def download_packet(url, local_file_name, retries=10, backoff_factor=0.3):
session = requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=backoff_factor,
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter)
session.mount('https://', adapter)
response = session.get(url)
print(url)
if response.ok:
open(PACKETS_DIR + local_file_name, 'wb').write(response.content)
def download_packets(release):
if not os.path.exists(PACKETS_DIR):
os.makedirs(PACKETS_DIR)
download_packet(CLICKHOUSE_COMMON_STATIC_DOWNLOAD_URL.format(version=release.version, type=release.type),
CLICKHOUSE_COMMON_STATIC_PACKET_NAME.format(version=release.version))
download_packet(CLICKHOUSE_COMMON_STATIC_DBG_DOWNLOAD_URL.format(version=release.version, type=release.type),
CLICKHOUSE_COMMON_STATIC_DBG_PACKET_NAME.format(version=release.version))
download_packet(CLICKHOUSE_SERVER_DOWNLOAD_URL.format(version=release.version, type=release.type),
CLICKHOUSE_SERVER_PACKET_NAME.format(version=release.version))
download_packet(CLICKHOUSE_CLIENT_DOWNLOAD_URL.format(version=release.version, type=release.type),
CLICKHOUSE_CLIENT_PACKET_NAME.format(version=release.version))
if __name__ == '__main__':
server_version = Version(input())
previous_release = get_previous_release(server_version)
download_packets(previous_release)