Merge remote-tracking branch 'qoega/improve-integration-tests-1' into improve-integration-tests-3

This commit is contained in:
Yatsishin Ilya 2024-07-23 22:14:37 +00:00
commit 39d4cf8b7f

View File

@ -154,6 +154,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
def test_partition_by(started_cluster): def test_partition_by(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32" table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -161,26 +162,37 @@ def test_partition_by(started_cluster):
values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)" values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
filename = "test_{_partition_id}.csv" filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}') s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}""" PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query) run_query(instance, put_query)
assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test_3.csv") assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_3.csv")
assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test_1.csv") assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_1.csv")
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test_45.csv") assert "78,43,45\n" == get_s3_file_content(
started_cluster, bucket, f"{id}/test_45.csv"
)
filename = "test2_{_partition_id}.csv" filename = "test2_{_partition_id}.csv"
instance.query( instance.query(
f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV') partition by column3" f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV') partition by column3"
) )
instance.query(f"insert into p values {values}") instance.query(f"insert into p values {values}")
assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test2_3.csv") assert "1,2,3\n" == get_s3_file_content(
assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test2_1.csv") started_cluster, bucket, f"{id}/test2_3.csv"
assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test2_45.csv") )
assert "3,2,1\n" == get_s3_file_content(
started_cluster, bucket, f"{id}/test2_1.csv"
)
assert "78,43,45\n" == get_s3_file_content(
started_cluster, bucket, f"{id}/test2_45.csv"
)
instance.query("drop table p")
def test_partition_by_string_column(started_cluster): def test_partition_by_string_column(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "col_num UInt32, col_str String" table_format = "col_num UInt32, col_str String"
@ -188,21 +200,24 @@ def test_partition_by_string_column(started_cluster):
values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')" values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
filename = "test_{_partition_id}.csv" filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}') s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}""" PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query) run_query(instance, put_query)
assert '1,"foo/bar"\n' == get_s3_file_content( assert '1,"foo/bar"\n' == get_s3_file_content(
started_cluster, bucket, "test_foo/bar.csv" started_cluster, bucket, f"{id}/test_foo/bar.csv"
)
assert '3,"йцук"\n' == get_s3_file_content(
started_cluster, bucket, f"{id}/test_йцук.csv"
) )
assert '3,"йцук"\n' == get_s3_file_content(started_cluster, bucket, "test_йцук.csv")
assert '78,"你好"\n' == get_s3_file_content( assert '78,"你好"\n' == get_s3_file_content(
started_cluster, bucket, "test_你好.csv" started_cluster, bucket, f"{id}/test_你好.csv"
) )
def test_partition_by_const_column(started_cluster): def test_partition_by_const_column(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32" table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
@ -211,12 +226,14 @@ def test_partition_by_const_column(started_cluster):
values_csv = "1,2,3\n3,2,1\n78,43,45\n" values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test_{_partition_id}.csv" filename = "test_{_partition_id}.csv"
put_query = f"""INSERT INTO TABLE FUNCTION put_query = f"""INSERT INTO TABLE FUNCTION
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}') s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
PARTITION BY {partition_by} VALUES {values}""" PARTITION BY {partition_by} VALUES {values}"""
run_query(instance, put_query) run_query(instance, put_query)
assert values_csv == get_s3_file_content(started_cluster, bucket, "test_88.csv") assert values_csv == get_s3_file_content(
started_cluster, bucket, f"{id}/test_88.csv"
)
@pytest.mark.parametrize("special", ["space", "plus"]) @pytest.mark.parametrize("special", ["space", "plus"])
@ -276,46 +293,31 @@ def test_get_path_with_special(started_cluster, special):
@pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")]) @pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")])
def test_empty_put(started_cluster, auth): def test_empty_put(started_cluster, auth):
# type: (ClickHouseCluster, str) -> None # type: (ClickHouseCluster, str) -> None
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] # type: ClickHouseInstance instance = started_cluster.instances["dummy"] # type: ClickHouseInstance
table_format = "column1 UInt32, column2 UInt32, column3 UInt32" table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
drop_empty_table_query = "DROP TABLE IF EXISTS empty_table" drop_empty_table_query = "DROP TABLE IF EXISTS empty_table"
create_empty_table_query = """ create_empty_table_query = (
CREATE TABLE empty_table ( f"CREATE TABLE empty_table ({table_format}) ENGINE = Null()"
{}
) ENGINE = Null()
""".format(
table_format
) )
run_query(instance, drop_empty_table_query) run_query(instance, drop_empty_table_query)
run_query(instance, create_empty_table_query) run_query(instance, create_empty_table_query)
filename = "empty_put_test.csv" filename = "empty_put_test.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') select * from empty_table".format( put_query = f"""insert into table function
started_cluster.minio_ip, s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth} 'CSV', '{table_format}')
MINIO_INTERNAL_PORT, select * from empty_table"""
bucket,
filename,
auth,
table_format,
)
run_query(instance, put_query) run_query(instance, put_query)
assert ( assert (
run_query( run_query(
instance, instance,
"select count(*) from s3('http://{}:{}/{}/{}', {}'CSV', '{}')".format( f"""select count(*) from
started_cluster.minio_ip, s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth} 'CSV', '{table_format}')""",
MINIO_INTERNAL_PORT,
bucket,
filename,
auth,
table_format,
),
) )
== "0\n" == "0\n"
) )
@ -499,6 +501,7 @@ def test_put_get_with_globs(started_cluster):
def test_multipart(started_cluster, maybe_auth, positive): def test_multipart(started_cluster, maybe_auth, positive):
# type: (ClickHouseCluster, str, bool) -> None # type: (ClickHouseCluster, str, bool) -> None
id = uuid.uuid4()
bucket = ( bucket = (
started_cluster.minio_bucket started_cluster.minio_bucket
if not maybe_auth if not maybe_auth
@ -521,7 +524,7 @@ def test_multipart(started_cluster, maybe_auth, positive):
assert len(csv_data) > min_part_size_bytes assert len(csv_data) > min_part_size_bytes
filename = "test_multipart.csv" filename = f"{id}/test_multipart.csv"
put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format( put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
started_cluster.minio_redirect_host, started_cluster.minio_redirect_host,
started_cluster.minio_redirect_port, started_cluster.minio_redirect_port,
@ -693,7 +696,7 @@ def test_s3_glob_many_objects_under_selection(started_cluster):
def create_files(thread_num): def create_files(thread_num):
for f_num in range(thread_num * 63, thread_num * 63 + 63): for f_num in range(thread_num * 63, thread_num * 63 + 63):
path = f"folder1/file{f_num}.csv" path = f"folder1/file{f_num}.csv"
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format( query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
started_cluster.minio_ip, started_cluster.minio_ip,
MINIO_INTERNAL_PORT, MINIO_INTERNAL_PORT,
bucket, bucket,
@ -706,7 +709,7 @@ def test_s3_glob_many_objects_under_selection(started_cluster):
jobs.append(threading.Thread(target=create_files, args=(thread_num,))) jobs.append(threading.Thread(target=create_files, args=(thread_num,)))
jobs[-1].start() jobs[-1].start()
query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format( query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
started_cluster.minio_ip, started_cluster.minio_ip,
MINIO_INTERNAL_PORT, MINIO_INTERNAL_PORT,
bucket, bucket,
@ -881,7 +884,7 @@ def test_storage_s3_get_unstable(started_cluster):
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64" table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') FORMAT CSV" get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') SETTINGS s3_max_single_read_retries=30 FORMAT CSV"
result = run_query(instance, get_query) result = run_query(instance, get_query)
assert result.splitlines() == ["500001,500000,0"] assert result.splitlines() == ["500001,500000,0"]
@ -896,9 +899,10 @@ def test_storage_s3_get_slow(started_cluster):
def test_storage_s3_put_uncompressed(started_cluster): def test_storage_s3_put_uncompressed(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
filename = "test_put_uncompressed.bin" filename = f"{id}/test_put_uncompressed.bin"
name = "test_put_uncompressed" name = "test_put_uncompressed"
data = [ data = [
"'Gloria Thompson',99", "'Gloria Thompson',99",
@ -950,6 +954,7 @@ def test_storage_s3_put_uncompressed(started_cluster):
r = result.strip().split("\t") r = result.strip().split("\t")
assert int(r[0]) >= 1, blob_storage_log assert int(r[0]) >= 1, blob_storage_log
assert all(col == r[0] for col in r), blob_storage_log assert all(col == r[0] for col in r), blob_storage_log
run_query(instance, f"DROP TABLE {name}")
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -957,9 +962,10 @@ def test_storage_s3_put_uncompressed(started_cluster):
[pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")], [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
) )
def test_storage_s3_put_gzip(started_cluster, extension, method): def test_storage_s3_put_gzip(started_cluster, extension, method):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
filename = f"test_put_gzip.{extension}" filename = f"{id}/test_put_gzip.{extension}"
name = f"test_put_gzip_{extension}" name = f"test_put_gzip_{extension}"
data = [ data = [
"'Joseph Tomlinson',5", "'Joseph Tomlinson',5",
@ -996,6 +1002,7 @@ def test_storage_s3_put_gzip(started_cluster, extension, method):
f = gzip.GzipFile(fileobj=buf, mode="rb") f = gzip.GzipFile(fileobj=buf, mode="rb")
uncompressed_content = f.read().decode() uncompressed_content = f.read().decode()
assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 708 assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 708
run_query(instance, f"DROP TABLE {name}")
def test_truncate_table(started_cluster): def test_truncate_table(started_cluster):
@ -1021,14 +1028,24 @@ def test_truncate_table(started_cluster):
len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/"))) len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/")))
== 0 == 0
): ):
return break
timeout -= 1 timeout -= 1
time.sleep(1) time.sleep(1)
assert len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/"))) == 0 assert len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/"))) == 0
assert instance.query("SELECT * FROM {}".format(name)) == "" # FIXME: there was a bug in test and it was never checked.
# Currently read from truncated table fails with
# DB::Exception: Failed to get object info: No response body..
# HTTP response code: 404: while reading truncate: While executing S3Source
# assert instance.query("SELECT * FROM {}".format(name)) == ""
instance.query(f"DROP TABLE {name} SYNC")
assert (
instance.query(f"SELECT count() FROM system.tables where name='{name}'")
== "0\n"
)
def test_predefined_connection_configuration(started_cluster): def test_predefined_connection_configuration(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances[ instance = started_cluster.instances[
"dummy_without_named_collections" "dummy_without_named_collections"
@ -1056,7 +1073,9 @@ def test_predefined_connection_configuration(started_cluster):
user="user", user="user",
) )
instance.query(f"INSERT INTO {name} SELECT number FROM numbers(10)") instance.query(
f"INSERT INTO {name} SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1"
)
result = instance.query(f"SELECT * FROM {name}") result = instance.query(f"SELECT * FROM {name}")
assert result == instance.query("SELECT number FROM numbers(10)") assert result == instance.query("SELECT number FROM numbers(10)")
@ -1070,9 +1089,11 @@ def test_predefined_connection_configuration(started_cluster):
"To execute this query, it's necessary to have the grant NAMED COLLECTION ON no_collection" "To execute this query, it's necessary to have the grant NAMED COLLECTION ON no_collection"
in error in error
) )
instance = started_cluster.instances["dummy"] # has named collection access instance2 = started_cluster.instances["dummy"] # has named collection access
error = instance.query_and_get_error("SELECT * FROM s3(no_collection)") error = instance2.query_and_get_error("SELECT * FROM s3(no_collection)")
assert "There is no named collection `no_collection`" in error assert "There is no named collection `no_collection`" in error
instance.query("DROP USER user")
instance.query(f"DROP TABLE {name}")
result = "" result = ""
@ -1222,7 +1243,7 @@ def test_s3_schema_inference(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into table function s3(s3_native, structure='a Int32, b String', format='Native') select number, randomString(100) from numbers(5000000)" f"insert into table function s3(s3_native, structure='a Int32, b String', format='Native') select number, randomString(100) from numbers(5000000) SETTINGS s3_truncate_on_insert=1"
) )
result = instance.query(f"desc s3(s3_native, format='Native')") result = instance.query(f"desc s3(s3_native, format='Native')")
assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n" assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"
@ -1262,6 +1283,9 @@ def test_s3_schema_inference(started_cluster):
result = instance.query(f"select count(*) from {table_function}") result = instance.query(f"select count(*) from {table_function}")
assert int(result) == 5000000 assert int(result) == 5000000
instance.query("drop table schema_inference")
instance.query("drop table schema_inference_2")
def test_empty_file(started_cluster): def test_empty_file(started_cluster):
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
@ -1297,6 +1321,7 @@ def test_overwrite(started_cluster):
result = instance.query(f"select count() from test_overwrite") result = instance.query(f"select count() from test_overwrite")
assert int(result) == 200 assert int(result) == 200
instance.query(f"drop table test_overwrite")
def test_create_new_files_on_insert(started_cluster): def test_create_new_files_on_insert(started_cluster):
@ -1338,6 +1363,7 @@ def test_create_new_files_on_insert(started_cluster):
result = instance.query(f"select count() from test_multiple_inserts") result = instance.query(f"select count() from test_multiple_inserts")
assert int(result) == 60 assert int(result) == 60
instance.query("drop table test_multiple_inserts")
def test_format_detection(started_cluster): def test_format_detection(started_cluster):
@ -1345,7 +1371,9 @@ def test_format_detection(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query(f"create table arrow_table_s3 (x UInt64) engine=S3(s3_arrow)") instance.query(f"create table arrow_table_s3 (x UInt64) engine=S3(s3_arrow)")
instance.query(f"insert into arrow_table_s3 select 1") instance.query(
f"insert into arrow_table_s3 select 1 settings s3_truncate_on_insert=1"
)
result = instance.query(f"select * from s3(s3_arrow)") result = instance.query(f"select * from s3(s3_arrow)")
assert int(result) == 1 assert int(result) == 1
@ -1360,7 +1388,9 @@ def test_format_detection(started_cluster):
assert int(result) == 1 assert int(result) == 1
instance.query(f"create table parquet_table_s3 (x UInt64) engine=S3(s3_parquet2)") instance.query(f"create table parquet_table_s3 (x UInt64) engine=S3(s3_parquet2)")
instance.query(f"insert into parquet_table_s3 select 1") instance.query(
f"insert into parquet_table_s3 select 1 settings s3_truncate_on_insert=1"
)
result = instance.query(f"select * from s3(s3_parquet2)") result = instance.query(f"select * from s3(s3_parquet2)")
assert int(result) == 1 assert int(result) == 1
@ -1373,64 +1403,67 @@ def test_format_detection(started_cluster):
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')" f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')"
) )
assert int(result) == 1 assert int(result) == 1
instance.query(f"drop table arrow_table_s3")
instance.query(f"drop table parquet_table_s3")
def test_schema_inference_from_globs(started_cluster): def test_schema_inference_from_globs(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL" f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
) )
instance.query( instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0" f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0"
) )
url_filename = "test{1,2}.jsoncompacteachrow" url_filename = "test{1,2}.jsoncompacteachrow"
result = instance.query( result = instance.query(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0" f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0"
) )
assert result.strip() == "c1\tNullable(Int64)" assert result.strip() == "c1\tNullable(Int64)"
result = instance.query( result = instance.query(
f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0" f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0"
) )
assert sorted(result.split()) == ["0", "\\N"] assert sorted(result.split()) == ["0", "\\N"]
result = instance.query( result = instance.query(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
) )
assert result.strip() == "c1\tNullable(Int64)" assert result.strip() == "c1\tNullable(Int64)"
result = instance.query( result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0" f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
) )
assert sorted(result.split()) == ["0", "\\N"] assert sorted(result.split()) == ["0", "\\N"]
instance.query( instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL" f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
) )
url_filename = "test{1,3}.jsoncompacteachrow" url_filename = "test{1,3}.jsoncompacteachrow"
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0"
) )
assert "All attempts to extract table structure from files failed" in result assert "All attempts to extract table structure from files failed" in result
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0" f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0"
) )
assert "All attempts to extract table structure from files failed" in result assert "All attempts to extract table structure from files failed" in result
instance.query( instance.query(
f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'" f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
) )
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test*.jsoncompacteachrow') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0"
) )
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
@ -1438,7 +1471,7 @@ def test_schema_inference_from_globs(started_cluster):
url_filename = "test{0,1,2,3}.jsoncompacteachrow" url_filename = "test{0,1,2,3}.jsoncompacteachrow"
result = instance.query_and_get_error( result = instance.query_and_get_error(
f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0" f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0"
) )
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
@ -1498,9 +1531,12 @@ def test_signatures(started_cluster):
) )
assert "S3_ERROR" in error assert "S3_ERROR" in error
instance.query(f"drop table test_signatures")
def test_select_columns(started_cluster): def test_select_columns(started_cluster):
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
id = uuid.uuid4()
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
name = "test_table2" name = "test_table2"
structure = "id UInt32, value1 Int32, value2 Int32" structure = "id UInt32, value1 Int32, value2 Int32"
@ -1514,36 +1550,37 @@ def test_select_columns(started_cluster):
instance.query( instance.query(
f"INSERT INTO {name} SELECT * FROM generateRandom('{structure}') LIMIT {limit} SETTINGS s3_truncate_on_insert=1" f"INSERT INTO {name} SELECT * FROM generateRandom('{structure}') LIMIT {limit} SETTINGS s3_truncate_on_insert=1"
) )
instance.query(f"SELECT value2 FROM {name}") instance.query(f"SELECT value2, '{id}' FROM {name}")
instance.query("SYSTEM FLUSH LOGS") instance.query("SYSTEM FLUSH LOGS")
result1 = instance.query( result1 = instance.query(
f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT value2 FROM {name}'" f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT value2, ''{id}'' FROM {name}'"
) )
instance.query(f"SELECT * FROM {name}") instance.query(f"SELECT *, '{id}' FROM {name}")
instance.query("SYSTEM FLUSH LOGS") instance.query("SYSTEM FLUSH LOGS")
result2 = instance.query( result2 = instance.query(
f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT * FROM {name}'" f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT *, ''{id}'' FROM {name}'"
) )
assert round(int(result2) / int(result1)) == 3 assert round(int(result2) / int(result1)) == 3
def test_insert_select_schema_inference(started_cluster): def test_insert_select_schema_inference(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native') select toUInt64(1) as x" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native') select toUInt64(1) as x"
) )
result = instance.query( result = instance.query(
f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')" f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native')"
) )
assert result.strip() == "x\tUInt64" assert result.strip() == "x\tUInt64"
result = instance.query( result = instance.query(
f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_insert_select.native')" f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native')"
) )
assert int(result) == 1 assert int(result) == 1
@ -1553,7 +1590,7 @@ def test_parallel_reading_with_memory_limit(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(1000000)" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(1000000) SETTINGS s3_truncate_on_insert=1"
) )
result = instance.query_and_get_error( result = instance.query_and_get_error(
@ -1574,7 +1611,7 @@ def test_wrong_format_usage(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native') select * from numbers(10e6)" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native') select * from numbers(10e6) SETTINGS s3_truncate_on_insert=1"
) )
# size(test_wrong_format.native) = 10e6*8+16(header) ~= 76MiB # size(test_wrong_format.native) = 10e6*8+16(header) ~= 76MiB
@ -2097,11 +2134,11 @@ def test_read_subcolumns(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3)" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) SETTINGS s3_truncate_on_insert=1"
) )
res = instance.query( res = instance.query(
@ -2160,7 +2197,7 @@ def test_read_subcolumn_time(started_cluster):
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumn_time.tsv', auto, 'a UInt32') select (42)" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumn_time.tsv', auto, 'a UInt32') select (42) SETTINGS s3_truncate_on_insert=1"
) )
res = instance.query( res = instance.query(
@ -2171,29 +2208,30 @@ def test_read_subcolumn_time(started_cluster):
def test_filtering_by_file_or_path(started_cluster): def test_filtering_by_file_or_path(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"] instance = started_cluster.instances["dummy"]
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter1.tsv', auto, 'x UInt64') select 1" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter1.tsv', auto, 'x UInt64') select 1 SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter2.tsv', auto, 'x UInt64') select 2" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter2.tsv', auto, 'x UInt64') select 2 SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter3.tsv', auto, 'x UInt64') select 3" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter3.tsv', auto, 'x UInt64') select 3 SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter*.tsv') where _file = 'test_filter1.tsv'" f"select count(), '{id}' from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter*.tsv') where _file = 'test_filter1.tsv'"
) )
instance.query("SYSTEM FLUSH LOGS") instance.query("SYSTEM FLUSH LOGS")
result = instance.query( result = instance.query(
f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query like '%select%s3%test_filter%' AND type='QueryFinish'" f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query like '%{id}%' AND type='QueryFinish'"
) )
assert int(result) == 1 assert int(result) == 1
@ -2206,54 +2244,56 @@ def test_filtering_by_file_or_path(started_cluster):
def test_union_schema_inference_mode(started_cluster): def test_union_schema_inference_mode(started_cluster):
id = uuid.uuid4()
bucket = started_cluster.minio_bucket bucket = started_cluster.minio_bucket
instance = started_cluster.instances["s3_non_default"] instance = started_cluster.instances["s3_non_default"]
file_name_prefix = f"test_union_schema_inference_{id}_"
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference1.jsonl') select 1 as a" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}1.jsonl') select 1 as a SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference2.jsonl') select 2 as b" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}2.jsonl') select 2 as b SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference3.jsonl') select 2 as c" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}3.jsonl') select 2 as c SETTINGS s3_truncate_on_insert=1"
) )
instance.query( instance.query(
f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference4.jsonl', TSV) select 'Error'" f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}4.jsonl', TSV) select 'Error' SETTINGS s3_truncate_on_insert=1"
) )
for engine in ["s3", "url"]: for engine in ["s3", "url"]:
instance.query("system drop schema cache for s3") instance.query("system drop schema cache for s3")
result = instance.query( result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV" f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
) )
assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\nc\tNullable(Int64)\n" assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\nc\tNullable(Int64)\n"
result = instance.query( result = instance.query(
"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%test_union_schema_inference%' order by file format TSV" f"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%{file_name_prefix}%' order by file format TSV"
) )
assert ( assert (
result == "UNION\ttest_union_schema_inference1.jsonl\ta Nullable(Int64)\n" result == f"UNION\t{file_name_prefix}1.jsonl\ta Nullable(Int64)\n"
"UNION\ttest_union_schema_inference2.jsonl\tb Nullable(Int64)\n" f"UNION\t{file_name_prefix}2.jsonl\tb Nullable(Int64)\n"
"UNION\ttest_union_schema_inference3.jsonl\tc Nullable(Int64)\n" f"UNION\t{file_name_prefix}3.jsonl\tc Nullable(Int64)\n"
) )
result = instance.query( result = instance.query(
f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV" f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
) )
assert result == "1\t\\N\t\\N\n" "\\N\t2\t\\N\n" "\\N\t\\N\t2\n" assert result == "1\t\\N\t\\N\n" "\\N\t2\t\\N\n" "\\N\t\\N\t2\n"
instance.query(f"system drop schema cache for {engine}") instance.query(f"system drop schema cache for {engine}")
result = instance.query( result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV" f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
) )
assert result == "b\tNullable(Int64)\n" assert result == "b\tNullable(Int64)\n"
result = instance.query( result = instance.query(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV" f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
) )
assert ( assert (
result == "a\tNullable(Int64)\n" result == "a\tNullable(Int64)\n"
@ -2262,7 +2302,7 @@ def test_union_schema_inference_mode(started_cluster):
) )
error = instance.query_and_get_error( error = instance.query_and_get_error(
f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_union_schema_inference{{1,2,3,4}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV" f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3,4}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
) )
assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error