#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import sys
import tarfile
import logging
import argparse
import requests
import tempfile

DEFAULT_URL = 'https://clickhouse-datasets.s3.yandex.net'

AVAILABLE_DATASETS = {
    'hits': 'hits_v1.tar',
    'visits': 'visits_v1.tar',
    'values_with_expressions': 'test_values.tar',
    'hits_100m_single': 'hits_100m_single.tar',
    'hits_1000m_single': 'hits_1000m_single.tar',
    'hits_10m_single': 'hits_10m_single.tar',
    'trips_mergetree': 'trips_mergetree.tar',
}


def _get_temp_file_name():
    # Use the public tempfile API instead of the private helpers
    # _get_default_tempdir/_get_candidate_names, which can change between
    # Python versions. mkstemp creates the file, so close the descriptor
    # and return just the path.
    fd, path = tempfile.mkstemp()
    os.close(fd)
    return path


def build_url(base_url, dataset):
    # Join URL components with '/' explicitly: os.path.join is meant for
    # filesystem paths and would produce backslashes on Windows.
    return '/'.join([base_url, dataset, 'partitions', AVAILABLE_DATASETS[dataset]])


def download_with_progress(url, path):
    logging.info("Downloading from %s to temp path %s", url, path)
    # Open in binary mode: requests yields bytes, not str.
    with open(path, 'wb') as f:
        response = requests.get(url, stream=True)
        response.raise_for_status()
        total_length = response.headers.get('content-length')
        if total_length is None or int(total_length) == 0:
            logging.info("No content-length, will download file without progress")
            f.write(response.content)
        else:
            dl = 0
            total_length = int(total_length)
            logging.info("Content length is %d bytes", total_length)
            counter = 0
            for data in response.iter_content(chunk_size=4096):
                dl += len(data)
                counter += 1
                f.write(data)
                done = int(50 * dl / total_length)
                percent = int(100 * float(dl) / total_length)
                if sys.stdout.isatty():
                    # Interactive terminal: redraw a 50-character progress bar in place.
                    sys.stdout.write("\r[{}{}] {}%".format('=' * done, ' ' * (50 - done), percent))
                    sys.stdout.flush()
                elif counter % 1000 == 0:
                    # Non-interactive output (e.g. CI logs): report progress only occasionally.
                    sys.stdout.write("{}%".format(percent))
                    sys.stdout.flush()
    sys.stdout.write("\n")
    logging.info("Downloading finished")


def unpack_to_clickhouse_directory(tar_path, clickhouse_path):
    logging.info("Will unpack data from temp path %s to clickhouse db %s", tar_path, clickhouse_path)
    # Note: extractall performs no path sanitization; the archives are
    # expected to come from a trusted source.
    with tarfile.open(tar_path, 'r') as comp_file:
        comp_file.extractall(path=clickhouse_path)
    logging.info("Unpack finished")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s: %(message)s')

    parser = argparse.ArgumentParser(
        description="Simple tool for downloading datasets for ClickHouse from S3")

    parser.add_argument('--dataset-names', required=True, nargs='+', choices=list(AVAILABLE_DATASETS.keys()))
    parser.add_argument('--url-prefix', default=DEFAULT_URL)
    parser.add_argument('--clickhouse-data-path', default='/var/lib/clickhouse/')

    args = parser.parse_args()
    datasets = args.dataset_names
    logging.info("Will fetch following datasets: %s", ', '.join(datasets))
    for dataset in datasets:
        logging.info("Processing %s", dataset)
        temp_archive_path = _get_temp_file_name()
        try:
            download_url_for_dataset = build_url(args.url_prefix, dataset)
            download_with_progress(download_url_for_dataset, temp_archive_path)
            unpack_to_clickhouse_directory(temp_archive_path, args.clickhouse_data_path)
        except Exception as ex:
            logging.error("Exception occurred while processing %s: %s", dataset, str(ex))
            raise
        finally:
            logging.info("Will remove downloaded file %s from filesystem if it exists", temp_archive_path)
            if os.path.exists(temp_archive_path):
                os.remove(temp_archive_path)
        logging.info("Processing of %s finished, data placed in %s", dataset, args.clickhouse_data_path)
    logging.info("Fetch finished, enjoy your tables!")
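
# Usage sketch (assumed invocations; the script filename and the alternative
# data path below are illustrative, not mandated by the tool itself):
#
#   python download_dataset.py --dataset-names hits visits
#   python download_dataset.py --dataset-names hits_10m_single \
#       --clickhouse-data-path /opt/clickhouse/data/
#
# Unpacking into the default /var/lib/clickhouse/ typically requires running
# as a user with write access to that directory (e.g. the clickhouse user).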