mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-16 19:32:07 +00:00
97f2a2213e
* Move some code outside dbms/src folder * Fix paths
287 lines
11 KiB
Python
287 lines
11 KiB
Python
import json
|
|
import logging
|
|
import random
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
|
|
|
import helpers.client
|
|
|
|
|
|
# Lower the root logger threshold to INFO and attach a stream handler so the
# progress messages logged by the helpers below are emitted during a test run.
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_root_logger.addHandler(logging.StreamHandler())
|
|
|
|
|
|
# Creates S3 buckets for tests and allows anonymous read-write access to the main one.
def prepare_s3_bucket(cluster):
    """(Re)create the buckets used by the tests through ``cluster.minio_client``.

    * ``cluster.minio_bucket`` is removed if it already exists, created again
      and given a policy that allows read-write access without authorization.
    * A second bucket named ``"<minio_bucket>-with-auth"`` is removed if it
      already exists and created again with no policy attached. Its name is
      stored in ``cluster.minio_restricted_bucket``.

    :param cluster: object exposing ``minio_client`` and ``minio_bucket``.
    """
    minio_client = cluster.minio_client
    open_bucket = cluster.minio_bucket

    if minio_client.bucket_exists(open_bucket):
        minio_client.remove_bucket(open_bucket)
    minio_client.make_bucket(open_bucket)

    # Build the resource ARNs from the actual bucket name rather than a
    # hard-coded "root", so the policy always targets the bucket it is set on.
    bucket_arn = "arn:aws:s3:::{}".format(open_bucket)
    objects_arn = "{}/*".format(bucket_arn)

    # Allows read-write access for bucket without authorization.
    allowed_actions = [
        ("s3:GetBucketLocation", bucket_arn),
        ("s3:ListBucket", bucket_arn),
        ("s3:GetObject", objects_arn),
        ("s3:PutObject", objects_arn),
    ]
    bucket_read_write_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": action,
                "Resource": resource,
            }
            for action, resource in allowed_actions
        ],
    }
    minio_client.set_bucket_policy(open_bucket, json.dumps(bucket_read_write_policy))

    cluster.minio_restricted_bucket = "{}-with-auth".format(open_bucket)
    if minio_client.bucket_exists(cluster.minio_restricted_bucket):
        minio_client.remove_bucket(cluster.minio_restricted_bucket)
    minio_client.make_bucket(cluster.minio_restricted_bucket)
|
|
|
|
|
|
# Returns content of given S3 file as string.
def get_s3_file_content(cluster, bucket, filename):
    # type: (ClickHouseCluster, str, str) -> str
    """Download object ``filename`` from ``bucket`` and return it as a string.

    The object is fetched with ``cluster.minio_client.get_object`` and read
    through the response's ``stream()``. The response is closed and its
    connection released even if reading fails.

    :param cluster: object exposing ``minio_client``.
    :param bucket: name of the bucket to read from.
    :param filename: key of the object inside the bucket.
    :return: the whole object content as ``str``.
    """
    response = cluster.minio_client.get_object(bucket, filename)
    try:
        # Join once instead of repeated ``+=``: linear in the object size and
        # valid for byte chunks on both Python 2 and Python 3.
        raw = b"".join(response.stream())
    finally:
        response.close()
        response.release_conn()

    # On Python 2 ``bytes`` is ``str`` already; on Python 3 decode so callers
    # can keep comparing the result against ordinary string literals.
    return raw if isinstance(raw, str) else raw.decode()
|
|
|
|
|
|
# Returns nginx access log lines.
def get_nginx_access_logs():
    # type: () -> list
    """Read ``/nginx/access.log`` and return its lines as a list of strings.

    The file is opened in a ``with`` block so the handle is closed even when
    reading raises.
    """
    with open("/nginx/access.log", "r") as handle:
        return handle.readlines()
