mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 21:42:39 +00:00
223 lines
8.4 KiB
Python
Executable File
223 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
import os
|
|
import logging
|
|
import argparse
|
|
import tarfile
|
|
import math
|
|
|
|
try:
|
|
from boto.s3.connection import S3Connection
|
|
from boto.s3.key import Key
|
|
except ImportError:
|
|
raise ImportError("You have to install boto package 'pip install boto'")
|
|
|
|
|
|
class S3API(object):
    """Small helper around a boto ``S3Connection`` for dataset uploads.

    The instance keeps the open connection in ``self.connection`` and the
    host suffix used to build the returned download URLs in ``self.mds_url``.
    """

    def __init__(self, access_key, secret_access_key, mds_api, mds_url):
        """Create the boto connection.

        Args:
            access_key: forwarded to boto as ``aws_access_key_id``.
            secret_access_key: forwarded to boto as ``aws_secret_access_key``.
            mds_api: host of the S3 API endpoint.
            mds_url: host suffix placed after ``<bucket>.`` in returned URLs.
        """
        self.connection = S3Connection(
            host=mds_api,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_access_key,
        )
        self.mds_url = mds_url

    @staticmethod
    def _split_into_parts(filesize, chunksize):
        """Plan the multipart upload of a file of ``filesize`` bytes.

        Returns a list of ``(part_num, offset, size)`` tuples with 1-based
        part numbers. The parts cover the file exactly once; an empty file
        still yields a single zero-sized part so that the upload loop runs.
        """
        # Integer ceiling division: avoids float rounding on huge files and
        # never produces a trailing part that starts at or past end of file.
        part_count = max(1, -(-filesize // chunksize))
        return [
            (
                index + 1,
                index * chunksize,
                min(chunksize, filesize - index * chunksize),
            )
            for index in range(part_count)
        ]

    def upload_file(self, bucket_name, file_path, s3_path):
        """Upload one local file to ``s3_path`` as a multipart upload.

        Args:
            bucket_name: name of the destination bucket.
            file_path: local path of the file to upload.
            s3_path: key of the object inside the bucket.

        Returns:
            ``https://<bucket>.<mds_url>/<s3_path>`` for the uploaded object.

        Raises:
            OSError: if ``file_path`` cannot be stat'ed or opened.
            Exception: any error raised while uploading is re-raised after
                the multipart upload has been cancelled.
        """
        logging.info("Start uploading file to bucket %s", bucket_name)

        # Stat the file before touching S3, so a missing file cannot leave
        # an initiated-but-never-finished multipart upload behind.
        filesize = os.stat(file_path).st_size
        chunksize = 1024 * 1024 * 1024  # 1 GB
        parts = self._split_into_parts(filesize, chunksize)

        bucket = self.connection.get_bucket(bucket_name)
        multipart = bucket.initiate_multipart_upload(s3_path)
        logging.info("Will upload to s3 path %s", s3_path)
        logging.info("File size is %s", filesize)

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        try:
            # Binary mode: the payload is arbitrary bytes (tar, xz, ...), and
            # one handle is enough because every part seeks to its own offset.
            with open(file_path, "rb") as fp:
                for part_num, offset, bytes_size in parts:
                    logging.info("Uploading chunk %s of %s", part_num, len(parts))
                    fp.seek(offset)
                    multipart.upload_part_from_file(
                        fp=fp,
                        part_num=part_num,
                        size=bytes_size,
                        cb=call_back,
                        num_cb=100,
                    )
            multipart.complete_upload()
        except Exception:
            multipart.cancel_upload()
            raise
        logging.info("Contents were set")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def set_file_contents(self, bucket, local_file_path, s3_file_path):
        """Upload ``local_file_path`` to key ``s3_file_path`` of ``bucket``.

        ``bucket`` is an already opened boto bucket object. Progress is
        printed through the callback passed to boto.
        """
        key = Key(bucket)
        key.key = s3_file_path
        file_size = os.stat(local_file_path).st_size
        logging.info(
            "Uploading file `%s` to `%s`. Size is %s",
            local_file_path,
            s3_file_path,
            file_size,
        )

        def call_back(x, y):
            print("Uploaded {}/{} bytes".format(x, y))

        key.set_contents_from_filename(local_file_path, cb=call_back)

    def upload_data_for_static_files_disk(self, bucket_name, directory_path, s3_path):
        """Upload every file under ``directory_path`` below ``<s3_path>/store/``.

        The directory layout is preserved: a file's key is the ``store/``
        prefix joined with its path relative to ``directory_path``.

        Returns:
            ``https://<bucket>.<mds_url>/<s3_path>/store/``.
        """
        bucket = self.connection.get_bucket(bucket_name)
        if s3_path.endswith("/"):
            s3_path += "store/"
        else:
            s3_path += "/store/"
        print(s3_path)
        for root, _dirs, files in os.walk(directory_path):
            for file_name in files:
                local_file_path = os.path.join(root, file_name)
                # relpath instead of slicing by len(directory_path) + 1: the
                # slice ate the first character of every key whenever
                # directory_path was given with a trailing separator.
                s3_file = os.path.relpath(local_file_path, directory_path)
                s3_file_path = os.path.join(s3_path, s3_file)
                self.set_file_contents(bucket, local_file_path, s3_file_path)

        logging.info("Uploading finished")
        return "https://{bucket}.{mds_url}/{path}".format(
            bucket=bucket_name, mds_url=self.mds_url, path=s3_path
        )

    def list_bucket_keys(self, bucket_name):
        """Print the key of every object returned by ``get_all_keys``."""
        # NOTE(review): get_all_keys may return only one page of results for
        # large buckets - confirm against the boto documentation.
        bucket = self.connection.get_bucket(bucket_name)
        for obj in bucket.get_all_keys():
            print(obj.key)

    def remove_folder_from_bucket(self, bucket_name, folder_path):
        """Delete every listed object whose key starts with ``folder_path``."""
        bucket = self.connection.get_bucket(bucket_name)
        # A single listing is enough; the previous extra get_all_keys() call
        # only cost a request and its result was thrown away.
        for obj in bucket.get_all_keys():
            if obj.key.startswith(folder_path):
                print("Removing " + obj.key)
                obj.delete()
|
|
|
|
|
|
def make_tar_file_for_table(clickhouse_data_path, db_name, table_name, tmp_prefix):
    """Pack a table's data directory and ``.sql`` metadata into a tar file.

    Both entries are read from below ``clickhouse_data_path`` and stored in
    the archive under their relative paths (``data/<db>/<table>`` and
    ``metadata/<db>/<table>.sql``).

    Returns:
        Path of the created archive, ``<tmp_prefix>/<table_name>.tar``.
    """
    # Order matters for the archive layout: data first, then metadata.
    archive_members = (
        os.path.join("data", db_name, table_name),
        os.path.join("metadata", db_name, table_name + ".sql"),
    )
    tar_path = tmp_prefix + "/{tname}.tar".format(tname=table_name)
    with tarfile.open(tar_path, "w") as archive:
        for member in archive_members:
            archive.add(os.path.join(clickhouse_data_path, member), arcname=member)
    return tar_path
|
|
|
|
|
|
USAGE_EXAMPLES = """
|
|
examples:
|
|
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket
|
|
\t./s3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-path some_ds.tsv.xz --bucket-name some-bucket --s3-path /path/to/
|
|
"""
|
|
|
|
def split_table_name(full_table_name):
    """Split a ``[db.]table`` argument into ``(db_name, table_name)``.

    A name without a dot is placed in the ``default`` database. Only the
    first dot separates the database, so the remainder is kept intact.
    """
    if "." not in full_table_name:
        return "default", full_table_name
    db_name, table_name = full_table_name.split(".", 1)
    return db_name, table_name


def resolve_s3_path(dataset_name, file_path, table_name, s3_path):
    """Choose the destination key for a single-file upload.

    Precedence: a table upload goes to ``<dataset>/partitions/``, otherwise
    an explicit ``s3_path`` is used, otherwise a file whose path contains
    ``tsv`` goes to ``<dataset>/tsv/``.

    Raises:
        Exception: if none of the rules applies.
    """
    file_name = os.path.basename(file_path)
    if table_name is not None:
        return os.path.join(dataset_name, "partitions", file_name)
    if s3_path is not None:
        # NOTE(review): os.path.join drops dataset_name when s3_path is
        # absolute (as in the usage example) - confirm this is intended.
        return os.path.join(dataset_name, s3_path, file_name)
    if "tsv" in file_path:
        return os.path.join(dataset_name, "tsv", file_name)
    raise Exception("Don't know s3-path to upload")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")

    parser = argparse.ArgumentParser(
        description="Simple tool for uploading datasets to clickhouse S3",
        usage="%(prog)s [options] {}".format(USAGE_EXAMPLES),
    )
    parser.add_argument("--s3-api-url", default="s3.amazonaws.com")
    parser.add_argument("--s3-common-url", default="s3.amazonaws.com")
    parser.add_argument("--bucket-name", default="clickhouse-datasets")
    parser.add_argument(
        "--dataset-name",
        required=True,
        help="Name of dataset, will be used in uploaded path",
    )
    parser.add_argument("--access-key-id", required=True)
    parser.add_argument("--secret-access-key", required=True)
    parser.add_argument(
        "--clickhouse-data-path",
        default="/var/lib/clickhouse/",
        help="Path to clickhouse database on filesystem",
    )
    parser.add_argument("--s3-path", help="Path in s3, where to upload file")
    parser.add_argument(
        "--tmp-prefix", default="/tmp", help="Prefix to store temporary downloaded file"
    )
    # Exactly one of the following actions has to be selected.
    data_group = parser.add_mutually_exclusive_group(required=True)
    table_name_argument = data_group.add_argument(
        "--table-name",
        help="Name of table with database, if you are uploading partitions",
    )
    data_group.add_argument("--file-path", help="Name of file, if you are uploading")
    data_group.add_argument(
        "--directory-path", help="Path to directory with files to upload"
    )
    data_group.add_argument(
        "--list-directory", help="List s3 directory by --directory-path"
    )
    data_group.add_argument(
        "--remove-directory", help="Remove s3 directory by --directory-path"
    )
    args = parser.parse_args()

    if args.table_name is not None and args.clickhouse_data_path is None:
        raise argparse.ArgumentError(
            table_name_argument,
            "You should specify --clickhouse-data-path to upload --table",
        )
    # The directory upload appends "store/" to --s3-path, so fail with a
    # usage error instead of an AttributeError on None.
    if args.directory_path is not None and args.s3_path is None:
        parser.error("--s3-path is required together with --directory-path")

    s3_conn = S3API(
        args.access_key_id, args.secret_access_key, args.s3_api_url, args.s3_common_url
    )

    if args.list_directory:
        s3_conn.list_bucket_keys(args.bucket_name)
    elif args.remove_directory:
        print("Removing s3 path: " + args.remove_directory)
        s3_conn.remove_folder_from_bucket(args.bucket_name, args.remove_directory)
    elif args.directory_path is not None:
        url = s3_conn.upload_data_for_static_files_disk(
            args.bucket_name, args.directory_path, args.s3_path
        )
        logging.info("Data uploaded: %s", url)
    else:
        if args.table_name is not None:
            # Previously a name without a dot left table_name unassigned and
            # the call below failed with NameError.
            db_name, table_name = split_table_name(args.table_name)
            file_path = make_tar_file_for_table(
                args.clickhouse_data_path, db_name, table_name, args.tmp_prefix
            )
        else:
            file_path = args.file_path

        s3_path = resolve_s3_path(
            args.dataset_name, file_path, args.table_name, args.s3_path
        )

        url = s3_conn.upload_file(args.bucket_name, file_path, s3_path)
        logging.info("Data uploaded: %s", url)
|