ClickHouse/tests/integration/test_storage_s3/test.py

703 lines
30 KiB
Python
Raw Normal View History

2020-09-28 23:30:41 +00:00
import gzip
2019-09-19 09:34:33 +00:00
import json
import logging
import os
2020-10-02 16:54:07 +00:00
import io
2020-01-27 21:44:18 +00:00
import random
2020-05-25 09:15:11 +00:00
import threading
import time
2019-09-19 09:34:33 +00:00
import helpers.client
2019-06-26 00:41:14 +00:00
import pytest
2021-06-21 08:02:27 +00:00
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
2019-09-19 09:34:33 +00:00
2021-02-20 14:59:39 +00:00
# Port used by the tests below when addressing MinIO directly by its IP.
MINIO_INTERNAL_PORT = 9001

# Directory containing this test module (symlinks resolved).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

# Path of the S3 config file of the "dummy" instance; rewritten by replace_config().
CONFIG_PATH = os.path.join(SCRIPT_DIR, f'./{get_instances_dir()}/dummy/configs/config.d/defaultS3.xml')
2019-09-19 09:34:33 +00:00
# Creates S3 bucket for tests and allows anonymous read-write access to it.
2021-02-20 14:59:39 +00:00
def prepare_s3_bucket(started_cluster):
    """Prepare the MinIO buckets used by the tests.

    Sets an anonymous read-write policy on ``started_cluster.minio_bucket`` and
    (re)creates an empty ``<minio_bucket>-with-auth`` bucket, whose name is
    stored on the cluster object as ``minio_restricted_bucket``.
    """

    def allow(action, resource):
        # One policy statement granting `action` on `resource` to any principal.
        return {
            "Sid": "",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": action,
            "Resource": resource,
        }

    # Allows read-write access for bucket without authorization.
    anonymous_rw_policy = {
        "Version": "2012-10-17",
        "Statement": [
            allow("s3:GetBucketLocation", "arn:aws:s3:::root"),
            allow("s3:ListBucket", "arn:aws:s3:::root"),
            allow("s3:GetObject", "arn:aws:s3:::root/*"),
            allow("s3:PutObject", "arn:aws:s3:::root/*"),
        ],
    }

    client = started_cluster.minio_client
    client.set_bucket_policy(started_cluster.minio_bucket, json.dumps(anonymous_rw_policy))

    restricted = "{}-with-auth".format(started_cluster.minio_bucket)
    started_cluster.minio_restricted_bucket = restricted
    # Drop a leftover bucket (if any) before creating it anew.
    if client.bucket_exists(restricted):
        client.remove_bucket(restricted)
    client.make_bucket(restricted)
2021-02-20 14:59:39 +00:00
def put_s3_file_content(started_cluster, bucket, filename, data):
    """Upload the bytes `data` to `bucket` under the object name `filename`."""
    started_cluster.minio_client.put_object(bucket, filename, io.BytesIO(data), len(data))
2020-09-28 23:30:41 +00:00
# Returns content of given S3 file as string.
2021-02-20 14:59:39 +00:00
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
    """Return the content of the S3 object `filename` stored in `bucket`.

    :param started_cluster: cluster object exposing ``minio_client``.
    :param bucket: name of the bucket to read from.
    :param filename: object name inside the bucket.
    :param decode: when true (default) the content is returned as ``str``
        (decoded with the default codec); otherwise the raw ``bytes``.
    """
    response = started_cluster.minio_client.get_object(bucket, filename)
    try:
        # Join once instead of repeated ``bytes +=`` (quadratic on big objects).
        content = b"".join(response.stream())
    finally:
        # Hand the HTTP connection back to the pool, as the MinIO SDK asks.
        response.close()
        response.release_conn()
    return content.decode() if decode else content