|
|
|
|
|
|
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped fixture yielding a started cluster with prepared S3 buckets.

    Two instances are registered, both with ``with_minio=True``:

    * ``restricted_dummy`` -- additionally loads
      ``configs/config_for_test_remote_host_filter.xml``;
    * ``dummy`` -- no extra configuration.

    After the cluster starts, ``prepare_s3_bucket`` is run on it. The cluster
    is shut down when the fixture is torn down, or when setup fails.
    """
    # Constructed outside ``try``: if the constructor itself raises there is
    # nothing to shut down, and the ``finally`` clause must not replace the
    # real error with an UnboundLocalError.
    test_cluster = ClickHouseCluster(__file__)
    try:
        test_cluster.add_instance("restricted_dummy", main_configs=["configs/config_for_test_remote_host_filter.xml"], with_minio=True)
        test_cluster.add_instance("dummy", with_minio=True)
        logging.info("Starting cluster...")
        test_cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(test_cluster)
        logging.info("S3 bucket created")

        yield test_cluster
    finally:
        test_cluster.shutdown()
|
|
|
|
|
|
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    """Run ``query`` on ``instance`` and return what ``instance.query`` returns.

    An INFO message is logged before the query is sent and another one after
    it finishes. ``stdin`` and ``settings`` are forwarded unchanged.
    """
    start_message = "Running query '{}'...".format(query)
    logging.info(start_message)

    output = instance.query(query, stdin=stdin, settings=settings)

    logging.info("Query finished")
    return output
|
|
|
|
|
|
# Test simple put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    """Insert three rows through the ``s3`` table function using ``values``.

    ``maybe_auth`` is spliced verbatim into the ``s3(...)`` argument list.
    When it is empty the open bucket is used, otherwise the restricted one.
    ``positive`` says whether the insert is expected to succeed; on success
    the stored object must equal the CSV rendering of the inserted rows.
    """
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    if maybe_auth:
        bucket = cluster.minio_restricted_bucket
    else:
        bucket = cluster.minio_bucket

    filename = "test.csv"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"

    url_parts = (cluster.minio_host, cluster.minio_port, bucket, filename)
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
        *(url_parts + (maybe_auth, table_format, values)))

    try:
        run_query(instance, put_query)
    except helpers.client.QueryRuntimeException:
        # A failing query is acceptable only for the negative cases.
        if positive:
            raise
        return

    assert positive
    assert values_csv == get_s3_file_content(cluster, bucket, filename)
|
|
|
|
|
|
# Test put values in CSV format.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put_csv(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    """Insert CSV data supplied on stdin through the ``s3`` table function.

    ``maybe_auth`` is spliced verbatim into the ``s3(...)`` argument list.
    When it is empty the open bucket is used, otherwise the restricted one.
    ``positive`` says whether the insert is expected to succeed; on success
    the stored object must be identical to the data sent on stdin.
    """
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    if maybe_auth:
        bucket = cluster.minio_restricted_bucket
    else:
        bucket = cluster.minio_bucket

    filename = "test.csv"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"

    url_parts = (cluster.minio_host, cluster.minio_port, bucket, filename)
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        *(url_parts + (maybe_auth, table_format)))

    try:
        run_query(instance, put_query, stdin=csv_data)
    except helpers.client.QueryRuntimeException:
        # A failing query is acceptable only for the negative cases.
        if positive:
            raise
        return

    assert positive
    assert csv_data == get_s3_file_content(cluster, bucket, filename)
|
|
|
|
|
|
# Test put and get with S3 server redirect.
def test_put_get_with_redirect(cluster):
    # type: (ClickHouseCluster) -> None
    """Write and read back a CSV object through the redirecting endpoint.

    Both queries address ``cluster.minio_redirect_host`` and
    ``cluster.minio_redirect_port``. The stored object is checked directly,
    then selected back together with the product of its three columns.
    """
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    bucket = cluster.minio_bucket
    filename = "test.csv"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    values_csv = "1,1,1\n1,1,1\n11,11,11\n"
    redirect_host = cluster.minio_redirect_host
    redirect_port = cluster.minio_redirect_port

    insert_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
        redirect_host, redirect_port, bucket, filename, table_format, values)
    run_query(instance, insert_query)

    assert values_csv == get_s3_file_content(cluster, bucket, filename)

    select_query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
        redirect_host, redirect_port, bucket, filename, table_format)
    stdout = run_query(instance, select_query)

    expected_rows = [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]
    actual_rows = [line.split() for line in stdout.splitlines()]
    assert actual_rows == expected_rows
|
|
|
|
|
|
def test_put_get_with_globs(cluster):
    # type: (ClickHouseCluster) -> None
    """Write 100 single-row objects and read them back with a glob URL.

    Objects are stored as ``<i>_<letter>/<j>.csv`` for ``i`` and ``j`` in
    0..9, with the letter picked at random from a-d. One aggregate select
    over the glob must return the column sums, the smallest ``_file`` and the
    largest ``_path`` among the objects written.
    """
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    bucket = cluster.minio_bucket
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    written_paths = []
    for i in range(10):
        for j in range(10):
            letter = random.choice(['a', 'b', 'c', 'd'])
            path = "{}_{}/{}.csv".format(i, letter, j)
            written_paths.append(path)
            values = "({},{},{})".format(i, j, i+j)
            insert_query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
                cluster.minio_host, cluster.minio_port, bucket, path, table_format, values)
            run_query(instance, insert_query)

    # The lexicographically greatest path is what max(_path) should report.
    max_path = max(written_paths)

    select_query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{}:{}/{}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{}')".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, table_format)
    expected_line = "450\t450\t900\t0.csv\t{bucket}/{max_path}".format(bucket=bucket, max_path=max_path)
    assert run_query(instance, select_query).splitlines() == [expected_line]
|
|
|
|
|
|
# Test multipart put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    # ("'minio','minio123',",True), Redirect with credentials not working with nginx.
    ("'wrongid','wrongkey',", False)
])
def test_multipart_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    """Insert enough CSV data that the upload has to be split into parts.

    The insert goes through the redirecting endpoint with
    ``s3_min_upload_part_size`` set to the minimum part size. ``maybe_auth``
    is spliced verbatim into the ``s3(...)`` argument list. When it is empty
    the open bucket is used, otherwise the restricted one. ``positive`` says
    whether the insert is expected to succeed. On success the nginx access
    log must show more than one PUT for the file, and the stored object must
    equal the data sent.
    """
    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Minimum size of part is 5 Mb for Minio.
    # See: https://github.com/minio/minio/blob/master/docs/minio-limits.md
    min_part_size_bytes = 5 * 1024 * 1024
    csv_size_bytes = int(min_part_size_bytes * 1.5)  # To have 2 parts.

    one_line_length = 6  # 3 digits, 2 commas, 1 line separator.

    # Generate data having size more than one part.
    # Floor division keeps the line count an integer on Python 3 too, where
    # "/" would produce a float that range() rejects.
    line_count = csv_size_bytes // one_line_length
    csv_data = "{},{},{}\n".format(1, 2, 3) * line_count

    assert len(csv_data) > min_part_size_bytes

    filename = "test_multipart.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)

    try:
        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
    except helpers.client.QueryRuntimeException:
        # A failing query is acceptable only for the negative cases.
        if positive:
            raise
    else:
        assert positive

        # Use Nginx access logs to count number of parts uploaded to Minio.
        # A real list is built because Python 3's filter() is lazy and has no
        # len().
        nginx_logs = get_nginx_access_logs()
        uploaded_parts = [log_line for log_line in nginx_logs
                          if filename in log_line and "PUT" in log_line]
        assert len(uploaded_parts) > 1

        assert csv_data == get_s3_file_content(cluster, bucket, filename)
|
|
|
|
|
|
def test_remote_host_filter(cluster):
    # type: (ClickHouseCluster) -> None
    """Check that the ``restricted_dummy`` instance rejects a disallowed host.

    A select and an insert through the ``s3`` table function, both aimed at
    ``invalid_host``, must fail with an error that mentions
    ``not allowed in config.xml``.
    """
    instance = cluster.instances["restricted_dummy"]
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    expected_error = "not allowed in config.xml"
    port = cluster.minio_port
    bucket = cluster.minio_bucket

    select_query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(
        "invalid_host", port, bucket, table_format)
    assert expected_error in instance.query_and_get_error(select_query)

    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    insert_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(
        "invalid_host", port, bucket, table_format, other_values)
    assert expected_error in instance.query_and_get_error(insert_query)
|
|
|
|
|
|
@pytest.mark.parametrize("s3_storage_args", [
    "''",  # 1 arguments
    "'','','','','',''"  # 6 arguments
])
def test_wrong_s3_syntax(cluster, s3_storage_args):
    # type: (ClickHouseCluster, str) -> None
    """Creating an S3 table with an unsupported argument count must fail.

    ``s3_storage_args`` is placed verbatim inside ``ENGINE = S3(...)``; the
    resulting error text must contain ``Code: 42``.
    """
    expected_err_msg = "Code: 42"  # NUMBER_OF_ARGUMENTS_DOESNT_MATCH
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance

    create_query = "create table test_table_s3_syntax (id UInt32) ENGINE = S3({})".format(s3_storage_args)
    error_text = instance.query_and_get_error(create_query)
    assert expected_err_msg in error_text
|