ClickHouse/docker/test/stateful/s3downloader

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

127 lines
4.2 KiB
Plaintext
Raw Normal View History

2020-10-02 16:54:07 +00:00
#!/usr/bin/env python3
2019-01-21 08:46:28 +00:00
# -*- coding: utf-8 -*-
import os
import sys
2020-09-08 08:45:01 +00:00
import time
2019-01-21 08:46:28 +00:00
import tarfile
import logging
import argparse
import requests
import tempfile
2023-03-23 15:33:23 +00:00
DEFAULT_URL = "https://clickhouse-datasets.s3.amazonaws.com"
2019-01-21 08:46:28 +00:00
AVAILABLE_DATASETS = {
2023-03-23 15:33:23 +00:00
"hits": "hits_v1.tar",
"visits": "visits_v1.tar",
2019-01-21 08:46:28 +00:00
}
2020-09-08 08:43:02 +00:00
RETRIES_COUNT = 5
2023-03-23 15:33:23 +00:00
2019-01-21 08:46:28 +00:00
def _get_temp_file_name():
2023-03-23 15:33:23 +00:00
return os.path.join(
tempfile._get_default_tempdir(), next(tempfile._get_candidate_names())
)
2019-01-21 08:46:28 +00:00
def build_url(base_url, dataset):
2023-03-23 15:33:23 +00:00
return os.path.join(base_url, dataset, "partitions", AVAILABLE_DATASETS[dataset])
2019-01-21 08:46:28 +00:00
def dowload_with_progress(url, path):
logging.info("Downloading from %s to temp path %s", url, path)
2020-09-08 08:43:02 +00:00
for i in range(RETRIES_COUNT):
try:
2023-03-23 15:33:23 +00:00
with open(path, "wb") as f:
2020-09-08 08:43:02 +00:00
response = requests.get(url, stream=True)
response.raise_for_status()
2023-03-23 15:33:23 +00:00
total_length = response.headers.get("content-length")
2020-09-08 08:43:02 +00:00
if total_length is None or int(total_length) == 0:
2023-03-23 15:33:23 +00:00
logging.info(
"No content-length, will download file without progress"
)
2020-09-08 08:43:02 +00:00
f.write(response.content)
else:
dl = 0
total_length = int(total_length)
logging.info("Content length is %ld bytes", total_length)
for data in response.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
if sys.stdout.isatty():
done = int(50 * dl / total_length)
percent = int(100 * float(dl) / total_length)
2023-03-23 15:33:23 +00:00
sys.stdout.write(
"\r[{}{}] {}%".format(
"=" * done, " " * (50 - done), percent
)
)
2020-09-08 08:43:02 +00:00
sys.stdout.flush()
break
except Exception as ex:
sys.stdout.write("\n")
2020-09-08 08:45:01 +00:00
time.sleep(3)
2020-09-08 08:43:02 +00:00
logging.info("Exception while downloading %s, retry %s", ex, i + 1)
if os.path.exists(path):
os.remove(path)
2020-09-08 08:49:24 +00:00
else:
2023-03-23 15:33:23 +00:00
raise Exception(
"Cannot download dataset from {}, all retries exceeded".format(url)
)
2020-09-08 08:43:02 +00:00
2019-01-21 08:46:28 +00:00
sys.stdout.write("\n")
logging.info("Downloading finished")
2023-03-23 15:33:23 +00:00
2019-01-21 08:46:28 +00:00
def unpack_to_clickhouse_directory(tar_path, clickhouse_path):
2023-03-23 15:33:23 +00:00
logging.info(
"Will unpack data from temp path %s to clickhouse db %s",
tar_path,
clickhouse_path,
)
with tarfile.open(tar_path, "r") as comp_file:
2019-01-21 08:46:28 +00:00
comp_file.extractall(path=clickhouse_path)
logging.info("Unpack finished")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser(
2023-03-23 15:33:23 +00:00
description="Simple tool for dowloading datasets for clickhouse from S3"
)
parser.add_argument(
"--dataset-names",
required=True,
nargs="+",
choices=list(AVAILABLE_DATASETS.keys()),
)
parser.add_argument("--url-prefix", default=DEFAULT_URL)
parser.add_argument("--clickhouse-data-path", default="/var/lib/clickhouse/")
2019-01-21 08:46:28 +00:00
args = parser.parse_args()
datasets = args.dataset_names
2023-03-23 15:33:23 +00:00
logging.info("Will fetch following datasets: %s", ", ".join(datasets))
2019-01-21 08:46:28 +00:00
for dataset in datasets:
logging.info("Processing %s", dataset)
temp_archive_path = _get_temp_file_name()
try:
download_url_for_dataset = build_url(args.url_prefix, dataset)
dowload_with_progress(download_url_for_dataset, temp_archive_path)
unpack_to_clickhouse_directory(temp_archive_path, args.clickhouse_data_path)
except Exception as ex:
logging.info("Some exception occured %s", str(ex))
raise
finally:
2023-03-23 15:33:23 +00:00
logging.info(
"Will remove downloaded file %s from filesystem if it exists",
temp_archive_path,
)
2019-01-21 08:46:28 +00:00
if os.path.exists(temp_archive_path):
os.remove(temp_archive_path)
logging.info("Processing of %s finished", dataset)
logging.info("Fetch finished, enjoy your tables!")