#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import logging
import argparse
import tarfile
import math

try:
    from boto.s3.connection import S3Connection
    from boto.s3.key import Key
except ImportError:
    raise ImportError("You have to install the boto package: 'pip install boto'")


class S3API(object):
    def __init__(self, access_key, secret_access_key, mds_api, mds_url):
        self.connection = S3Connection(
            host=mds_api,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_access_key,
        )
        self.mds_url = mds_url

    def upload_file(self, bucket_name, file_path, s3_path):
        """Upload a single (possibly very large) file via multipart upload."""
        logging.info("Start uploading file to bucket %s", bucket_name)

        bucket = self.connection.get_bucket(bucket_name)
        key = bucket.initiate_multipart_upload(s3_path)
        logging.info("Will upload to s3 path %s", s3_path)
        chunksize = 1024 * 1024 * 1024  # 1 GB
        filesize = os.stat(file_path).st_size
        logging.info("File size is %s", filesize)
        chunkcount = int(math.ceil(filesize / chunksize))

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        try:
            for i in range(chunkcount):
                logging.info("Uploading chunk %s of %s", i + 1, chunkcount)
                offset = chunksize * i
                bytes_size = min(chunksize, filesize - offset)

                # Open in binary mode: byte offsets and raw bytes are required here.
                with open(file_path, "rb") as fp:
                    fp.seek(offset)
                    key.upload_part_from_file(
                        fp=fp, part_num=i + 1, size=bytes_size, cb=call_back, num_cb=100
                    )
            key.complete_upload()
        except Exception as ex:
            key.cancel_upload()
            raise ex
        logging.info("Contents were set")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def set_file_contents(self, bucket, local_file_path, s3_file_path):
        key = Key(bucket)
        key.key = s3_file_path
        file_size = os.stat(local_file_path).st_size
        logging.info(
            "Uploading file `%s` to `%s`. Size is %s",
            local_file_path,
            s3_file_path,
            file_size,
        )

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        key.set_contents_from_filename(local_file_path, cb=call_back)

    def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
        """Upload a local directory tree under <s3_path>/store/ in the bucket."""
        bucket = self.connection.get_bucket(bucket_name)
        if s3_path.endswith("/"):
            s3_path += "store/"
        else:
            s3_path += "/store/"
        print(s3_path)
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                # Key path relative to the uploaded directory root.
                s3_file = local_file_path[len(directory_path) + 1 :]
                s3_file_path = os.path.join(s3_path, s3_file)
                self.set_file_contents(bucket, local_file_path, s3_file_path)

        logging.info("Uploading finished")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def list_bucket_keys(self, bucket_name):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            print(obj.key)

    def remove_folder_from_bucket(self, bucket_name, folder_path):
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            if obj.key.startswith(folder_path):
                print("Removing " + obj.key)
                obj.delete()


def make_tar_file_for_table(clickhouse_data_path, db_name, table_name, tmp_prefix):
    """Bundle a table's data and metadata directories into a single tar file."""
    relative_data_path = os.path.join("data", db_name, table_name)
    relative_meta_path = os.path.join("metadata", db_name, table_name + ".sql")
    path_to_data = os.path.join(clickhouse_data_path, relative_data_path)
    path_to_metadata = os.path.join(clickhouse_data_path, relative_meta_path)
    temporary_file_name = tmp_prefix + "/{tname}.tar".format(tname=table_name)
    with tarfile.open(temporary_file_name, "w") as bundle:
        bundle.add(path_to_data, arcname=relative_data_path)
        bundle.add(path_to_metadata, arcname=relative_meta_path)
    return temporary_file_name


USAGE_EXAMPLES = """
examples:
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-path some_ds.tsv.xz --bucket-name some-bucket --s3-path /path/to/
"""

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

    parser = argparse.ArgumentParser(
        description="Simple tool for uploading datasets to clickhouse S3",
        usage="%(prog)s [options] {}".format(USAGE_EXAMPLES),
    )
    parser.add_argument("--s3-api-url", default="s3.amazonaws.com")
    parser.add_argument("--s3-common-url", default="s3.amazonaws.com")
    parser.add_argument("--bucket-name", default="clickhouse-datasets")
    parser.add_argument(
        "--dataset-name",
        required=True,
        help="Name of the dataset; used in the uploaded path",
    )
    parser.add_argument("--access-key-id", required=True)
    parser.add_argument("--secret-access-key", required=True)
    parser.add_argument(
        "--clickhouse-data-path",
        default="/var/lib/clickhouse/",
        help="Path to the clickhouse database on the filesystem",
    )
    parser.add_argument("--s3-path", help="Path in s3, where to upload the file")
    parser.add_argument(
        "--tmp-prefix", default="/tmp", help="Prefix to store the temporary tar file"
    )
    data_group = parser.add_mutually_exclusive_group(required=True)
    table_name_argument = data_group.add_argument(
        "--table-name",
        help="Name of the table (with database), if you are uploading partitions",
    )
    data_group.add_argument("--file-path", help="Path to a single file to upload")
    data_group.add_argument(
        "--directory-path", help="Path to a directory with files to upload"
    )
    data_group.add_argument(
        "--list-directory", help="List the keys of the bucket"
    )
    data_group.add_argument(
        "--remove-directory",
        help="Remove the given s3 directory (prefix) from the bucket",
    )
    args = parser.parse_args()

    if args.table_name is not None and args.clickhouse_data_path is None:
        raise argparse.ArgumentError(
            table_name_argument,
            "You should specify --clickhouse-data-path to upload --table",
        )

    s3_conn = S3API(
        args.access_key_id, args.secret_access_key, args.s3_api_url, args.s3_common_url
    )

    file_path = ""
    directory_path = args.directory_path
    s3_path = args.s3_path

    if args.list_directory:
        s3_conn.list_bucket_keys(args.bucket_name)
    elif args.remove_directory:
        print("Removing s3 path: " + args.remove_directory)
        s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory)
    elif args.directory_path is not None:
        url = s3_conn.upload_data_for_static_files_disk(
            args.bucket_name, directory_path, s3_path
        )
        logging.info("Data uploaded: %s", url)
    else:
        if args.table_name is not None:
            if "." not in args.table_name:
                db_name = "default"
                table_name = args.table_name
            else:
                db_name, table_name = args.table_name.split(".")
            file_path = make_tar_file_for_table(
                args.clickhouse_data_path, db_name, table_name, args.tmp_prefix
            )
        else:
            file_path = args.file_path

        # Choose the destination path: table partitions first, then an explicit
        # --s3-path, then a tsv fallback based on the file name.
        if args.table_name is not None:
            s3_path = os.path.join(
                args.dataset_name, "partitions", os.path.basename(file_path)
            )
        elif args.s3_path is not None:
            s3_path = os.path.join(
                args.dataset_name, args.s3_path, os.path.basename(file_path)
            )
        elif "tsv" in file_path:
            s3_path = os.path.join(
                args.dataset_name, "tsv", os.path.basename(file_path)
            )
        else:
            raise Exception("Don't know the s3 path to upload to")

        url = s3_conn.upload_file(args.bucket_name, file_path, s3_path)
        logging.info("Data uploaded: %s", url)