mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Add s3uploader script in tools
This commit is contained in:
parent
aa06005a32
commit
b7583337a8
132
utils/s3tools/s3uploader
Executable file
132
utils/s3tools/s3uploader
Executable file
@ -0,0 +1,132 @@
|
||||
#!/usr/bin/env python
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import logging
|
||||
import argparse
|
||||
import tarfile
|
||||
import math
|
||||
|
||||
try:
|
||||
from boto.s3.connection import S3Connection
|
||||
except ImportError:
|
||||
raise ImportError("You have to install boto package 'pip install boto'")
|
||||
|
||||
|
||||
class S3API(object):
|
||||
def __init__(self, access_key, secret_access_key, mds_api, mds_url):
|
||||
self.connection = S3Connection(
|
||||
host=mds_api,
|
||||
aws_access_key_id=access_key,
|
||||
aws_secret_access_key=secret_access_key,
|
||||
)
|
||||
self.mds_url = mds_url
|
||||
|
||||
def upload_file(self, bucket_name, file_path, s3_path):
|
||||
logging.info("Start uploading file to bucket %s", bucket_name)
|
||||
bucket = self.connection.get_bucket(bucket_name)
|
||||
key = bucket.initiate_multipart_upload(s3_path)
|
||||
logging.info("Will upload to s3 path %s", s3_path)
|
||||
chunksize = 1024 * 1024 * 1024 # 1 GB
|
||||
filesize = os.stat(file_path).st_size
|
||||
logging.info("File size if %s", filesize)
|
||||
chunkcount = int(math.ceil(filesize / chunksize))
|
||||
|
||||
def call_back(x, y):
|
||||
print "Uploaded {}/{} bytes".format(x, y)
|
||||
try:
|
||||
for i in range(chunkcount + 1):
|
||||
logging.info("Uploading chunk %s of %s", i, chunkcount + 1)
|
||||
offset = chunksize * i
|
||||
bytes_size = min(chunksize, filesize - offset)
|
||||
with open(file_path, 'r') as fp:
|
||||
fp.seek(offset)
|
||||
key.upload_part_from_file(fp=fp, part_num=i+1,
|
||||
size=bytes_size, cb=call_back,
|
||||
num_cb=100)
|
||||
key.complete_upload()
|
||||
except Exception as ex:
|
||||
key.cancel_upload()
|
||||
raise ex
|
||||
logging.info("Contents were set")
|
||||
return "https://{bucket}.{mds_url}/{path}".format(
|
||||
bucket=bucket_name, mds_url=self.mds_url, path=s3_path)
|
||||
|
||||
|
||||
def make_tar_file_for_table(clickhouse_data_path, db_name, table_name,
|
||||
tmp_prefix):
|
||||
|
||||
relative_data_path = os.path.join('data', db_name, table_name)
|
||||
relative_meta_path = os.path.join('metadata', db_name, table_name + '.sql')
|
||||
path_to_data = os.path.join(clickhouse_data_path, relative_data_path)
|
||||
path_to_metadata = os.path.join(clickhouse_data_path, relative_meta_path)
|
||||
temporary_file_name = tmp_prefix + '/{tname}.tar'.format(tname=table_name)
|
||||
with tarfile.open(temporary_file_name, "w") as bundle:
|
||||
bundle.add(path_to_data, arcname=relative_data_path)
|
||||
bundle.add(path_to_metadata, arcname=relative_meta_path)
|
||||
return temporary_file_name
|
||||
|
||||
|
||||
USAGE_EXAMPLES = '''
|
||||
examples:
|
||||
\ts3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --clickhouse-data-path /opt/clickhouse/ --table-name default.some_tbl --bucket-name some-bucket
|
||||
\ts3uploader --dataset-name some_ds --access-key-id XXX --secret-access-key YYY --file-name some_ds.tsv.xz --bucket-name some-bucket
|
||||
'''
|
||||
|
||||
if __name__ == "__main__":
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Simple tool for uploading datasets to clickhouse S3",
|
||||
usage='%(prog)s [options] {}'.format(USAGE_EXAMPLES))
|
||||
parser.add_argument('--s3-api-url', default='s3.mds.yandex.net')
|
||||
parser.add_argument('--s3-common-url', default='s3.yandex.net')
|
||||
parser.add_argument('--bucket-name', default='clickhouse-datasets')
|
||||
parser.add_argument('--dataset-name', required=True,
|
||||
help='Name of dataset, will be used in uploaded path')
|
||||
parser.add_argument('--access-key-id', required=True)
|
||||
parser.add_argument('--secret-access-key', required=True)
|
||||
parser.add_argument('--clickhouse-data-path',
|
||||
default='/var/lib/clickhouse/',
|
||||
help='Path to clickhouse database on filesystem')
|
||||
parser.add_argument('--s3-path', help='Path in s3, where to upload file')
|
||||
parser.add_argument('--tmp-prefix', default='/tmp',
|
||||
help='Prefix to store temporay downloaded file')
|
||||
data_group = parser.add_mutually_exclusive_group(required=True)
|
||||
data_group.add_argument('--table-name',
|
||||
help='Name of table with database, if you are uploading partitions')
|
||||
data_group.add_argument('--file-path',
|
||||
help='Name of file, if you are uploading')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.table_name is not None and args.clickhouse_data_path is None:
|
||||
raise argparse.ArgumentError(
|
||||
"You should specify --clickhouse-data-path to upload --table")
|
||||
|
||||
s3_conn = S3API(
|
||||
args.access_key_id, args.secret_access_key,
|
||||
args.s3_api_url, args.s3_common_url)
|
||||
|
||||
if args.table_name is not None:
|
||||
if '.' not in args.table_name:
|
||||
db_name = 'default'
|
||||
else:
|
||||
db_name, table_name = args.table_name.split('.')
|
||||
file_path = make_tar_file_for_table(
|
||||
args.clickhouse_data_path, db_name, table_name, args.tmp_prefix)
|
||||
else:
|
||||
file_path = args.file_path
|
||||
|
||||
if 'tsv' in file_path:
|
||||
s3_path = os.path.join(
|
||||
args.dataset_name, 'tsv', os.path.basename(file_path))
|
||||
elif args.table_name is not None:
|
||||
s3_path = os.path.join(
|
||||
args.dataset_name, 'partitions', os.path.basename(file_path))
|
||||
elif args.s3_path is not None:
|
||||
s3_path = os.path.join(
|
||||
args.dataset_name, s3_path, os.path.base_name(file_path))
|
||||
else:
|
||||
raise Exception("Don't know s3-path to upload")
|
||||
|
||||
url = s3_conn.upload_file(args.bucket_name, file_path, s3_path)
|
||||
logging.info("Data uploaded: %s", url)
|
Loading…
Reference in New Issue
Block a user