Yatsishin Ilya 2024-07-22 15:47:53 +00:00
parent c15699e32d
commit 69be00cbf0


@@ -154,6 +154,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
 
 
 def test_partition_by(started_cluster):
+    id = uuid.uuid4()
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
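
Note on test_partition_by: the added id = uuid.uuid4() gives every run of the test its own S3 prefix, so reruns (and concurrent runs against the same MinIO bucket) stop colliding with leftover objects from earlier runs. A minimal sketch of the idea, with illustrative names that are not part of the test:

    import uuid

    # Each run writes under a unique prefix, so two runs can never
    # read or overwrite each other's objects.
    run_id = uuid.uuid4()
    key = f"{run_id}/test_3.csv"  # e.g. "5a1b.../test_3.csv"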
@@ -161,26 +162,37 @@ def test_partition_by(started_cluster):
     values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
     filename = "test_{_partition_id}.csv"
     put_query = f"""INSERT INTO TABLE FUNCTION
-        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
+        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
         PARTITION BY {partition_by} VALUES {values}"""
 
     run_query(instance, put_query)
 
-    assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test_3.csv")
-    assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test_1.csv")
-    assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test_45.csv")
+    assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_3.csv")
+    assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_1.csv")
+    assert "78,43,45\n" == get_s3_file_content(
+        started_cluster, bucket, f"{id}/test_45.csv"
+    )
+
     filename = "test2_{_partition_id}.csv"
     instance.query(
-        f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV') partition by column3"
+        f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV') partition by column3"
     )
     instance.query(f"insert into p values {values}")
-    assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, "test2_3.csv")
-    assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, "test2_1.csv")
-    assert "78,43,45\n" == get_s3_file_content(started_cluster, bucket, "test2_45.csv")
+    assert "1,2,3\n" == get_s3_file_content(
+        started_cluster, bucket, f"{id}/test2_3.csv"
+    )
+    assert "3,2,1\n" == get_s3_file_content(
+        started_cluster, bucket, f"{id}/test2_1.csv"
+    )
+    assert "78,43,45\n" == get_s3_file_content(
+        started_cluster, bucket, f"{id}/test2_45.csv"
+    )
+
     instance.query("drop table p")
 
 
 def test_partition_by_string_column(started_cluster):
+    id = uuid.uuid4()
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "col_num UInt32, col_str String"
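
Note on test_partition_by: the {_partition_id} token in the filename is substituted with each row's partition key, so the INSERT writes one object per distinct key. A sketch of the expansion, assuming partition_by is "column3" (not shown in this hunk, but implied by the asserted filenames test_3/test_1/test_45):

    # Expected object names for the three inserted rows, assuming
    # partition_by = "column3".
    rows = [(1, 2, 3), (3, 2, 1), (78, 43, 45)]
    keys = {f"test_{column3}.csv" for _, _, column3 in rows}
    assert keys == {"test_3.csv", "test_1.csv", "test_45.csv"}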
@@ -188,21 +200,20 @@ def test_partition_by_string_column(started_cluster):
     values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
     filename = "test_{_partition_id}.csv"
     put_query = f"""INSERT INTO TABLE FUNCTION
-        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
+        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
         PARTITION BY {partition_by} VALUES {values}"""
 
     run_query(instance, put_query)
 
     assert '1,"foo/bar"\n' == get_s3_file_content(
-        started_cluster, bucket, "test_foo/bar.csv"
-    )
-    assert '3,"йцук"\n' == get_s3_file_content(started_cluster, bucket, "test_йцук.csv")
-    assert '78,"你好"\n' == get_s3_file_content(
-        started_cluster, bucket, "test_你好.csv"
+        started_cluster, bucket, f"{id}/test_foo/bar.csv"
     )
+    assert '3,"йцук"\n' == get_s3_file_content(started_cluster, bucket, f"{id}/test_йцук.csv")
+    assert '78,"你好"\n' == get_s3_file_content(started_cluster, bucket, f"{id}/test_你好.csv")
 
 
 def test_partition_by_const_column(started_cluster):
+    id = uuid.uuid4()
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
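
Note on test_partition_by_string_column: with a String partition column the raw value is spliced into the object key, so 'foo/bar' yields a key with an extra '/' level, and non-ASCII values like 'йцук' and '你好' appear verbatim. A sketch of the keys the test now expects, with the uuid prefix applied:

    import uuid

    run_id = uuid.uuid4()
    # 'foo/bar' contains '/', so that object effectively nests one level deeper.
    keys = [f"{run_id}/test_{v}.csv" for v in ("foo/bar", "йцук", "你好")]
    # -> ["<id>/test_foo/bar.csv", "<id>/test_йцук.csv", "<id>/test_你好.csv"]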
@@ -211,12 +222,14 @@ def test_partition_by_const_column(started_cluster):
     values_csv = "1,2,3\n3,2,1\n78,43,45\n"
     filename = "test_{_partition_id}.csv"
     put_query = f"""INSERT INTO TABLE FUNCTION
-        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'CSV', '{table_format}')
+        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
         PARTITION BY {partition_by} VALUES {values}"""
 
     run_query(instance, put_query)
 
-    assert values_csv == get_s3_file_content(started_cluster, bucket, "test_88.csv")
+    assert values_csv == get_s3_file_content(
+        started_cluster, bucket, f"{id}/test_88.csv"
+    )
 
 
 @pytest.mark.parametrize("special", ["space", "plus"])
@@ -276,46 +289,31 @@ def test_get_path_with_special(started_cluster, special):
 @pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")])
 def test_empty_put(started_cluster, auth):
     # type: (ClickHouseCluster, str) -> None
+    id = uuid.uuid4()
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
 
     drop_empty_table_query = "DROP TABLE IF EXISTS empty_table"
-    create_empty_table_query = """
-        CREATE TABLE empty_table (
-            {}
-        ) ENGINE = Null()
-    """.format(
-        table_format
+    create_empty_table_query = (
+        f"CREATE TABLE empty_table ({table_format}) ENGINE = Null()"
     )
 
     run_query(instance, drop_empty_table_query)
     run_query(instance, create_empty_table_query)
 
     filename = "empty_put_test.csv"
-    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') select * from empty_table".format(
-        started_cluster.minio_ip,
-        MINIO_INTERNAL_PORT,
-        bucket,
-        filename,
-        auth,
-        table_format,
-    )
+    put_query = f"""insert into table function
+        s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth} 'CSV', '{table_format}')
+        select * from empty_table"""
 
     run_query(instance, put_query)
 
     assert (
         run_query(
             instance,
-            "select count(*) from s3('http://{}:{}/{}/{}', {}'CSV', '{}')".format(
-                started_cluster.minio_ip,
-                MINIO_INTERNAL_PORT,
-                bucket,
-                filename,
-                auth,
-                table_format,
-            ),
+            f"""select count(*) from
+            s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth} 'CSV', '{table_format}')""",
         )
         == "0\n"
     )
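
Note on test_empty_put: besides adding the {id} prefix, this hunk folds the positional .format() chains into f-strings, which keeps each URL next to the values it interpolates. The refactor pattern in isolation (all values below are placeholders, not the suite's real config):

    host, port, bucket, filename = "minio1", 9001, "root", "empty_put_test.csv"
    auth, table_format = "'minio','minio123',", "column1 UInt32"

    # After: one readable f-string instead of a .format() argument list.
    put_query = (
        f"insert into table function "
        f"s3('http://{host}:{port}/{bucket}/{filename}', {auth} 'CSV', '{table_format}') "
        f"select * from empty_table"
    )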
@@ -881,7 +879,7 @@ def test_storage_s3_get_unstable(started_cluster):
     bucket = started_cluster.minio_bucket
     instance = started_cluster.instances["dummy"]
     table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
-    get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') FORMAT CSV"
+    get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') SETTINGS s3_max_single_read_retries=30 FORMAT CSV"
     result = run_query(instance, get_query)
     assert result.splitlines() == ["500001,500000,0"]
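
Note on test_storage_s3_get_unstable: the added SETTINGS s3_max_single_read_retries=30 raises the retry budget for single reads in this one query only, so the deliberately unstable "resolver" endpoint the test points at can keep dropping connections without failing the test; the clause goes after the table function and before FORMAT. The same pattern in isolation (bucket name is a placeholder):

    # Per-query setting: scoped to this SELECT, not the whole server.
    table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
    get_query = (
        f"SELECT count(), sum(column3), sum(column4) "
        f"FROM s3('http://resolver:8081/bucket/test.csv', 'CSV', '{table_format}') "
        f"SETTINGS s3_max_single_read_retries=30 FORMAT CSV"
    )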