ClickHouse/tests/integration/test_storage_hudi/test.py
2022-11-17 11:54:13 +00:00

204 lines
6.3 KiB
Python

import logging
import os
import json
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
# Absolute directory containing this test module; upload_test_table() reads the
# local Hudi test data from SCRIPT_DIR + "/test_table".
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
def prepare_s3_bucket(started_cluster):
    """Attach a public read/write bucket policy to the cluster's MinIO bucket.

    The policy allows any principal (``{"AWS": "*"}``) to call
    GetBucketLocation and ListBucket on the bucket, and GetObject and
    PutObject on every object inside it.

    The resource ARNs are derived from ``started_cluster.minio_bucket``.
    Previously they were hard-coded to ``root``, so a differently named
    bucket would receive a policy that matched none of its resources.
    """
    bucket = started_cluster.minio_bucket
    bucket_arn = f"arn:aws:s3:::{bucket}"
    objects_arn = f"{bucket_arn}/*"
    # Bucket-level actions target the bucket ARN itself; object-level actions
    # need the "/*" suffix to match every key in the bucket.
    grants = [
        ("s3:GetBucketLocation", bucket_arn),
        ("s3:ListBucket", bucket_arn),
        ("s3:GetObject", objects_arn),
        ("s3:PutObject", objects_arn),
    ]
    bucket_read_write_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": action,
                "Resource": resource,
            }
            for action, resource in grants
        ],
    }
    started_cluster.minio_client.set_bucket_policy(
        bucket, json.dumps(bucket_read_write_policy)
    )