2019-09-19 09:34:33 +00:00
2019-06-26 00:41:14 +00:00
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture yielding a started ClickHouse cluster with MinIO.

    Registers the ``restricted_dummy``, ``dummy`` and ``s3_max_redirects``
    instances, starts the cluster, prepares the S3 buckets and launches the
    S3 mock servers. The cluster is shut down when the module is done.
    """
    # Constructed outside ``try`` so that a constructor failure is reported
    # as-is instead of being masked by an unbound ``cluster`` in ``finally``.
    cluster = ClickHouseCluster(__file__)
    try:
        cluster.add_instance("restricted_dummy", main_configs=["configs/config_for_test_remote_host_filter.xml"],
                             with_minio=True)
        cluster.add_instance("dummy", with_minio=True, main_configs=["configs/defaultS3.xml"])
        cluster.add_instance("s3_max_redirects", with_minio=True, main_configs=["configs/defaultS3.xml"],
                             user_configs=["configs/s3_max_redirects.xml"])
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")
        run_s3_mocks(cluster)

        yield cluster
    finally:
        cluster.shutdown()
2019-09-24 10:58:42 +00:00
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    """Run `query` on `instance`, logging before and after, and return its output."""
    logging.info(f"Running query '{query}'...")
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
2019-09-22 10:42:47 +00:00
2021-03-02 16:53:03 +00:00
# Test simple put. Also checks that wrong credentials produce an error with every compression method.
@pytest.mark.parametrize(
    "maybe_auth,positive,compression",
    [
        pytest.param("", True, 'auto', id="positive"),
        pytest.param("'minio','minio123',", True, 'auto', id="auth_positive"),
    ] + [
        # Wrong credentials must be rejected whatever the compression method is.
        pytest.param("'wrongid','wrongkey',", False, method, id=method)
        for method in ('auto', 'gzip', 'deflate', 'brotli', 'xz', 'zstd')
    ])
def test_put(started_cluster, maybe_auth, positive, compression):
    """Insert three rows through the s3() table function and verify the stored CSV.

    For negative cases the insert is expected to raise QueryRuntimeException
    and nothing else is checked.
    """
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    # Requests carrying credentials go to the restricted bucket.
    bucket = started_cluster.minio_restricted_bucket if maybe_auth else started_cluster.minio_bucket
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    filename = "test.csv"
    put_query = (
        f"insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',\n"
        f"{maybe_auth}'CSV', '{table_format}', {compression}) values {values}"
    )

    try:
        run_query(instance, put_query)
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
        return

    # The query went through: that is only acceptable for positive cases.
    assert positive
    assert values_csv == get_s3_file_content(started_cluster, bucket, filename)
2021-05-27 06:14:12 +00:00
def test_distributed_put(started_cluster):
    """Insert with PARTITION BY and check that one S3 object is written per partition.

    The ``{_partition_id}`` placeholder in the object name is passed to the
    server verbatim; with ``PARTITION BY column3`` the three rows are expected
    to end up in ``test_3.csv``, ``test_1.csv`` and ``test_45.csv``.
    """
    # The original body referenced undefined `maybe_auth` / `positive` and
    # requested a `cluster` fixture; every other test here uses
    # `started_cluster`, and this query carries no credentials.
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    # Plain (non-f) string: the braces must reach the server untouched.
    filename = "test_{_partition_id}.csv"
    put_query = (
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}',\n"
        f"'CSV', '{table_format}') PARTITION BY column3 values {values}"
    )

    run_query(instance, put_query)

    assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test_3.csv")
    assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test_1.csv")
    assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test_45.csv")
@pytest.mark.parametrize("special", ["space", "plus"])
def test_get_file_with_special(started_cluster, special):
    """Read back an object whose name contains a space or a plus sign.

    The object is uploaded under its raw name and then selected three ways:
    by its percent-encoded name, by a glob over the whole suffix, and by a
    glob following the percent-encoded symbol.
    """
    raw_symbols = {"space": " ", "plus": "+"}
    encoded_symbols = {"space": "%20", "plus": "%2B"}
    symbol = raw_symbols[special]
    urlsafe_symbol = encoded_symbols[special]

    auth = "'minio','minio123',"
    bucket = started_cluster.minio_restricted_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = [[12549, 2463, 19893], [64021, 38652, 66703], [81611, 39650, 83516], [11079, 59507, 61546], [51764, 69952, 6876], [41165, 90293, 29095], [40167, 78432, 48309], [81629, 81327, 11855], [55852, 21643, 98507], [6738, 54643, 41155]]
    # One CSV line per row, each terminated by a newline.
    values_csv = "".join(",".join(str(cell) for cell in row) + "\n" for row in values).encode()

    put_s3_file_content(started_cluster, bucket, f"get_file_with_{special}_{symbol}two.csv", values_csv)

    url_prefix = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}"
    object_patterns = (
        f"get_file_with_{special}_{urlsafe_symbol}two.csv",
        f"get_file_with_{special}*.csv",
        f"get_file_with_{special}_{urlsafe_symbol}*.csv",
    )
    for pattern in object_patterns:
        get_query = f"SELECT * FROM s3('{url_prefix}/{pattern}', {auth}'CSV', '{table_format}') FORMAT TSV"
        rows = [[int(field) for field in line.split()] for line in run_query(instance, get_query).splitlines()]
        assert rows == values
@pytest.mark.parametrize("special", ["space", "plus", "plus2"])
def test_get_path_with_special(started_cluster, special):
    """Check which request path reaches the server at resolver:8082 for special symbols.

    The query result is compared against the expected path, so the test pins
    how the symbol written in the URL is encoded in the outgoing request.
    """
    # Symbol as written in the URL given to s3() ...
    requested = {"space": "%20", "plus": "+", "plus2": "%2B"}[special]
    # ... and as it is expected to appear in the returned path.
    expected = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special]

    auth = "'minio','minio123',"
    table_format = "column1 String"
    instance = started_cluster.instances["dummy"]

    get_query = f"SELECT * FROM s3('http://resolver:8082/get-my-path/{requested}.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
    returned_lines = run_query(instance, get_query).splitlines()
    assert returned_lines == [f"/{expected}.csv"]
# Test put no data to S3.
@pytest.mark.parametrize("auth", [
    pytest.param("'minio','minio123',", id="minio")
])
def test_empty_put(started_cluster, auth):
    # type: (ClickHouseCluster, str) -> None
    """Insert zero rows into S3 and verify that no object gets created.

    After an insert from an empty table, selecting from the same object is
    expected to fail with "The specified key does not exist".
    """
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    run_query(instance, "DROP TABLE IF EXISTS empty_table")
    run_query(instance, "\nCREATE TABLE empty_table (\n{}\n) ENGINE = Null()\n".format(table_format))

    filename = "empty_put_test.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') select * from empty_table".format(
        started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, auth, table_format)
    run_query(instance, put_query)

    select_query = "select count(*) from s3('http://{}:{}/{}/{}', {}'CSV', '{}')".format(
        started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, auth, table_format)
    with pytest.raises(helpers.client.QueryRuntimeException) as exc_info:
        run_query(instance, select_query)
    # The previous `str(e).find(...) != 0` check was true even when the text
    # was absent (find() returns -1), so it never verified the message.
    assert "The specified key does not exist" in str(exc_info.value)
# Test put values in CSV format.
@pytest.mark.parametrize("maybe_auth,positive", [
    pytest.param("", True, id="positive"),
    pytest.param("'minio','minio123',", True, id="auth_positive"),
    pytest.param("'wrongid','wrongkey',", False, id="negative"),
])
def test_put_csv(started_cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    """Insert CSV data supplied on stdin and verify the stored object.

    For the negative case the insert is expected to raise
    QueryRuntimeException and nothing else is checked.
    """
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    # Requests carrying credentials go to the restricted bucket.
    bucket = started_cluster.minio_restricted_bucket if maybe_auth else started_cluster.minio_bucket
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"
    put_query = (
        f"insert into table function s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}', "
        f"{maybe_auth}'CSV', '{table_format}') format CSV"
    )

    try:
        run_query(instance, put_query, stdin=csv_data)
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
        return

    # The query went through: that is only acceptable for positive cases.
    assert positive
    assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
# Test put and get with S3 server redirect.
2021-02-20 14:59:39 +00:00
def test_put_get_with_redirect(started_cluster):
    # type: (ClickHouseCluster) -> None
    """Insert and select through the redirecting endpoint and verify the data."""
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    redirect_url = f"http://{started_cluster.minio_redirect_host}:{started_cluster.minio_redirect_port}/{bucket}/{filename}"

    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    run_query(instance, f"insert into table function s3('{redirect_url}', 'CSV', '{table_format}') values {values}")
    assert "1,1,1\n1,1,1\n11,11,11\n" == get_s3_file_content(started_cluster, bucket, filename)

    stdout = run_query(
        instance,
        f"select *, column1*column2*column3 from s3('{redirect_url}', 'CSV', '{table_format}')")
    # The last column is the product of the first three.
    rows = [line.split() for line in stdout.splitlines()]
    assert rows == [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]
# Test put with restricted S3 server redirect.
2021-02-20 14:59:39 +00:00
def test_put_with_zero_redirect(started_cluster):
    # type: (ClickHouseCluster) -> None
    """Check that the ``s3_max_redirects`` instance refuses to follow a redirect.

    A direct insert must succeed, while the same insert sent through the
    redirecting endpoint must fail with a "Too many redirects" error.
    """
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["s3_max_redirects"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    filename = "test.csv"

    # Should work without redirect
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
        started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename, table_format, values)
    run_query(instance, query)

    # Should not work with redirect
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
        started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, table_format, values)
    # pytest.raises reports "did not raise" and a wrong message as separate,
    # readable failures; the former assert inside `finally` hid the real one.
    with pytest.raises(Exception) as exc_info:
        run_query(instance, query)
    assert "Too many redirects while trying to access" in str(exc_info.value)
2021-02-20 14:59:39 +00:00
def test_put_get_with_globs(started_cluster):
    # type: (ClickHouseCluster) -> None
    """Write 100 one-row objects under a random prefix and read them back with globs."""
    unique_prefix = random.randint(1,10000)
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    written_paths = []
    for i in range(10):
        for j in range(10):
            # Directory suffix is picked at random; the select glob covers all four.
            path = f"{unique_prefix}/{i}_{random.choice(['a', 'b', 'c', 'd'])}/{j}.csv"
            written_paths.append(path)
            values = f"({i},{j},{i + j})"
            run_query(
                instance,
                f"insert into table function s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{path}', 'CSV', '{table_format}') values {values}")
    max_path = max(written_paths)

    query = f"select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{started_cluster.minio_redirect_host}:{started_cluster.minio_redirect_port}/{bucket}/{unique_prefix}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{table_format}')"
    expected_row = f"450\t450\t900\t0.csv\t{bucket}/{max_path}"
    assert run_query(instance, query).splitlines() == [expected_row]
2020-01-27 21:44:18 +00:00
# Test multipart put.
@pytest.mark.parametrize("maybe_auth,positive", [
    pytest.param("", True, id="positive"),
    # Trailing comma added: without it the credentials ran into the 'CSV'
    # argument and the query failed as malformed rather than on wrong credentials.
    pytest.param("'wrongid','wrongkey',", False, id="negative"),
    # ("'minio','minio123',",True), Redirect with credentials not working with nginx.
])
def test_multipart_put(started_cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    """Upload a CSV larger than one part and verify it was sent as a multipart upload.

    For the negative case the insert is expected to raise
    QueryRuntimeException and nothing else is checked.
    """
    bucket = started_cluster.minio_bucket if not maybe_auth else started_cluster.minio_restricted_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Minimum size of part is 5 Mb for Minio.
    # See: https://github.com/minio/minio/blob/master/docs/minio-limits.md
    min_part_size_bytes = 5 * 1024 * 1024
    csv_size_bytes = int(min_part_size_bytes * 1.5)  # To have 2 parts.

    one_line_length = 6  # 3 digits, 2 commas, 1 line separator.

    # Generate data having size more than one part
    int_data = [[1, 2, 3] for _ in range(csv_size_bytes // one_line_length)]
    csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])

    assert len(csv_data) > min_part_size_bytes

    filename = "test_multipart.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        started_cluster.minio_redirect_host, started_cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)

    try:
        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes,
                                                                 's3_max_single_part_upload_size': 0})
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
    else:
        assert positive

        # Use proxy access logs to count number of parts uploaded to Minio.
        proxy_logs = started_cluster.get_container_logs("proxy1")  # type: str
        assert proxy_logs.count("PUT /{}/{}".format(bucket, filename)) >= 2

        assert csv_data == get_s3_file_content(started_cluster, bucket, filename)
2019-11-06 17:06:50 +00:00
2019-12-03 17:36:02 +00:00
2021-02-20 14:59:39 +00:00
def test_remote_host_filter(started_cluster):
    """Both select and insert against a non-whitelisted host must be rejected."""
    instance = started_cluster.instances["restricted_dummy"]
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    forbidden_url = f"http://invalid_host:{MINIO_INTERNAL_PORT}/{started_cluster.minio_bucket}/test.csv"
    expected_error = "not allowed in config.xml"

    select_query = f"select *, column1*column2*column3 from s3('{forbidden_url}', 'CSV', '{table_format}')"
    assert expected_error in instance.query_and_get_error(select_query)

    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    insert_query = f"insert into table function s3('{forbidden_url}', 'CSV', '{table_format}') values {other_values}"
    assert expected_error in instance.query_and_get_error(insert_query)
@pytest.mark.parametrize("s3_storage_args", [
    pytest.param("''", id="1_argument"),
    pytest.param("'','','','','',''", id="6_arguments"),
])
def test_wrong_s3_syntax(started_cluster, s3_storage_args):
    """Creating an S3 table with an unsupported number of arguments must fail."""
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    create_query = f"create table test_table_s3_syntax (id UInt32) ENGINE = S3({s3_storage_args})"
    error_text = instance.query_and_get_error(create_query)
    # NUMBER_OF_ARGUMENTS_DOESNT_MATCH
    assert "Code: 42" in error_text
2020-05-25 09:15:11 +00:00
2020-05-25 21:05:15 +00:00
# https://en.wikipedia.org/wiki/One_Thousand_and_One_Nights
2021-02-20 14:59:39 +00:00
def test_s3_glob_scheherazade(started_cluster):
    """Write 1001 single-row objects from parallel threads and count them with one glob."""
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1)"
    total_nights = 1001
    nights_per_job = total_nights // 30

    def add_tales(start, end):
        # Insert one object per night in the half-open range [start, end).
        for i in range(start, end):
            path = f"night_{i}/tale.csv"
            query = f"insert into table function s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{path}', 'CSV', '{table_format}') values {values}"
            run_query(instance, query)

    jobs = []
    for first_night in range(0, total_nights, nights_per_job):
        # The last chunk is clipped so that exactly `total_nights` objects are written.
        last_night = min(first_night + nights_per_job, total_nights)
        worker = threading.Thread(target=add_tales, args=(first_night, last_night))
        jobs.append(worker)
        worker.start()

    for job in jobs:
        job.join()

    query = f"select count(), sum(column1), sum(column2), sum(column3) from s3('http://{started_cluster.minio_redirect_host}:{started_cluster.minio_redirect_port}/{bucket}/night_*/tale.csv', 'CSV', '{table_format}')"
    assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"]
def run_s3_mocks(started_cluster):
    """Copy the S3 mock scripts into their containers, start them and wait until each answers.

    Every mock is polled with curl up to 100 times, one second apart; if a
    mock still does not answer "OK" on the last attempt, an assertion fails.
    """
    logging.info("Starting s3 mocks")
    # (script name, container name, port the script is started with)
    mocks = (
        ("mock_s3.py", "resolver", "8080"),
        ("unstable_server.py", "resolver", "8081"),
        ("echo.py", "resolver", "8082"),
    )
    mocks_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
    for script, container_name, listen_port in mocks:
        cid = started_cluster.get_container_id(container_name)
        started_cluster.copy_file_to_container(cid, os.path.join(mocks_dir, script), script)
        started_cluster.exec_in_container(cid, ["python", script, listen_port], detach=True)

    # Wait for S3 mocks to start
    max_attempts = 100
    for script, container_name, listen_port in mocks:
        for attempt in range(max_attempts):
            ping_response = started_cluster.exec_in_container(
                started_cluster.get_container_id(container_name),
                ["curl", "-s", f"http://localhost:{listen_port}/"], nothrow=True)
            if ping_response == 'OK':
                logging.debug(f"mock {script} ({listen_port}) answered {ping_response} on attempt {attempt}")
                break
            if attempt == max_attempts - 1:
                # Out of attempts: fail with the last response in the message.
                assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
            else:
                time.sleep(1)

    logging.info("S3 mocks started")
def replace_config(old, new):
    """Rewrite the file at CONFIG_PATH, replacing every occurrence of `old` with `new`.

    :param old: substring to look for in each line of the config file.
    :param new: replacement text.
    """
    # Context managers close the handles even if reading or writing fails.
    with open(CONFIG_PATH, 'r') as config:
        config_lines = config.readlines()
    with open(CONFIG_PATH, 'w') as config:
        config.writelines(line.replace(old, new) for line in config_lines)
2021-02-20 14:59:39 +00:00
def test_custom_auth_headers(started_cluster):
    """Check that the auth header from the config file is applied and reloaded.

    Reads through resolver:8080 with the configured token, then swaps the
    token for an invalid one (expecting the read to fail) and swaps it back
    (expecting the read to work again).
    """
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
        bucket=started_cluster.minio_restricted_bucket,
        file=filename,
        table_format=table_format)

    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    result = run_query(instance, get_query)
    assert result == '1\t2\t3\n'

    instance.query("DROP TABLE IF EXISTS test")
    instance.query(
        "CREATE TABLE test ({table_format}) ENGINE = S3('http://resolver:8080/{bucket}/{file}', 'CSV')".format(
            bucket=started_cluster.minio_restricted_bucket,
            file=filename,
            table_format=table_format
        ))
    assert run_query(instance, "SELECT * FROM test") == '1\t2\t3\n'

    valid_header = "<header>Authorization: Bearer TOKEN"
    invalid_header = "<header>Authorization: Bearer INVALID_TOKEN"

    replace_config(valid_header, invalid_header)
    try:
        instance.query("SYSTEM RELOAD CONFIG")
        ret, err = instance.query_and_get_answer_with_error("SELECT * FROM test")
        assert ret == "" and err != ""
    finally:
        # Always put the valid token back, so a failure above does not leave
        # the shared instance with a broken config for the remaining tests.
        replace_config(invalid_header, valid_header)
        instance.query("SYSTEM RELOAD CONFIG")

    assert run_query(instance, "SELECT * FROM test") == '1\t2\t3\n'
    instance.query("DROP TABLE test")
2021-02-20 14:59:39 +00:00
def test_custom_auth_headers_exclusion(started_cluster):
    """Reading from ``restricteddirectory`` must fail with exit code 243 and a 'Forbidden Error'."""
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    restricted_url = f"http://resolver:8080/{started_cluster.minio_restricted_bucket}/restricteddirectory/{filename}"
    get_query = f"SELECT * FROM s3('{restricted_url}', 'CSV', '{table_format}')"

    with pytest.raises(helpers.client.QueryRuntimeException) as ei:
        # Printed only if the query unexpectedly succeeds.
        result = run_query(instance, get_query)
        print(result)

    failure = ei.value
    assert failure.returncode == 243
    assert 'Forbidden Error' in failure.stderr
2021-04-29 11:57:48 +00:00
def test_infinite_redirect(started_cluster):
    """Selecting from the ``redirected`` bucket must fail with a "Too many redirects" error."""
    bucket = "redirected"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = f"select * from s3('http://resolver:{started_cluster.minio_redirect_port}/{bucket}/{filename}', 'CSV', '{table_format}')"
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance

    # pytest.raises reports "did not raise" and a wrong message as separate,
    # readable failures; the former assert inside `finally` hid the real one.
    with pytest.raises(Exception) as exc_info:
        run_query(instance, get_query)
    assert "Too many redirects while trying to access" in str(exc_info.value)
@pytest.mark.parametrize("extension,method", [
    pytest.param("bin", "gzip", id="bin"),
    pytest.param("gz", "auto", id="gz"),
])
def test_storage_s3_get_gzip(started_cluster, extension, method):
    """Upload a gzip-compressed CSV and read it through an S3 table.

    `method` is the compression argument given to the S3 engine: either the
    explicit "gzip" or "auto" with a ``.gz`` object name.
    """
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = f"test_get_gzip.{extension}"
    name = f"test_get_gzip_{extension}"
    data = [
        "Sophia Intrieri,55",
        "Jack Taylor,71",
        "Christopher Silva,66",
        "Clifton Purser,35",
        "Richard Aceuedo,43",
        "Lisa Hensley,31",
        "Alice Wehrley,1",
        "Mary Farmer,47",
        "Samara Ramirez,19",
        "Shirley Lloyd,51",
        "Santos Cowger,0",
        "Richard Mundt,88",
        "Jerry Gonzalez,15",
        "Angela James,10",
        "Norman Ortega,33",
        ""  # Makes the joined text end with a newline.
    ]
    run_query(instance, f"DROP TABLE IF EXISTS {name}")

    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb") as compressed:
        compressed.write(("\n".join(data)).encode())
    put_s3_file_content(started_cluster, bucket, filename, buf.getvalue())

    run_query(instance,
              f"CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(\n"
              f"'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',\n"
              f"'CSV',\n"
              f"'{method}')")

    # The comparison used to be a bare expression, so it never checked anything.
    assert run_query(instance, f"SELECT sum(id) FROM {name}").splitlines() == ["565"]
    run_query(instance, f"DROP TABLE {name}")
2020-09-28 23:30:41 +00:00
def test_storage_s3_get_unstable(started_cluster):
    """Read test.csv through the server at resolver:8081 and check the aggregated result."""
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
    source = f"s3('http://resolver:8081/{bucket}/test.csv', 'CSV', '{table_format}')"
    get_query = f"SELECT count(), sum(column3), sum(column4) FROM {source} FORMAT CSV"
    output_lines = run_query(instance, get_query).splitlines()
    assert output_lines == ["500001,500000,0"]
2021-04-12 08:55:54 +00:00
2021-02-20 14:59:39 +00:00
def test_storage_s3_put_uncompressed(started_cluster):
    """Insert rows into an S3 table without compression and verify the stored CSV."""
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = "test_put_uncompressed.bin"
    name = "test_put_uncompressed"
    data = [
        "'Gloria Thompson',99",
        "'Matthew Tang',98",
        "'Patsy Anderson',23",
        "'Nancy Badillo',93",
        "'Roy Hunt',5",
        "'Adam Kirk',51",
        "'Joshua Douds',28",
        "'Jolene Ryan',0",
        "'Roxanne Padilla',50",
        "'Howard Roberts',41",
        "'Ricardo Broughton',13",
        "'Roland Speer',83",
        "'Cathy Cohan',58",
        "'Kathie Dawson',100",
        "'Gregg Mcquistion',11",
    ]
    run_query(instance, "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV')".format(
        name, started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename))

    run_query(instance, "INSERT INTO {} VALUES ({})".format(name, "),(".join(data)))

    # The comparison used to be a bare expression, so it never checked anything.
    assert run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == ["753"]

    # Cross-check against the object actually stored in S3.
    uncompressed_content = get_s3_file_content(started_cluster, bucket, filename)
    assert sum(int(line.split(',')[1]) for line in uncompressed_content.splitlines()) == 753
@pytest.mark.parametrize("extension,method", [
    pytest.param("bin", "gzip", id="bin"),
    pytest.param("gz", "auto", id="gz")
])
def test_storage_s3_put_gzip(started_cluster, extension, method):
    """Insert rows into a gzip-compressed S3 table and verify the stored object.

    `method` is the compression argument given to the S3 engine: either the
    explicit "gzip" or "auto" with a ``.gz`` object name.
    """
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = f"test_put_gzip.{extension}"
    name = f"test_put_gzip_{extension}"
    data = [
        "'Joseph Tomlinson',5",
        "'Earnest Essary',44",
        "'Matha Pannell',24",
        "'Michael Shavers',46",
        "'Elias Groce',38",
        "'Pamela Bramlet',50",
        "'Lewis Harrell',49",
        "'Tamara Fyall',58",
        "'George Dixon',38",
        "'Alice Walls',49",
        "'Paula Mais',24",
        "'Myrtle Pelt',93",
        "'Sylvia Naffziger',18",
        "'Amanda Cave',83",
        "'Yolanda Joseph',89"
    ]
    run_query(instance,
              f"CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(\n"
              f"'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',\n"
              f"'CSV',\n"
              f"'{method}')")

    run_query(instance, f"INSERT INTO {name} VALUES ({'),('.join(data)})")

    # The comparison used to be a bare expression, so it never checked anything.
    assert run_query(instance, f"SELECT sum(id) FROM {name}").splitlines() == ["708"]

    # Decompress the stored object ourselves to prove it really is gzip.
    buf = io.BytesIO(get_s3_file_content(started_cluster, bucket, filename, decode=False))
    with gzip.GzipFile(fileobj=buf, mode="rb") as f:
        uncompressed_content = f.read().decode()
    assert sum(int(line.split(',')[1]) for line in uncompressed_content.splitlines()) == 708
2021-06-21 15:44:36 +00:00
def test_truncate_table(started_cluster):
    """Fill an S3-backed table, truncate it and wait for the objects to disappear."""
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    name = "truncate"

    instance.query(
        f"CREATE TABLE {name} (id UInt32) ENGINE = S3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{name}', 'CSV')")
    instance.query(f"INSERT INTO {name} SELECT number FROM numbers(10)")
    result = instance.query(f"SELECT * FROM {name}")
    assert result == instance.query("SELECT number FROM numbers(10)")

    instance.query(f"TRUNCATE TABLE {name}")

    minio = started_cluster.minio_client
    # Poll for up to 30 seconds until nothing is listed under the prefix.
    for _ in range(30):
        if not list(minio.list_objects(bucket, 'truncate/')):
            # NOTE(review): returning here skips the final SELECT assertion
            # below on the success path; confirm whether `break` was intended.
            return
        time.sleep(1)
    assert(len(list(minio.list_objects(bucket, 'truncate/'))) == 0)
    assert instance.query(f"SELECT * FROM {name}") == ""