ClickHouse/tests/integration/helpers/s3_tools.py
2024-10-02 11:15:16 +00:00

175 lines
5.6 KiB
Python

import glob
import json
import os
import shutil
from enum import Enum
from minio import Minio
from pyhdfs import HdfsClient
class CloudUploader:
def upload_directory(self, local_path, remote_blob_path, **kwargs):
print(kwargs)
result_files = []
# print(f"Arguments: {local_path}, {s3_path}")
# for local_file in glob.glob(local_path + "/**"):
# print("Local file: {}", local_file)
for local_file in glob.glob(local_path + "/**"):
result_local_path = os.path.join(local_path, local_file)
result_remote_blob_path = os.path.join(remote_blob_path, local_file)
if os.path.isfile(local_file):
self.upload_file(result_local_path, result_remote_blob_path, **kwargs)
result_files.append(result_remote_blob_path)
else:
files = self.upload_directory(
result_local_path, result_remote_blob_path, **kwargs
)
result_files.extend(files)
return result_files
class S3Uploader(CloudUploader):
def __init__(self, minio_client, bucket_name):
self.minio_client = minio_client
self.bucket_name = bucket_name
def upload_file(self, local_path, remote_blob_path, bucket=None):
print(f"Upload to bucket: {bucket}")
if bucket is None:
bucket = self.bucket_name
self.minio_client.fput_object(
bucket_name=bucket,
object_name=remote_blob_path,
file_path=local_path,
)
class LocalUploader(CloudUploader):
def __init__(self, clickhouse_node):
self.clickhouse_node = clickhouse_node
def upload_file(self, local_path, remote_blob_path):
dir_path = os.path.dirname(remote_blob_path)
if dir_path != "":
self.clickhouse_node.exec_in_container(
[
"bash",
"-c",
"mkdir -p {}".format(dir_path),
]
)
self.clickhouse_node.copy_file_to_container(local_path, remote_blob_path)
class HDFSUploader(CloudUploader):
def __init__(self, clickhouse_node):
self.clickhouse_node = clickhouse_node
def upload_file(self, local_path, remote_blob_path):
dir_path = os.path.dirname(remote_blob_path)
fs = HdfsClient(hosts=started_cluster.hdfs_ip)
exists = fs.exists(dir_path)
if not exists:
fs.mkdirs(dir_path)
hdfs_api = self.clickhouse_node.hdfs_api
hdfs_api.write_file(remote_blob_path, local_path)
class AzureUploader(CloudUploader):
def __init__(self, blob_service_client, container_name):
self.blob_service_client = blob_service_client
self.container_client = self.blob_service_client.get_container_client(
container_name
)
def upload_file(self, local_path, remote_blob_path, container_name=None):
if container_name is None:
container_client = self.container_client
else:
container_client = self.blob_service_client.get_container_client(
container_name
)
blob_client = container_client.get_blob_client(remote_blob_path)
with open(local_path, "rb") as data:
blob_client.upload_blob(data, overwrite=True)
def upload_directory(minio_client, bucket, local_path, remote_path):
return S3Uploader(minio_client=minio_client, bucket_name=bucket).upload_directory(
local_path, remote_path
)
def get_file_contents(minio_client, bucket, s3_path):
data = minio_client.get_object(bucket, s3_path)
data_str = b""
for chunk in data.stream():
data_str += chunk
return data_str.decode()
def list_s3_objects(minio_client, bucket, prefix=""):
prefix_len = len(prefix)
return [
obj.object_name[prefix_len:]
for obj in minio_client.list_objects(bucket, prefix=prefix, recursive=True)
]
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
minio_client.make_bucket(started_cluster.minio_restricted_bucket)