def upload_test_table(started_cluster):
    """Upload every file under ``SCRIPT_DIR/test_table`` to the cluster's MinIO bucket.

    Each object name is the local path with the ``SCRIPT_DIR`` prefix cut
    off. Names therefore keep their leading ``/test_table/...`` component.
    """
    client = started_cluster.minio_client
    bucket = started_cluster.minio_bucket
    prefix_len = len(SCRIPT_DIR)
    for directory, _subdirs, filenames in os.walk(SCRIPT_DIR + "/test_table"):
        object_dir = directory[prefix_len:]
        for filename in filenames:
            object_name = os.path.join(object_dir, filename)
            local_path = os.path.join(directory, filename)
            client.fput_object(bucket, object_name, local_path)
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start a ClickHouse cluster with MinIO and seed the Hudi table.

    Creates a ``main_server`` instance with MinIO enabled, starts the cluster,
    applies the bucket policy and uploads the local ``test_table`` data, then
    yields the cluster. ``shutdown()`` runs after the module's tests finish,
    or as soon as any setup step after construction fails.
    """
    # Construct outside the ``try``. If the constructor itself raised, the
    # ``finally`` would otherwise reference an unbound ``cluster``, and the
    # resulting UnboundLocalError would mask the real failure.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance("main_server", with_minio=True)
        logging.info("Starting cluster...")
        cluster.start()
        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")
        upload_test_table(cluster)
        logging.info("Test table uploaded")
        yield cluster
    finally:
        cluster.shutdown()
def run_query(
    instance: "ClickHouseInstance",
    query: str,
    stdin: object = None,
    settings: "dict | None" = None,
) -> str:
    """Execute ``query`` on ``instance`` and return its raw text output.

    ``stdin`` and ``settings`` are passed through unchanged to
    ``instance.query``. One log line is written before the query runs and
    another after it returns.
    """
    logging.info("Running query '{}'...".format(query))
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
def test_create_query(started_cluster):
    """Create table ``hudi`` with the Hudi engine over the uploaded MinIO test table."""
    node = started_cluster.instances["main_server"]
    table_url = (
        f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
        f"/{started_cluster.minio_bucket}/test_table/"
    )
    run_query(
        node,
        f"CREATE TABLE hudi ENGINE=Hudi('{table_url}', 'minio', 'minio123')",
    )
def test_select_query(started_cluster):
    """Read every column through both the ``hudi`` engine table and the ``hudi()`` table function.

    Each column must return at least one row through either path. The
    DISTINCT ``partitionpath`` values, in order, must equal the three
    expected partitions.
    """
    node = started_cluster.instances["main_server"]
    bucket = started_cluster.minio_bucket
    columns = [
        "_hoodie_commit_time",
        "_hoodie_commit_seqno",
        "_hoodie_record_key",
        "_hoodie_partition_path",
        "_hoodie_file_name",
        "begin_lat",
        "begin_lon",
        "driver",
        "end_lat",
        "end_lon",
        "fare",
        "partitionpath",
        "rider",
        "ts",
        "uuid",
    ]

    # Create the engine table here too, in case it does not exist yet.
    run_query(
        node,
        f"CREATE TABLE IF NOT EXISTS hudi ENGINE=Hudi('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/test_table/', 'minio', 'minio123')",
    )

    # Placeholders shared by every hudi() table-function query below.
    location = {
        "ip": started_cluster.minio_ip,
        "port": started_cluster.minio_port,
        "bucket": bucket,
    }
    engine_template = "SELECT {} FROM hudi FORMAT TSV"
    function_template = "SELECT {col} FROM hudi('http://{ip}:{port}/{bucket}/test_table/', 'minio', 'minio123') FORMAT TSV"

    for column in columns:
        rows = run_query(node, engine_template.format(column)).splitlines()
        assert len(rows) > 0

    for column in columns:
        rows = run_query(
            node, function_template.format(col=column, **location)
        ).splitlines()
        assert len(rows) > 0

    # Check that every partition path is present in the result.
    engine_partitions = run_query(
        node,
        "SELECT DISTINCT partitionpath FROM hudi ORDER BY partitionpath FORMAT TSV",
    )
    function_partitions = run_query(
        node,
        "SELECT DISTINCT partitionpath FROM hudi('http://{ip}:{port}/{bucket}/test_table/', 'minio', 'minio123') ORDER BY partitionpath FORMAT TSV".format(
            **location
        ),
    )
    expected_partitions = TSV(
        [
            "americas/brazil/sao_paulo",
            "americas/united_states/san_francisco",
            "asia/india/chennai",
        ]
    )
    assert TSV(engine_partitions) == expected_partitions
    assert TSV(function_partitions) == expected_partitions
def test_describe_query(started_cluster):
    """DESCRIBE the ``hudi()`` table function and compare against the expected schema.

    The expected schema marks every column Nullable. The coordinates and
    ``fare`` are Float64, ``ts`` is Int64, and the rest are String.
    """
    node = started_cluster.instances["main_server"]
    table_url = (
        f"http://{started_cluster.minio_ip}:{started_cluster.minio_port}"
        f"/{started_cluster.minio_bucket}/test_table/"
    )
    described = node.query(
        f"DESCRIBE hudi('{table_url}', 'minio', 'minio123') FORMAT TSV",
    )
    # Rows must stay lists: TSV joins list rows with tabs.
    expected_schema = TSV(
        [
            ["_hoodie_commit_time", "Nullable(String)"],
            ["_hoodie_commit_seqno", "Nullable(String)"],
            ["_hoodie_record_key", "Nullable(String)"],
            ["_hoodie_partition_path", "Nullable(String)"],
            ["_hoodie_file_name", "Nullable(String)"],
            ["begin_lat", "Nullable(Float64)"],
            ["begin_lon", "Nullable(Float64)"],
            ["driver", "Nullable(String)"],
            ["end_lat", "Nullable(Float64)"],
            ["end_lon", "Nullable(Float64)"],
            ["fare", "Nullable(Float64)"],
            ["partitionpath", "Nullable(String)"],
            ["rider", "Nullable(String)"],
            ["ts", "Nullable(Int64)"],
            ["uuid", "Nullable(String)"],
        ]
    )
    assert described == expected_schema