mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
better
This commit is contained in:
parent
65a55b1010
commit
fc89b4fd35
@ -172,7 +172,7 @@ class ClickHouseCluster:
|
||||
self.minio_host = "minio1"
|
||||
self.minio_bucket = "root"
|
||||
self.minio_bucket_2 = "root2"
|
||||
self.minio_port = 9001
|
||||
self.minio_port = get_open_port()
|
||||
self.minio_client = None # type: Minio
|
||||
self.minio_redirect_host = "proxy1"
|
||||
self.minio_redirect_port = 8080
|
||||
|
@ -157,7 +157,7 @@ if __name__ == "__main__":
|
||||
parallel_args = ""
|
||||
if args.parallel:
|
||||
parallel_args += "--dist=loadfile"
|
||||
parallel_args += "-n {}".format(args.parallel)
|
||||
parallel_args += " -n {}".format(args.parallel)
|
||||
|
||||
net = ""
|
||||
if not args.disable_net_host:
|
||||
|
@ -14,9 +14,10 @@ from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logging.getLogger().addHandler(logging.StreamHandler())
|
||||
|
||||
MINIO_INTERNAL_PORT = 9001
|
||||
|
||||
# Creates S3 bucket for tests and allows anonymous read-write access to it.
|
||||
def prepare_s3_bucket(cluster):
|
||||
def prepare_s3_bucket(started_cluster):
|
||||
# Allows read-write access for bucket without authorization.
|
||||
bucket_read_write_policy = {"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
@ -50,26 +51,26 @@ def prepare_s3_bucket(cluster):
|
||||
}
|
||||
]}
|
||||
|
||||
minio_client = cluster.minio_client
|
||||
minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))
|
||||
minio_client = started_cluster.minio_client
|
||||
minio_client.set_bucket_policy(started_cluster.minio_bucket, json.dumps(bucket_read_write_policy))
|
||||
|
||||
cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
|
||||
if minio_client.bucket_exists(cluster.minio_restricted_bucket):
|
||||
minio_client.remove_bucket(cluster.minio_restricted_bucket)
|
||||
started_cluster.minio_restricted_bucket = "{}-with-auth".format(started_cluster.minio_bucket)
|
||||
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
|
||||
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
|
||||
|
||||
minio_client.make_bucket(cluster.minio_restricted_bucket)
|
||||
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
|
||||
|
||||
|
||||
def put_s3_file_content(cluster, bucket, filename, data):
|
||||
def put_s3_file_content(started_cluster, bucket, filename, data):
|
||||
buf = io.BytesIO(data)
|
||||
cluster.minio_client.put_object(bucket, filename, buf, len(data))
|
||||
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
|
||||
|
||||
|
||||
# Returns content of given S3 file as string.
|
||||
def get_s3_file_content(cluster, bucket, filename, decode=True):
|
||||
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
|
||||
# type: (ClickHouseCluster, str) -> str
|
||||
|
||||
data = cluster.minio_client.get_object(bucket, filename)
|
||||
data = started_cluster.minio_client.get_object(bucket, filename)
|
||||
data_str = b""
|
||||
for chunk in data.stream():
|
||||
data_str += chunk
|
||||
@ -79,7 +80,7 @@ def get_s3_file_content(cluster, bucket, filename, decode=True):
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def cluster():
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
cluster.add_instance("restricted_dummy", main_configs=["configs/config_for_test_remote_host_filter.xml"],
|
||||
@ -115,17 +116,17 @@ def run_query(instance, query, stdin=None, settings=None):
|
||||
("'minio','minio123',", True),
|
||||
("'wrongid','wrongkey',", False)
|
||||
])
|
||||
def test_put(cluster, maybe_auth, positive):
|
||||
def test_put(started_cluster, maybe_auth, positive):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket if not maybe_auth else started_cluster.minio_restricted_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
|
||||
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
|
||||
filename = "test.csv"
|
||||
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format, values)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format, values)
|
||||
|
||||
try:
|
||||
run_query(instance, put_query)
|
||||
@ -134,18 +135,18 @@ def test_put(cluster, maybe_auth, positive):
|
||||
raise
|
||||
else:
|
||||
assert positive
|
||||
assert values_csv == get_s3_file_content(cluster, bucket, filename)
|
||||
assert values_csv == get_s3_file_content(started_cluster, bucket, filename)
|
||||
|
||||
|
||||
# Test put no data to S3.
|
||||
@pytest.mark.parametrize("auth", [
|
||||
"'minio','minio123',"
|
||||
])
|
||||
def test_empty_put(cluster, auth):
|
||||
def test_empty_put(started_cluster, auth):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
|
||||
create_empty_table_query = """
|
||||
@ -158,13 +159,13 @@ def test_empty_put(cluster, auth):
|
||||
|
||||
filename = "empty_put_test.csv"
|
||||
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') select * from empty_table".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, filename, auth, table_format)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename, auth, table_format)
|
||||
|
||||
run_query(instance, put_query)
|
||||
|
||||
try:
|
||||
run_query(instance, "select count(*) from s3('http://{}:{}/{}/{}', {}'CSV', '{}')".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, filename, auth, table_format))
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename, auth, table_format))
|
||||
|
||||
assert False, "Query should be failed."
|
||||
except helpers.client.QueryRuntimeException as e:
|
||||
@ -177,15 +178,15 @@ def test_empty_put(cluster, auth):
|
||||
("'minio','minio123',", True),
|
||||
("'wrongid','wrongkey',", False)
|
||||
])
|
||||
def test_put_csv(cluster, maybe_auth, positive):
|
||||
def test_put_csv(started_cluster, maybe_auth, positive):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket if not maybe_auth else started_cluster.minio_restricted_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
filename = "test.csv"
|
||||
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename, maybe_auth, table_format)
|
||||
csv_data = "8,9,16\n11,18,13\n22,14,2\n"
|
||||
|
||||
try:
|
||||
@ -195,27 +196,27 @@ def test_put_csv(cluster, maybe_auth, positive):
|
||||
raise
|
||||
else:
|
||||
assert positive
|
||||
assert csv_data == get_s3_file_content(cluster, bucket, filename)
|
||||
assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
|
||||
|
||||
|
||||
# Test put and get with S3 server redirect.
|
||||
def test_put_get_with_redirect(cluster):
|
||||
def test_put_get_with_redirect(started_cluster):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
||||
values_csv = "1,1,1\n1,1,1\n11,11,11\n"
|
||||
filename = "test.csv"
|
||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||
run_query(instance, query)
|
||||
|
||||
assert values_csv == get_s3_file_content(cluster, bucket, filename)
|
||||
assert values_csv == get_s3_file_content(started_cluster, bucket, filename)
|
||||
|
||||
query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format)
|
||||
stdout = run_query(instance, query)
|
||||
|
||||
assert list(map(str.split, stdout.splitlines())) == [
|
||||
@ -226,23 +227,23 @@ def test_put_get_with_redirect(cluster):
|
||||
|
||||
|
||||
# Test put with restricted S3 server redirect.
|
||||
def test_put_with_zero_redirect(cluster):
|
||||
def test_put_with_zero_redirect(started_cluster):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["s3_max_redirects"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["s3_max_redirects"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
||||
filename = "test.csv"
|
||||
|
||||
# Should work without redirect
|
||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, filename, table_format, values)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
|
||||
run_query(instance, query)
|
||||
|
||||
# Should not work with redirect
|
||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
|
||||
exception_raised = False
|
||||
try:
|
||||
run_query(instance, query)
|
||||
@ -253,11 +254,11 @@ def test_put_with_zero_redirect(cluster):
|
||||
assert exception_raised
|
||||
|
||||
|
||||
def test_put_get_with_globs(cluster):
|
||||
def test_put_get_with_globs(started_cluster):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
max_path = ""
|
||||
for i in range(10):
|
||||
@ -266,11 +267,11 @@ def test_put_get_with_globs(cluster):
|
||||
max_path = max(path, max_path)
|
||||
values = "({},{},{})".format(i, j, i + j)
|
||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, path, table_format, values)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, path, table_format, values)
|
||||
run_query(instance, query)
|
||||
|
||||
query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{}:{}/{}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{}')".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, table_format)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, table_format)
|
||||
assert run_query(instance, query).splitlines() == [
|
||||
"450\t450\t900\t0.csv\t{bucket}/{max_path}".format(bucket=bucket, max_path=max_path)]
|
||||
|
||||
@ -281,11 +282,11 @@ def test_put_get_with_globs(cluster):
|
||||
# ("'minio','minio123',",True), Redirect with credentials not working with nginx.
|
||||
("'wrongid','wrongkey',", False)
|
||||
])
|
||||
def test_multipart_put(cluster, maybe_auth, positive):
|
||||
def test_multipart_put(started_cluster, maybe_auth, positive):
|
||||
# type: (ClickHouseCluster) -> None
|
||||
|
||||
bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
bucket = started_cluster.minio_bucket if not maybe_auth else started_cluster.minio_restricted_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
|
||||
# Minimum size of part is 5 Mb for Minio.
|
||||
@ -303,7 +304,7 @@ def test_multipart_put(cluster, maybe_auth, positive):
|
||||
|
||||
filename = "test_multipart.csv"
|
||||
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
|
||||
|
||||
try:
|
||||
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes,
|
||||
@ -315,23 +316,23 @@ def test_multipart_put(cluster, maybe_auth, positive):
|
||||
assert positive
|
||||
|
||||
# Use proxy access logs to count number of parts uploaded to Minio.
|
||||
proxy_logs = cluster.get_container_logs("proxy1") # type: str
|
||||
proxy_logs = started_cluster.get_container_logs("proxy1") # type: str
|
||||
assert proxy_logs.count("PUT /{}/{}".format(bucket, filename)) >= 2
|
||||
|
||||
assert csv_data == get_s3_file_content(cluster, bucket, filename)
|
||||
assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
|
||||
|
||||
|
||||
def test_remote_host_filter(cluster):
|
||||
instance = cluster.instances["restricted_dummy"]
|
||||
def test_remote_host_filter(started_cluster):
|
||||
instance = started_cluster.instances["restricted_dummy"]
|
||||
format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
|
||||
query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(
|
||||
"invalid_host", cluster.minio_port, cluster.minio_bucket, format)
|
||||
"invalid_host", MINIO_INTERNAL_PORT, started_cluster.minio_bucket, format)
|
||||
assert "not allowed in config.xml" in instance.query_and_get_error(query)
|
||||
|
||||
other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
|
||||
query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(
|
||||
"invalid_host", cluster.minio_port, cluster.minio_bucket, format, other_values)
|
||||
"invalid_host", MINIO_INTERNAL_PORT, started_cluster.minio_bucket, format, other_values)
|
||||
assert "not allowed in config.xml" in instance.query_and_get_error(query)
|
||||
|
||||
|
||||
@ -339,8 +340,8 @@ def test_remote_host_filter(cluster):
|
||||
"''", # 1 arguments
|
||||
"'','','','','',''" # 6 arguments
|
||||
])
|
||||
def test_wrong_s3_syntax(cluster, s3_storage_args):
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
def test_wrong_s3_syntax(started_cluster, s3_storage_args):
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
expected_err_msg = "Code: 42" # NUMBER_OF_ARGUMENTS_DOESNT_MATCH
|
||||
|
||||
query = "create table test_table_s3_syntax (id UInt32) ENGINE = S3({})".format(s3_storage_args)
|
||||
@ -348,9 +349,9 @@ def test_wrong_s3_syntax(cluster, s3_storage_args):
|
||||
|
||||
|
||||
# https://en.wikipedia.org/wiki/One_Thousand_and_One_Nights
|
||||
def test_s3_glob_scheherazade(cluster):
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
def test_s3_glob_scheherazade(started_cluster):
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
max_path = ""
|
||||
values = "(1, 1, 1)"
|
||||
@ -361,7 +362,7 @@ def test_s3_glob_scheherazade(cluster):
|
||||
for i in range(start, end):
|
||||
path = "night_{}/tale.csv".format(i)
|
||||
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
|
||||
cluster.minio_host, cluster.minio_port, bucket, path, table_format, values)
|
||||
started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, path, table_format, values)
|
||||
run_query(instance, query)
|
||||
|
||||
jobs.append(threading.Thread(target=add_tales, args=(night, min(night + nights_per_job, 1001))))
|
||||
@ -371,20 +372,20 @@ def test_s3_glob_scheherazade(cluster):
|
||||
job.join()
|
||||
|
||||
query = "select count(), sum(column1), sum(column2), sum(column3) from s3('http://{}:{}/{}/night_*/tale.csv', 'CSV', '{}')".format(
|
||||
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, table_format)
|
||||
started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, table_format)
|
||||
assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"]
|
||||
|
||||
|
||||
def run_s3_mock(cluster):
|
||||
def run_s3_mock(started_cluster):
|
||||
logging.info("Starting s3 mock")
|
||||
container_id = cluster.get_container_id('resolver')
|
||||
container_id = started_cluster.get_container_id('resolver')
|
||||
current_dir = os.path.dirname(__file__)
|
||||
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
|
||||
cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)
|
||||
started_cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
|
||||
started_cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)
|
||||
|
||||
# Wait for S3 mock start
|
||||
for attempt in range(10):
|
||||
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
|
||||
ping_response = started_cluster.exec_in_container(started_cluster.get_container_id('resolver'),
|
||||
["curl", "-s", "http://resolver:8080/"], nothrow=True)
|
||||
if ping_response != 'OK':
|
||||
if attempt == 9:
|
||||
@ -397,25 +398,25 @@ def run_s3_mock(cluster):
|
||||
logging.info("S3 mock started")
|
||||
|
||||
|
||||
def test_custom_auth_headers(cluster):
|
||||
def test_custom_auth_headers(started_cluster):
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
filename = "test.csv"
|
||||
get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
|
||||
bucket=cluster.minio_restricted_bucket,
|
||||
bucket=started_cluster.minio_restricted_bucket,
|
||||
file=filename,
|
||||
table_format=table_format)
|
||||
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
result = run_query(instance, get_query)
|
||||
assert result == '1\t2\t3\n'
|
||||
|
||||
|
||||
def test_custom_auth_headers_exclusion(cluster):
|
||||
def test_custom_auth_headers_exclusion(started_cluster):
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
filename = "test.csv"
|
||||
get_query = f"SELECT * FROM s3('http://resolver:8080/{cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')"
|
||||
get_query = f"SELECT * FROM s3('http://resolver:8080/{started_cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')"
|
||||
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
with pytest.raises(helpers.client.QueryRuntimeException) as ei:
|
||||
result = run_query(instance, get_query)
|
||||
print(result)
|
||||
@ -423,33 +424,13 @@ def test_custom_auth_headers_exclusion(cluster):
|
||||
assert ei.value.returncode == 243
|
||||
assert '403 Forbidden' in ei.value.stderr
|
||||
|
||||
|
||||
def test_infinite_redirect(cluster):
|
||||
bucket = "redirected"
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
filename = "test.csv"
|
||||
get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
|
||||
bucket=bucket,
|
||||
file=filename,
|
||||
table_format=table_format)
|
||||
instance = cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
exception_raised = False
|
||||
try:
|
||||
run_query(instance, get_query)
|
||||
except Exception as e:
|
||||
assert str(e).find("Too many redirects while trying to access") != -1
|
||||
exception_raised = True
|
||||
finally:
|
||||
assert exception_raised
|
||||
|
||||
|
||||
@pytest.mark.parametrize("extension,method", [
|
||||
("bin", "gzip"),
|
||||
("gz", "auto")
|
||||
])
|
||||
def test_storage_s3_get_gzip(cluster, extension, method):
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"]
|
||||
def test_storage_s3_get_gzip(started_cluster, extension, method):
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"]
|
||||
filename = f"test_get_gzip.{extension}"
|
||||
name = "test_get_gzip"
|
||||
data = [
|
||||
@ -474,11 +455,11 @@ def test_storage_s3_get_gzip(cluster, extension, method):
|
||||
compressed = gzip.GzipFile(fileobj=buf, mode="wb")
|
||||
compressed.write(("\n".join(data)).encode())
|
||||
compressed.close()
|
||||
put_s3_file_content(cluster, bucket, filename, buf.getvalue())
|
||||
put_s3_file_content(started_cluster, bucket, filename, buf.getvalue())
|
||||
|
||||
try:
|
||||
run_query(instance, f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
|
||||
'http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
|
||||
'http://{started_cluster.minio_host}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',
|
||||
'CSV',
|
||||
'{method}')""")
|
||||
|
||||
@ -488,9 +469,9 @@ def test_storage_s3_get_gzip(cluster, extension, method):
|
||||
run_query(instance, f"DROP TABLE {name}")
|
||||
|
||||
|
||||
def test_storage_s3_put_uncompressed(cluster):
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"]
|
||||
def test_storage_s3_put_uncompressed(started_cluster):
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"]
|
||||
filename = "test_put_uncompressed.bin"
|
||||
name = "test_put_uncompressed"
|
||||
data = [
|
||||
@ -512,13 +493,13 @@ def test_storage_s3_put_uncompressed(cluster):
|
||||
]
|
||||
try:
|
||||
run_query(instance, "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV')".format(
|
||||
name, cluster.minio_host, cluster.minio_port, bucket, filename))
|
||||
name, started_cluster.minio_host, MINIO_INTERNAL_PORT, bucket, filename))
|
||||
|
||||
run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))
|
||||
|
||||
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["753"]
|
||||
|
||||
uncompressed_content = get_s3_file_content(cluster, bucket, filename)
|
||||
uncompressed_content = get_s3_file_content(started_cluster, bucket, filename)
|
||||
assert sum([ int(i.split(',')[1]) for i in uncompressed_content.splitlines() ]) == 753
|
||||
finally:
|
||||
run_query(instance, f"DROP TABLE {name}")
|
||||
@ -528,9 +509,9 @@ def test_storage_s3_put_uncompressed(cluster):
|
||||
("bin", "gzip"),
|
||||
("gz", "auto")
|
||||
])
|
||||
def test_storage_s3_put_gzip(cluster, extension, method):
|
||||
bucket = cluster.minio_bucket
|
||||
instance = cluster.instances["dummy"]
|
||||
def test_storage_s3_put_gzip(started_cluster, extension, method):
|
||||
bucket = started_cluster.minio_bucket
|
||||
instance = started_cluster.instances["dummy"]
|
||||
filename = f"test_put_gzip.{extension}"
|
||||
name = "test_put_gzip"
|
||||
data = [
|
||||
@ -552,7 +533,7 @@ def test_storage_s3_put_gzip(cluster, extension, method):
|
||||
]
|
||||
try:
|
||||
run_query(instance, f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
|
||||
'http://{cluster.minio_host}:{cluster.minio_port}/{bucket}/{filename}',
|
||||
'http://{started_cluster.minio_host}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',
|
||||
'CSV',
|
||||
'{method}')""")
|
||||
|
||||
@ -560,7 +541,7 @@ def test_storage_s3_put_gzip(cluster, extension, method):
|
||||
|
||||
run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["708"]
|
||||
|
||||
buf = io.BytesIO(get_s3_file_content(cluster, bucket, filename, decode=False))
|
||||
buf = io.BytesIO(get_s3_file_content(started_cluster, bucket, filename, decode=False))
|
||||
f = gzip.GzipFile(fileobj=buf, mode="rb")
|
||||
uncompressed_content = f.read().decode()
|
||||
assert sum([ int(i.split(',')[1]) for i in uncompressed_content.splitlines() ]) == 708
|
||||
|
125
tests/integration/test_storage_s3/test_redirect.py
Normal file
125
tests/integration/test_storage_s3/test_redirect.py
Normal file
@ -0,0 +1,125 @@
|
||||
import gzip
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import io
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
import helpers.client
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
|
||||
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logging.getLogger().addHandler(logging.StreamHandler())
|
||||
|
||||
# Creates S3 bucket for tests and allows anonymous read-write access to it.
|
||||
def prepare_s3_bucket(cluster):
|
||||
# Allows read-write access for bucket without authorization.
|
||||
bucket_read_write_policy = {"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{
|
||||
"Sid": "",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:GetBucketLocation",
|
||||
"Resource": "arn:aws:s3:::root"
|
||||
},
|
||||
{
|
||||
"Sid": "",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:ListBucket",
|
||||
"Resource": "arn:aws:s3:::root"
|
||||
},
|
||||
{
|
||||
"Sid": "",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:GetObject",
|
||||
"Resource": "arn:aws:s3:::root/*"
|
||||
},
|
||||
{
|
||||
"Sid": "",
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": "s3:PutObject",
|
||||
"Resource": "arn:aws:s3:::root/*"
|
||||
}
|
||||
]}
|
||||
|
||||
minio_client = cluster.minio_client
|
||||
minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))
|
||||
|
||||
cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
|
||||
if minio_client.bucket_exists(cluster.minio_restricted_bucket):
|
||||
minio_client.remove_bucket(cluster.minio_restricted_bucket)
|
||||
|
||||
minio_client.make_bucket(cluster.minio_restricted_bucket)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster = ClickHouseCluster(__file__, name="redirect")
|
||||
cluster.add_instance("dummy", with_minio=True, main_configs=["configs/defaultS3.xml"])
|
||||
logging.info("Starting cluster...")
|
||||
cluster.start()
|
||||
logging.info("Cluster started")
|
||||
|
||||
prepare_s3_bucket(cluster)
|
||||
logging.info("S3 bucket created")
|
||||
run_s3_mock(cluster)
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
def run_query(instance, query, stdin=None, settings=None):
|
||||
# type: (ClickHouseInstance, str, object, dict) -> str
|
||||
|
||||
logging.info("Running query '{}'...".format(query))
|
||||
result = instance.query(query, stdin=stdin, settings=settings)
|
||||
logging.info("Query finished")
|
||||
|
||||
return result
|
||||
|
||||
def run_s3_mock(cluster):
|
||||
logging.info("Starting s3 mock")
|
||||
container_id = cluster.get_container_id('resolver')
|
||||
current_dir = os.path.dirname(__file__)
|
||||
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_mock", "mock_s3.py"), "mock_s3.py")
|
||||
cluster.exec_in_container(container_id, ["python", "mock_s3.py"], detach=True)
|
||||
|
||||
# Wait for S3 mock start
|
||||
for attempt in range(10):
|
||||
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
|
||||
["curl", "-s", "http://resolver:8080/"], nothrow=True)
|
||||
if ping_response != 'OK':
|
||||
if attempt == 9:
|
||||
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
|
||||
else:
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
|
||||
logging.info("S3 mock started")
|
||||
|
||||
def test_infinite_redirect(started_cluster):
|
||||
bucket = "redirected"
|
||||
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
|
||||
filename = "test.csv"
|
||||
get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
|
||||
bucket=bucket,
|
||||
file=filename,
|
||||
table_format=table_format)
|
||||
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
|
||||
exception_raised = False
|
||||
try:
|
||||
run_query(instance, get_query)
|
||||
except Exception as e:
|
||||
assert str(e).find("Too many redirects while trying to access") != -1
|
||||
exception_raised = True
|
||||
finally:
|
||||
assert exception_raised
|
Loading…
Reference in New Issue
Block a user