import gzip
import io
import logging
import os
import random
import threading
import time
import uuid

import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.mock_servers import start_mock_servers
from helpers.test_tools import exec_query_with_retry
from helpers.s3_tools import prepare_s3_bucket

MINIO_INTERNAL_PORT = 9001

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))


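# Uploads raw bytes to MinIO as bucket/filename.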
def put_s3_file_content(started_cluster, bucket, filename, data):
    buf = io.BytesIO(data)
    started_cluster.minio_client.put_object(bucket, filename, buf, len(data))


# Returns content of given S3 file as a string (or raw bytes if decode=False).
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
    # type: (ClickHouseCluster, str, str, bool) -> str

    data = started_cluster.minio_client.get_object(bucket, filename)
    data_str = b""
    for chunk in data.stream():
        data_str += chunk
    if decode:
        return data_str.decode()
    return data_str


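# Module-scoped cluster: several ClickHouse instances with different configs, all
# backed by MinIO; prepares the test bucket and starts the S3 mock servers.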
@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance(
            "restricted_dummy",
            main_configs=["configs/config_for_test_remote_host_filter.xml"],
            with_minio=True,
        )
        cluster.add_instance(
            "dummy",
            with_minio=True,
            main_configs=[
                "configs/defaultS3.xml",
                "configs/named_collections.xml",
                "configs/schema_cache.xml",
                "configs/blob_log.xml",
            ],
            user_configs=[
                "configs/access.xml",
                "configs/users.xml",
                "configs/s3_retry.xml",
            ],
        )
        cluster.add_instance(
            "dummy_without_named_collections",
            with_minio=True,
            main_configs=[
                "configs/defaultS3.xml",
                "configs/named_collections.xml",
                "configs/schema_cache.xml",
            ],
            user_configs=["configs/access.xml"],
        )
        cluster.add_instance(
            "s3_max_redirects",
            with_minio=True,
            main_configs=["configs/defaultS3.xml"],
            user_configs=["configs/s3_max_redirects.xml", "configs/s3_retry.xml"],
        )
        cluster.add_instance(
            "s3_non_default",
            with_minio=True,
        )
        cluster.add_instance(
            "s3_with_environment_credentials",
            with_minio=True,
            env_variables={
                "AWS_ACCESS_KEY_ID": "minio",
                "AWS_SECRET_ACCESS_KEY": "minio123",
            },
            main_configs=["configs/use_environment_credentials.xml"],
        )

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")
        run_s3_mocks(cluster)

        yield cluster
    finally:
        cluster.shutdown()


def run_query(instance, query, *args, **kwargs):
    logging.info("Running query '{}'...".format(query))
    result = instance.query(query, *args, **kwargs)
    logging.info("Query finished")

    return result


# Test simple put. Also checks that wrong credentials produce an error with every compression method.
@pytest.mark.parametrize(
    "maybe_auth,positive,compression",
    [
        pytest.param("", True, "auto", id="positive"),
        pytest.param("'minio','minio123',", True, "auto", id="auth_positive"),
        pytest.param("'wrongid','wrongkey',", False, "auto", id="auto"),
        pytest.param("'wrongid','wrongkey',", False, "gzip", id="gzip"),
        pytest.param("'wrongid','wrongkey',", False, "deflate", id="deflate"),
        pytest.param("'wrongid','wrongkey',", False, "brotli", id="brotli"),
        pytest.param("'wrongid','wrongkey',", False, "xz", id="xz"),
        pytest.param("'wrongid','wrongkey',", False, "zstd", id="zstd"),
    ],
)
def test_put(started_cluster, maybe_auth, positive, compression):
    # type: (ClickHouseCluster, str, bool, str) -> None

    bucket = (
        started_cluster.minio_bucket
        if not maybe_auth
        else started_cluster.minio_restricted_bucket
    )
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    filename = "test.csv"
    put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
                    {maybe_auth}'CSV', '{table_format}', '{compression}') settings s3_truncate_on_insert=1 values {values}"""

    try:
        run_query(instance, put_query)
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
    else:
        assert positive
        assert values_csv == get_s3_file_content(started_cluster, bucket, filename)


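# PARTITION BY on INSERT: each partition key value is written to its own file,
# substituted for {_partition_id} in the file name.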
def test_partition_by(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    partition_by = "column3"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    filename = "test_{_partition_id}.csv"
    put_query = f"""INSERT INTO TABLE FUNCTION
        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
        PARTITION BY {partition_by} VALUES {values}"""

    run_query(instance, put_query)

    assert "1,2,3\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_3.csv")
    assert "3,2,1\n" == get_s3_file_content(started_cluster, bucket, f"{id}/test_1.csv")
    assert "78,43,45\n" == get_s3_file_content(
        started_cluster, bucket, f"{id}/test_45.csv"
    )

    filename = "test2_{_partition_id}.csv"
    instance.query(
        f"create table p ({table_format}) engine=S3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV') partition by column3"
    )
    instance.query(f"insert into p values {values}")
    assert "1,2,3\n" == get_s3_file_content(
        started_cluster, bucket, f"{id}/test2_3.csv"
    )
    assert "3,2,1\n" == get_s3_file_content(
        started_cluster, bucket, f"{id}/test2_1.csv"
    )
    assert "78,43,45\n" == get_s3_file_content(
        started_cluster, bucket, f"{id}/test2_45.csv"
    )
    instance.query("drop table p")


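# PARTITION BY a String column: the value is substituted into the file name verbatim
# (including '/', which creates a nested key).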
def test_partition_by_string_column(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "col_num UInt32, col_str String"
    partition_by = "col_str"
    values = "(1, 'foo/bar'), (3, 'йцук'), (78, '你好')"
    filename = "test_{_partition_id}.csv"
    put_query = f"""INSERT INTO TABLE FUNCTION
        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
        PARTITION BY {partition_by} VALUES {values}"""
    run_query(instance, put_query)

    assert '1,"foo/bar"\n' == get_s3_file_content(
        started_cluster, bucket, f"{id}/test_foo/bar.csv"
    )
    assert '3,"йцук"\n' == get_s3_file_content(
        started_cluster, bucket, f"{id}/test_йцук.csv"
    )
    assert '78,"你好"\n' == get_s3_file_content(
        started_cluster, bucket, f"{id}/test_你好.csv"
    )


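# PARTITION BY a constant expression: all rows end up in a single file.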
def test_partition_by_const_column(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    partition_by = "'88'"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    filename = "test_{_partition_id}.csv"
    put_query = f"""INSERT INTO TABLE FUNCTION
        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{filename}', 'CSV', '{table_format}')
        PARTITION BY {partition_by} VALUES {values}"""
    run_query(instance, put_query)

    assert values_csv == get_s3_file_content(
        started_cluster, bucket, f"{id}/test_88.csv"
    )


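# Reading files whose names contain characters that need URL encoding (space, plus).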
@pytest.mark.parametrize("special", ["space", "plus"])
def test_get_file_with_special(started_cluster, special):
    symbol = {"space": " ", "plus": "+"}[special]
    urlsafe_symbol = {"space": "%20", "plus": "%2B"}[special]
    auth = "'minio','minio123',"
    bucket = started_cluster.minio_restricted_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = [
        [12549, 2463, 19893],
        [64021, 38652, 66703],
        [81611, 39650, 83516],
        [11079, 59507, 61546],
        [51764, 69952, 6876],
        [41165, 90293, 29095],
        [40167, 78432, 48309],
        [81629, 81327, 11855],
        [55852, 21643, 98507],
        [6738, 54643, 41155],
    ]
    values_csv = (
        "\n".join((",".join(map(str, row)) for row in values)) + "\n"
    ).encode()
    filename = f"get_file_with_{special}_{symbol}two.csv"
    put_s3_file_content(started_cluster, bucket, filename, values_csv)

    get_query = f"SELECT * FROM s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}two.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
    assert [
        list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    ] == values

    get_query = f"SELECT * FROM s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/get_file_with_{special}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
    assert [
        list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    ] == values

    get_query = f"SELECT * FROM s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/get_file_with_{special}_{urlsafe_symbol}*.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
    assert [
        list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
    ] == values


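# Reading URL-encoded paths through the echo mock (resolver:8082).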
@pytest.mark.parametrize("special", ["space", "plus", "plus2"])
def test_get_path_with_special(started_cluster, special):
    symbol = {"space": "%20", "plus": "%2B", "plus2": "%2B"}[special]
    safe_symbol = {"space": "%20", "plus": "+", "plus2": "%2B"}[special]
    auth = "'minio','minio123',"
    table_format = "column1 String"
    instance = started_cluster.instances["dummy"]
    get_query = f"SELECT * FROM s3('http://resolver:8082/get-my-path/{safe_symbol}.csv', {auth}'CSV', '{table_format}') FORMAT TSV"
    assert run_query(instance, get_query).splitlines() == [f"/{symbol}.csv"]


# Test putting no data to S3 (insert from an empty table).
@pytest.mark.parametrize("auth", [pytest.param("'minio','minio123',", id="minio")])
def test_empty_put(started_cluster, auth):
    # type: (ClickHouseCluster, str) -> None

    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    drop_empty_table_query = "DROP TABLE IF EXISTS empty_table"
    create_empty_table_query = (
        f"CREATE TABLE empty_table ({table_format}) ENGINE = Null()"
    )

    run_query(instance, drop_empty_table_query)
    run_query(instance, create_empty_table_query)

    filename = "empty_put_test.csv"
    put_query = f"""insert into table function
        s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth}'CSV', '{table_format}')
        select * from empty_table"""

    run_query(instance, put_query)

    assert (
        run_query(
            instance,
            f"""select count(*) from
            s3('http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{id}/{filename}', {auth}'CSV', '{table_format}')""",
        )
        == "0\n"
    )


# Test put values in CSV format.
@pytest.mark.parametrize(
    "maybe_auth,positive",
    [
        pytest.param("", True, id="positive"),
        pytest.param("'minio','minio123',", True, id="auth_positive"),
        pytest.param("'wrongid','wrongkey',", False, id="negative"),
    ],
)
def test_put_csv(started_cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None

    bucket = (
        started_cluster.minio_bucket
        if not maybe_auth
        else started_cluster.minio_restricted_bucket
    )
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') settings s3_truncate_on_insert=1 format CSV".format(
        started_cluster.minio_ip,
        MINIO_INTERNAL_PORT,
        bucket,
        filename,
        maybe_auth,
        table_format,
    )
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"

    try:
        run_query(instance, put_query, stdin=csv_data)
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
    else:
        assert positive
        assert csv_data == get_s3_file_content(started_cluster, bucket, filename)


# Test put and get with S3 server redirect.
def test_put_get_with_redirect(started_cluster):
    # type: (ClickHouseCluster) -> None

    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    values_csv = "1,1,1\n1,1,1\n11,11,11\n"
    filename = "test.csv"
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        filename,
        table_format,
        values,
    )
    run_query(instance, query)

    assert values_csv == get_s3_file_content(started_cluster, bucket, filename)

    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        filename,
        table_format,
    )
    stdout = run_query(instance, query)

    assert list(map(str.split, stdout.splitlines())) == [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]


# Test put with restricted S3 server redirect.
def test_put_with_zero_redirect(started_cluster):
    # type: (ClickHouseCluster) -> None

    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["s3_max_redirects"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    filename = "test.csv"

    # Should work without redirect
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
        started_cluster.minio_ip,
        MINIO_INTERNAL_PORT,
        bucket,
        filename,
        table_format,
        values,
    )
    run_query(instance, query)

    # Should not work with redirect
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        filename,
        table_format,
        values,
    )
    exception_raised = False
    try:
        run_query(instance, query)
    except Exception as e:
        assert str(e).find("Too many redirects while trying to access") != -1
        exception_raised = True
    finally:
        assert exception_raised


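# Writes a 10x10 grid of files and reads them back with globs; also checks
# the _file and _path virtual columns.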
def test_put_get_with_globs(started_cluster):
    # type: (ClickHouseCluster) -> None

    unique_prefix = random.randint(1, 10000)
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    max_path = ""
    for i in range(10):
        for j in range(10):
            path = "{}/{}_{}/{}.csv".format(
                unique_prefix, i, random.choice(["a", "b", "c", "d"]), j
            )
            max_path = max(path, max_path)
            values = "({},{},{})".format(i, j, i + j)
            query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
                started_cluster.minio_ip,
                MINIO_INTERNAL_PORT,
                bucket,
                path,
                table_format,
                values,
            )
            run_query(instance, query)

    query = "select sum(column1), sum(column2), sum(column3), min(_file), max(_path) from s3('http://{}:{}/{}/{}/*_{{a,b,c,d}}/%3f.csv', 'CSV', '{}')".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        unique_prefix,
        table_format,
    )
    assert run_query(instance, query).splitlines() == [
        "450\t450\t900\t0.csv\t{bucket}/{max_path}".format(
            bucket=bucket, max_path=max_path
        )
    ]

    minio = started_cluster.minio_client
    for obj in list(
        minio.list_objects(
            started_cluster.minio_bucket,
            prefix="{}/".format(unique_prefix),
            recursive=True,
        )
    ):
        minio.remove_object(started_cluster.minio_bucket, obj.object_name)


# Test multipart put.
@pytest.mark.parametrize(
    "maybe_auth,positive",
    [
        pytest.param("", True, id="positive"),
        pytest.param("'wrongid','wrongkey'", False, id="negative"),
        # ("'minio','minio123',",True), Redirect with credentials not working with nginx.
    ],
)
def test_multipart(started_cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None

    id = uuid.uuid4()
    bucket = (
        started_cluster.minio_bucket
        if not maybe_auth
        else started_cluster.minio_restricted_bucket
    )
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Minimum size of part is 5 Mb for Minio.
    # See: https://github.com/minio/minio/blob/master/docs/minio-limits.md
    min_part_size_bytes = 5 * 1024 * 1024
    csv_size_bytes = int(min_part_size_bytes * 1.5)  # To have 2 parts.

    one_line_length = 6  # 3 digits, 2 commas, 1 line separator.

    total_rows = csv_size_bytes // one_line_length
    # Generate data having size more than one part
    int_data = [[1, 2, 3] for i in range(total_rows)]
    csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])

    assert len(csv_data) > min_part_size_bytes

    filename = f"{id}/test_multipart.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        filename,
        maybe_auth,
        table_format,
    )

    put_query_id = uuid.uuid4().hex
    try:
        run_query(
            instance,
            put_query,
            stdin=csv_data,
            settings={
                "s3_min_upload_part_size": min_part_size_bytes,
                "s3_max_single_part_upload_size": 0,
            },
            query_id=put_query_id,
        )
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
    else:
        assert positive

        # Use proxy access logs to count number of parts uploaded to Minio.
        proxy_logs = started_cluster.get_container_logs("proxy1")  # type: str
        assert proxy_logs.count("PUT /{}/{}".format(bucket, filename)) >= 2

        assert csv_data == get_s3_file_content(started_cluster, bucket, filename)

    # select uploaded data from many threads
    select_query = (
        "select sum(column1), sum(column2), sum(column3) "
        "from s3('http://{host}:{port}/{bucket}/{filename}', {auth}'CSV', '{table_format}')".format(
            host=started_cluster.minio_redirect_host,
            port=started_cluster.minio_redirect_port,
            bucket=bucket,
            filename=filename,
            auth=maybe_auth,
            table_format=table_format,
        )
    )
    try:
        select_result = run_query(
            instance,
            select_query,
            settings={
                "max_download_threads": random.randint(4, 16),
                "max_download_buffer_size": 1024 * 1024,
            },
        )
    except helpers.client.QueryRuntimeException:
        if positive:
            raise
    else:
        assert positive
        assert (
            select_result
            == "\t".join(map(str, [total_rows, total_rows * 2, total_rows * 3])) + "\n"
        )

    if positive:
        instance.query("SYSTEM FLUSH LOGS")
        blob_storage_log = instance.query(f"SELECT * FROM system.blob_storage_log")

        result = instance.query(
            f"""SELECT
                countIf(event_type == 'MultiPartUploadCreate'),
                countIf(event_type == 'MultiPartUploadWrite'),
                countIf(event_type == 'MultiPartUploadComplete'),
                count()
            FROM system.blob_storage_log WHERE query_id = '{put_query_id}'"""
        )
        r = result.strip().split("\t")
        assert int(r[0]) == 1, blob_storage_log
        assert int(r[1]) >= 1, blob_storage_log
        assert int(r[2]) == 1, blob_storage_log
        assert int(r[0]) + int(r[1]) + int(r[2]) == int(r[3]), blob_storage_log


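# Hosts not allowed by the remote host filter config must be rejected.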
def test_remote_host_filter(started_cluster):
    instance = started_cluster.instances["restricted_dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(
        "invalid_host", MINIO_INTERNAL_PORT, started_cluster.minio_bucket, format
    )
    assert "not allowed in configuration file" in instance.query_and_get_error(query)

    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(
        "invalid_host",
        MINIO_INTERNAL_PORT,
        started_cluster.minio_bucket,
        format,
        other_values,
    )
    assert "not allowed in configuration file" in instance.query_and_get_error(query)


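# Wrong numbers of S3() engine arguments must produce argument-validation errors.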
def test_wrong_s3_syntax(started_cluster):
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance

    expected_err_msg = "Code: 42"  # NUMBER_OF_ARGUMENTS_DOESNT_MATCH
    query = "create table test_table_s3_syntax (id UInt32) ENGINE = S3('', '', '', '', '', '', '')"
    assert expected_err_msg in instance.query_and_get_error(query)

    expected_err_msg = "Code: 36"  # BAD_ARGUMENTS
    query = "create table test_table_s3_syntax (id UInt32) ENGINE = S3('')"
    assert expected_err_msg in instance.query_and_get_error(query)


# https://en.wikipedia.org/wiki/One_Thousand_and_One_Nights
def test_s3_glob_scheherazade(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1)"
    nights_per_job = 1001 // 30
    jobs = []
    for night in range(0, 1001, nights_per_job):

        def add_tales(start, end):
            for i in range(start, end):
                path = "night_{}/tale.csv".format(i)
                query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
                    started_cluster.minio_ip,
                    MINIO_INTERNAL_PORT,
                    bucket,
                    path,
                    table_format,
                    values,
                )
                run_query(instance, query)

        jobs.append(
            threading.Thread(
                target=add_tales, args=(night, min(night + nights_per_job, 1001))
            )
        )
        jobs[-1].start()

    for job in jobs:
        job.join()

    query = "select count(), sum(column1), sum(column2), sum(column3) from s3('http://{}:{}/{}/night_*/tale.csv', 'CSV', '{}')".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        table_format,
    )
    assert run_query(instance, query).splitlines() == ["1001\t1001\t1001\t1001"]


# A bit simplified version of the scheherazade test:
# checks e.g. `prefix{1,2}/file*.csv`, where there are more than 1000 files under prefix1.
def test_s3_glob_many_objects_under_selection(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1)"
    jobs = []
    for thread_num in range(16):

        def create_files(thread_num):
            for f_num in range(thread_num * 63, thread_num * 63 + 63):
                path = f"folder1/file{f_num}.csv"
                query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
                    started_cluster.minio_ip,
                    MINIO_INTERNAL_PORT,
                    bucket,
                    path,
                    table_format,
                    values,
                )
                run_query(instance, query)

        jobs.append(threading.Thread(target=create_files, args=(thread_num,)))
        jobs[-1].start()

    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') settings s3_truncate_on_insert=1 values {}".format(
        started_cluster.minio_ip,
        MINIO_INTERNAL_PORT,
        bucket,
        f"folder2/file0.csv",
        table_format,
        values,
    )
    run_query(instance, query)

    for job in jobs:
        job.join()

    query = "select count(), sum(column1), sum(column2), sum(column3) from s3('http://{}:{}/{}/folder{{1,2}}/file*.csv', 'CSV', '{}')".format(
        started_cluster.minio_redirect_host,
        started_cluster.minio_redirect_port,
        bucket,
        table_format,
    )
    assert run_query(instance, query).splitlines() == ["1009\t1009\t1009\t1009"]


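# Starts the auxiliary HTTP mock servers (from s3_mocks/) that tests address as 'resolver'.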
def run_s3_mocks(started_cluster):
    script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
    start_mock_servers(
        started_cluster,
        script_dir,
        [
            ("mock_s3.py", "resolver", "8080"),
            ("unstable_server.py", "resolver", "8081"),
            ("echo.py", "resolver", "8082"),
            ("no_list_objects.py", "resolver", "8083"),
        ],
    )


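# Plain string replacement in a config file; used to flip the auth header between reloads.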
def replace_config(path, old, new):
    config = open(path, "r")
    config_lines = config.readlines()
    config.close()
    config_lines = [line.replace(old, new) for line in config_lines]
    config = open(path, "w")
    config.writelines(config_lines)
    config.close()


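# Custom auth headers from defaultS3.xml must be applied, and SYSTEM RELOAD CONFIG
# must pick up changes to them.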
def test_custom_auth_headers(started_cluster):
    config_path = os.path.join(
        SCRIPT_DIR,
        "./{}/dummy/configs/config.d/defaultS3.xml".format(
            started_cluster.instances_dir_name
        ),
    )

    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
        bucket=started_cluster.minio_restricted_bucket,
        file=filename,
        table_format=table_format,
    )
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    result = run_query(instance, get_query)
    assert result == "1\t2\t3\n"

    instance.query("DROP TABLE IF EXISTS test")
    instance.query(
        "CREATE TABLE test ({table_format}) ENGINE = S3('http://resolver:8080/{bucket}/{file}', 'CSV')".format(
            bucket=started_cluster.minio_restricted_bucket,
            file=filename,
            table_format=table_format,
        )
    )
    assert run_query(instance, "SELECT * FROM test") == "1\t2\t3\n"

    replace_config(
        config_path,
        "<header>Authorization: Bearer TOKEN",
        "<header>Authorization: Bearer INVALID_TOKEN",
    )
    instance.query("SYSTEM RELOAD CONFIG")
    ret, err = instance.query_and_get_answer_with_error("SELECT * FROM test")
    assert ret == "" and err != ""
    replace_config(
        config_path,
        "<header>Authorization: Bearer INVALID_TOKEN",
        "<header>Authorization: Bearer TOKEN",
    )
    instance.query("SYSTEM RELOAD CONFIG")
    assert run_query(instance, "SELECT * FROM test") == "1\t2\t3\n"
    instance.query("DROP TABLE test")


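# Configured auth headers must not be sent to excluded paths; the mock returns 403
# for the restricted directory.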
def test_custom_auth_headers_exclusion(started_cluster):
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = f"SELECT * FROM s3('http://resolver:8080/{started_cluster.minio_restricted_bucket}/restricteddirectory/{filename}', 'CSV', '{table_format}')"

    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    with pytest.raises(helpers.client.QueryRuntimeException) as ei:
        result = run_query(instance, get_query)
        print(result)

    assert ei.value.returncode == 243
    assert "HTTP response code: 403" in ei.value.stderr


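# An endless redirect loop must fail with 'Too many redirects'.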
def test_infinite_redirect(started_cluster):
    bucket = "redirected"
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    get_query = f"select * from s3('http://resolver:{started_cluster.minio_redirect_port}/{bucket}/{filename}', 'CSV', '{table_format}')"
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    exception_raised = False
    try:
        run_query(instance, get_query)
    except Exception as e:
        assert str(e).find("Too many redirects while trying to access") != -1
        exception_raised = True
    finally:
        assert exception_raised


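# Reading a gzip-compressed file, with explicit 'gzip' and with 'auto' detection by extension.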
@pytest.mark.parametrize(
    "extension,method",
    [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
)
def test_storage_s3_get_gzip(started_cluster, extension, method):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = f"test_get_gzip.{extension}"
    name = f"test_get_gzip_{extension}"
    data = [
        "Sophia Intrieri,55",
        "Jack Taylor,71",
        "Christopher Silva,66",
        "Clifton Purser,35",
        "Richard Aceuedo,43",
        "Lisa Hensley,31",
        "Alice Wehrley,1",
        "Mary Farmer,47",
        "Samara Ramirez,19",
        "Shirley Lloyd,51",
        "Santos Cowger,0",
        "Richard Mundt,88",
        "Jerry Gonzalez,15",
        "Angela James,10",
        "Norman Ortega,33",
        "",
    ]
    run_query(instance, f"DROP TABLE IF EXISTS {name}")

    buf = io.BytesIO()
    compressed = gzip.GzipFile(fileobj=buf, mode="wb")
    compressed.write(("\n".join(data)).encode())
    compressed.close()
    put_s3_file_content(started_cluster, bucket, filename, buf.getvalue())

    run_query(
        instance,
        f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
                    'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',
                    'CSV',
                    '{method}')""",
    )

    assert run_query(instance, f"SELECT sum(id) FROM {name}").splitlines() == ["565"]
    run_query(instance, f"DROP TABLE {name}")


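# Reading through the unstable_server mock (resolver:8081), which interrupts transfers;
# s3_max_single_read_retries allows the read to recover.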
def test_storage_s3_get_unstable(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
    get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/test.csv', 'CSV', '{table_format}') SETTINGS s3_max_single_read_retries=30 FORMAT CSV"
    result = run_query(instance, get_query)
    assert result.splitlines() == ["500001,500000,0"]


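# Reading the mock's slow_send_test.csv, which is served slowly (see s3_mocks/unstable_server.py).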
def test_storage_s3_get_slow(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "column1 Int64, column2 Int64, column3 Int64, column4 Int64"
    get_query = f"SELECT count(), sum(column3), sum(column4) FROM s3('http://resolver:8081/{started_cluster.minio_bucket}/slow_send_test.csv', 'CSV', '{table_format}') FORMAT CSV"
    result = run_query(instance, get_query)
    assert result.splitlines() == ["500001,500000,0"]


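# Write uncompressed CSV through the S3 engine; verify the object content and the
# system.blob_storage_log entries for the insert.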
def test_storage_s3_put_uncompressed(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = f"{id}/test_put_uncompressed.bin"
    name = "test_put_uncompressed"
    data = [
        "'Gloria Thompson',99",
        "'Matthew Tang',98",
        "'Patsy Anderson',23",
        "'Nancy Badillo',93",
        "'Roy Hunt',5",
        "'Adam Kirk',51",
        "'Joshua Douds',28",
        "'Jolene Ryan',0",
        "'Roxanne Padilla',50",
        "'Howard Roberts',41",
        "'Ricardo Broughton',13",
        "'Roland Speer',83",
        "'Cathy Cohan',58",
        "'Kathie Dawson',100",
        "'Gregg Mcquistion',11",
    ]
    run_query(
        instance,
        "CREATE TABLE {} (name String, id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV')".format(
            name, started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, filename
        ),
    )

    insert_query_id = uuid.uuid4().hex
    data_sep = "),("
    run_query(
        instance,
        "INSERT INTO {} VALUES ({})".format(name, data_sep.join(data)),
        query_id=insert_query_id,
    )

    assert run_query(instance, "SELECT sum(id) FROM {}".format(name)).splitlines() == [
        "753"
    ]

    uncompressed_content = get_s3_file_content(started_cluster, bucket, filename)
    assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 753

    instance.query("SYSTEM FLUSH LOGS")
    blob_storage_log = instance.query(f"SELECT * FROM system.blob_storage_log")

    result = instance.query(
        f"""SELECT
            countIf(event_type == 'Upload'),
            countIf(remote_path == '{filename}'),
            countIf(bucket == '{bucket}'),
            count()
        FROM system.blob_storage_log WHERE query_id = '{insert_query_id}'"""
    )
    r = result.strip().split("\t")
    assert int(r[0]) >= 1, blob_storage_log
    assert all(col == r[0] for col in r), blob_storage_log

    run_query(instance, f"DROP TABLE {name}")


@pytest.mark.parametrize(
    "extension,method",
    [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
)
def test_storage_s3_put_gzip(started_cluster, extension, method):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    filename = f"{id}/test_put_gzip.{extension}"
    name = f"test_put_gzip_{extension}"
    data = [
        "'Joseph Tomlinson',5",
        "'Earnest Essary',44",
        "'Matha Pannell',24",
        "'Michael Shavers',46",
        "'Elias Groce',38",
        "'Pamela Bramlet',50",
        "'Lewis Harrell',49",
        "'Tamara Fyall',58",
        "'George Dixon',38",
        "'Alice Walls',49",
        "'Paula Mais',24",
        "'Myrtle Pelt',93",
        "'Sylvia Naffziger',18",
        "'Amanda Cave',83",
        "'Yolanda Joseph',89",
    ]
    run_query(
        instance,
        f"""CREATE TABLE {name} (name String, id UInt32) ENGINE = S3(
                    'http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{filename}',
                    'CSV',
                    '{method}')""",
    )

    run_query(instance, f"INSERT INTO {name} VALUES ({'),('.join(data)})")

    assert run_query(instance, f"SELECT sum(id) FROM {name}").splitlines() == ["708"]

    buf = io.BytesIO(
        get_s3_file_content(started_cluster, bucket, filename, decode=False)
    )
    f = gzip.GzipFile(fileobj=buf, mode="rb")
    uncompressed_content = f.read().decode()
    assert sum([int(i.split(",")[1]) for i in uncompressed_content.splitlines()]) == 708
    run_query(instance, f"DROP TABLE {name}")


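# TRUNCATE TABLE on an S3-backed table must remove the objects from the bucket.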
def test_truncate_table(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    name = "truncate"

    instance.query(
        "CREATE TABLE {} (id UInt32) ENGINE = S3('http://{}:{}/{}/{}', 'CSV')".format(
            name, started_cluster.minio_ip, MINIO_INTERNAL_PORT, bucket, name
        )
    )
    instance.query("INSERT INTO {} SELECT number FROM numbers(10)".format(name))
    result = instance.query("SELECT * FROM {}".format(name))
    assert result == instance.query("SELECT number FROM numbers(10)")
    instance.query("TRUNCATE TABLE {}".format(name))

    minio = started_cluster.minio_client
    timeout = 30
    while timeout > 0:
        if (
            len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/")))
            == 0
        ):
            break
        timeout -= 1
        time.sleep(1)
    assert len(list(minio.list_objects(started_cluster.minio_bucket, "truncate/"))) == 0
    # FIXME: there was a bug in test and it was never checked.
    # Currently read from truncated table fails with
    # DB::Exception: Failed to get object info: No response body..
    # HTTP response code: 404: while reading truncate: While executing S3Source
    # assert instance.query("SELECT * FROM {}".format(name)) == ""
    instance.query(f"DROP TABLE {name} SYNC")
    assert (
        instance.query(f"SELECT count() FROM system.tables where name='{name}'")
        == "0\n"
    )


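# Named collections (predefined connection configuration) require the NAMED COLLECTION
# grant; also checks the error for a non-existent collection.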
def test_predefined_connection_configuration(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances[
        "dummy_without_named_collections"
    ]  # type: ClickHouseInstance
    name = "test_table"

    instance.query("CREATE USER user")
    instance.query("GRANT CREATE ON *.* TO user")
    instance.query("GRANT SOURCES ON *.* TO user")
    instance.query("GRANT SELECT ON *.* TO user")

    instance.query(f"drop table if exists {name}", user="user")
    error = instance.query_and_get_error(
        f"CREATE TABLE {name} (id UInt32) ENGINE = S3(s3_conf1, format='CSV')",
        user="user",
    )
    assert (
        "To execute this query, it's necessary to have the grant NAMED COLLECTION ON s3_conf1"
        in error
    )

    instance.query("GRANT NAMED COLLECTION ON s3_conf1 TO user", user="admin")
    instance.query(
        f"CREATE TABLE {name} (id UInt32) ENGINE = S3(s3_conf1, format='CSV')",
        user="user",
    )

    instance.query(
        f"INSERT INTO {name} SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1"
    )
    result = instance.query(f"SELECT * FROM {name}")
    assert result == instance.query("SELECT number FROM numbers(10)")

    result = instance.query(
        "SELECT * FROM s3(s3_conf1, format='CSV', structure='id UInt32')", user="user"
    )
    assert result == instance.query("SELECT number FROM numbers(10)")

    error = instance.query_and_get_error("SELECT * FROM s3(no_collection)", user="user")
    assert (
        "To execute this query, it's necessary to have the grant NAMED COLLECTION ON no_collection"
        in error
    )
    instance2 = started_cluster.instances["dummy"]  # has named collection access
    error = instance2.query_and_get_error("SELECT * FROM s3(no_collection)")
    assert "There is no named collection `no_collection`" in error

    instance.query("DROP USER user")
    instance.query(f"DROP TABLE {name}")


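# While a slow select streams over url(), PartitionManager injects TCP resets and a
# short total outage; http retries should let the query finish with the right checksum.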
result = ""


def test_url_reconnect_in_the_middle(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]
    table_format = "id String, data String"
    filename = "test_url_reconnect_{}.tsv".format(random.randint(0, 1000))

    instance.query(
        f"""insert into table function
        s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
        select number, randomPrintableASCII(number % 1000) from numbers(1000000)"""
    )

    with PartitionManager() as pm:
        pm_rule_reject = {
            "probability": 0.02,
            "destination": instance.ip_address,
            "source_port": started_cluster.minio_port,
            "action": "REJECT --reject-with tcp-reset",
        }
        pm_rule_drop_all = {
            "destination": instance.ip_address,
            "source_port": started_cluster.minio_port,
            "action": "DROP",
        }
        pm._add_rule(pm_rule_reject)

        def select():
            global result
            result = instance.query(
                f"""select count(), sum(cityHash64(x)) from (select toUInt64(id) + sleep(0.1) as x from
                url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
                settings http_max_tries = 10, http_retry_max_backoff_ms = 2000, http_send_timeout=1, http_receive_timeout=1)"""
            )
            assert result == "1000000\t3914219105369203805\n"

        thread = threading.Thread(target=select)
        thread.start()
        time.sleep(4)
        pm._add_rule(pm_rule_drop_all)

        time.sleep(2)
        pm._delete_rule(pm_rule_drop_all)
        pm._delete_rule(pm_rule_reject)

        thread.join()

    assert result == "1000000\t3914219105369203805\n"


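# Large Parquet/ORC files must be readable with bounded memory; the amount actually
# read is checked via the ReadBufferFromS3Bytes profile event.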
# At the time of writing the actual read bytes are respectively 148 and 169, so -10% to not be flaky
@pytest.mark.parametrize(
    "format_name,expected_bytes_read", [("Parquet", 133), ("ORC", 150)]
)
def test_seekable_formats(started_cluster, format_name, expected_bytes_read):
    expected_lines = 1500000
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance

    table_function = f"s3(s3_{format_name.lower()}, structure='a Int32, b String', format='{format_name}')"
    exec_query_with_retry(
        instance,
        f"INSERT INTO TABLE FUNCTION {table_function} SELECT number, randomString(100) FROM numbers({expected_lines}) settings s3_truncate_on_insert=1",
        timeout=300,
    )

    result = instance.query(f"SELECT count() FROM {table_function}")
    assert int(result) == expected_lines

    result = instance.query(
        f"SELECT count() FROM {table_function} SETTINGS max_memory_usage='60M', max_download_threads=1"
    )
    assert int(result) == expected_lines

    instance.query(f"SELECT * FROM {table_function} FORMAT Null")

    instance.query("SYSTEM FLUSH LOGS")
    result = instance.query(
        f"SELECT formatReadableSize(ProfileEvents['ReadBufferFromS3Bytes']) FROM system.query_log WHERE startsWith(query, 'SELECT * FROM s3') AND memory_usage > 0 AND type='QueryFinish' ORDER BY event_time_microseconds DESC LIMIT 1"
    )
    result = result.strip()
    assert result.endswith("MiB")
    result = result[: result.index(".")]
    assert int(result) > expected_bytes_read


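# Same dataset read back through the url() table function under a memory limit.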
@pytest.mark.parametrize ( " format_name " , [ " Parquet " , " ORC " ] )
def test_seekable_formats_url ( started_cluster , format_name ) :
2021-11-20 12:01:45 +00:00
bucket = started_cluster . minio_bucket
2024-07-24 16:05:10 +00:00
expected_lines = 1500000
2022-07-04 11:52:53 +00:00
instance = started_cluster . instances [ " dummy " ] # type: ClickHouseInstance
2021-11-20 12:01:45 +00:00
2024-07-24 16:05:10 +00:00
format_name_lower = format_name . lower ( )
2024-07-24 15:49:17 +00:00
table_function = f " s3(s3_ { format_name_lower } , structure= ' a Int32, b String ' , format= ' { format_name } ' ) "
2022-07-04 11:52:53 +00:00
exec_query_with_retry (
instance ,
2024-07-24 15:49:17 +00:00
f " INSERT INTO TABLE FUNCTION { table_function } SELECT number, randomString(100) FROM numbers( { expected_lines } ) settings s3_truncate_on_insert=1 " ,
timeout = 300 ,
2022-01-18 19:26:13 +00:00
)
2021-11-20 12:01:45 +00:00
result = instance . query ( f " SELECT count() FROM { table_function } " )
2024-07-24 15:49:17 +00:00
assert int ( result ) == expected_lines
2021-11-20 12:01:45 +00:00
2024-07-24 15:49:17 +00:00
url_function = f " url( ' http:// { started_cluster . minio_host } : { started_cluster . minio_port } / { bucket } /test_ { format_name_lower } ' , ' { format_name } ' , ' a Int32, b String ' ) "
2021-11-20 12:01:45 +00:00
result = instance . query (
2024-07-24 15:49:17 +00:00
f " SELECT count() FROM { url_function } SETTINGS max_memory_usage= ' 60M ' "
2021-11-20 12:01:45 +00:00
)
2024-07-24 15:49:17 +00:00
assert int ( result ) == expected_lines
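
# Reading a zero-byte S3 object should succeed and return zero rows.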
def test_empty_file(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    name = "empty"
    url = f"http://{started_cluster.minio_ip}:{MINIO_INTERNAL_PORT}/{bucket}/{name}"

    minio = started_cluster.minio_client
    minio.put_object(bucket, name, io.BytesIO(b""), 0)

    table_function = f"s3('{url}', 'CSV', 'id Int32')"
    result = instance.query(f"SELECT count() FROM {table_function}")
    assert int(result) == 0
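
# INSERT into a path containing a glob is invalid and must be rejected.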
def test_insert_with_path_with_globs(started_cluster):
    instance = started_cluster.instances["dummy"]

    table_function_3 = f"s3('http://minio1:9001/root/test_parquet*', 'minio', 'minio123', 'Parquet', 'a Int32, b String')"
    instance.query_and_get_error(
        f"insert into table function {table_function_3} SELECT number, randomString(100) FROM numbers(500)"
    )
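
# DESCRIBE on s3()/url() table functions and on S3/URL table engines should infer
# 'a Int32, b String' from a Native file without an explicit structure argument.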
def test_s3_schema_inference(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into table function s3(s3_native, structure='a Int32, b String', format='Native') select number, randomString(100) from numbers(5000000) SETTINGS s3_truncate_on_insert=1"
    )
    result = instance.query(f"desc s3(s3_native, format='Native')")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = instance.query(f"select count(*) from s3(s3_native, format='Native')")
    assert int(result) == 5000000

    instance.query(
        f"create table schema_inference engine=S3(s3_native, format='Native')"
    )
    result = instance.query(f"desc schema_inference")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = instance.query(f"select count(*) from schema_inference")
    assert int(result) == 5000000

    table_function = f"url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_native', 'Native')"
    result = instance.query(f"desc {table_function}")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = instance.query(f"select count(*) from {table_function}")
    assert int(result) == 5000000

    instance.query(
        f"create table schema_inference_2 engine=URL('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_native', 'Native')"
    )
    result = instance.query(f"desc schema_inference_2")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = instance.query(f"select count(*) from schema_inference_2")
    assert int(result) == 5000000

    table_function = f"s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_native', 'Native')"
    result = instance.query(f"desc {table_function}")
    assert result == "a\tInt32\t\t\t\t\t\nb\tString\t\t\t\t\t\n"

    result = instance.query(f"select count(*) from {table_function}")
    assert int(result) == 5000000

    instance.query("drop table schema_inference")
    instance.query("drop table schema_inference_2")
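
# Overwriting an existing key requires s3_truncate_on_insert=1; a plain second
# insert into the same key must fail.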
def test_overwrite(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
    instance.query(f"create table test_overwrite as {table_function}")
    instance.query(f"truncate table test_overwrite")

    instance.query(
        f"insert into test_overwrite select number, randomString(100) from numbers(50) settings s3_truncate_on_insert=1"
    )
    instance.query_and_get_error(
        f"insert into test_overwrite select number, randomString(100) from numbers(100)"
    )
    instance.query(
        f"insert into test_overwrite select number, randomString(100) from numbers(200) settings s3_truncate_on_insert=1"
    )

    result = instance.query(f"select count() from test_overwrite")
    assert int(result) == 200

    instance.query(f"drop table test_overwrite")
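
# With s3_create_new_file_on_insert=1, repeated inserts write additional files
# instead of failing; checked both for a plain key and for the s3_parquet_gz
# named collection.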
def test_create_new_files_on_insert(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    table_function = f"s3(s3_parquet, structure='a Int32, b String', format='Parquet')"
    instance.query(f"create table test_multiple_inserts as {table_function}")
    instance.query(f"truncate table test_multiple_inserts")
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1"
    )
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1"
    )

    result = instance.query(f"select count() from test_multiple_inserts")
    assert int(result) == 60

    instance.query(f"drop table test_multiple_inserts")

    table_function = (
        f"s3(s3_parquet_gz, structure='a Int32, b String', format='Parquet')"
    )
    instance.query(f"create table test_multiple_inserts as {table_function}")
    instance.query(f"truncate table test_multiple_inserts")
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(10) settings s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(20) settings s3_create_new_file_on_insert=1"
    )
    instance.query(
        f"insert into test_multiple_inserts select number, randomString(100) from numbers(30) settings s3_create_new_file_on_insert=1"
    )

    result = instance.query(f"select count() from test_multiple_inserts")
    assert int(result) == 60

    instance.query("drop table test_multiple_inserts")
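
# The .arrow and .parquet extensions let s3() and url() detect the format when
# it is not passed explicitly.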
def test_format_detection(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(f"create table arrow_table_s3 (x UInt64) engine=S3(s3_arrow)")
    instance.query(
        f"insert into arrow_table_s3 select 1 settings s3_truncate_on_insert=1"
    )
    result = instance.query(f"select * from s3(s3_arrow)")
    assert int(result) == 1

    result = instance.query(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')"
    )
    assert int(result) == 1

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')"
    )
    assert int(result) == 1

    instance.query(f"create table parquet_table_s3 (x UInt64) engine=S3(s3_parquet2)")
    instance.query(
        f"insert into parquet_table_s3 select 1 settings s3_truncate_on_insert=1"
    )
    result = instance.query(f"select * from s3(s3_parquet2)")
    assert int(result) == 1

    result = instance.query(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')"
    )
    assert int(result) == 1

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.parquet')"
    )
    assert int(result) == 1

    instance.query(f"drop table arrow_table_s3")
    instance.query(f"drop table parquet_table_s3")
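
# Schema inference over globs: inference should tolerate files that yield no type
# information on their own, and fail cleanly when no file (or a malformed file)
# provides a usable schema.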
def test_schema_inference_from_globs(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test1.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
    )
    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test2.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select 0"
    )

    url_filename = "test{1,2}.jsoncompacteachrow"
    result = instance.query(
        f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert result.strip() == "c1\tNullable(Int64)"

    result = instance.query(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert sorted(result.split()) == ["0", "\\N"]

    result = instance.query(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert result.strip() == "c1\tNullable(Int64)"

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert sorted(result.split()) == ["0", "\\N"]

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test3.jsoncompacteachrow', 'JSONCompactEachRow', 'x Nullable(UInt32)') select NULL"
    )

    url_filename = "test{1,3}.jsoncompacteachrow"
    result = instance.query_and_get_error(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert "All attempts to extract table structure from files failed" in result

    result = instance.query_and_get_error(
        f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert "All attempts to extract table structure from files failed" in result

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test0.jsoncompacteachrow', 'TSV', 'x String') select '[123;]'"
    )
    result = instance.query_and_get_error(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test*.jsoncompacteachrow') settings schema_inference_use_cache_for_s3=0, input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result

    url_filename = "test{0,1,2,3}.jsoncompacteachrow"
    result = instance.query_and_get_error(
        f"desc url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/{url_filename}') settings schema_inference_use_cache_for_url=0, input_format_json_infer_incomplete_types_as_strings=0"
    )
    assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in result
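
# Every supported s3() argument signature: (url), (url, format), (url, key, secret),
# (url, format, structure), (url, key, secret, format), and so on. Variants that
# pass a session token are expected to fail with S3_ERROR against MinIO.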
def test_signatures(started_cluster):
    session_token = "session token that will not be checked by MinIO"
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(f"create table test_signatures (x UInt64) engine=S3(s3_arrow)")
    instance.query(f"truncate table test_signatures")
    instance.query(f"insert into test_signatures select 1")

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow')"
    )
    assert int(result) == 1

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'Arrow', 'x UInt64')"
    )
    assert int(result) == 1

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123')"
    )
    assert int(result) == 1

    error = instance.query_and_get_error(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}')"
    )
    assert "S3_ERROR" in error

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'Arrow', 'x UInt64', 'auto')"
    )
    assert int(result) == 1

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', 'Arrow')"
    )
    assert int(result) == 1

    error = instance.query_and_get_error(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow')"
    )
    assert "S3_ERROR" in error

    error = instance.query_and_get_error(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow', 'x UInt64')"
    )
    assert "S3_ERROR" in error

    error = instance.query_and_get_error(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test.arrow', 'minio', 'minio123', '{session_token}', 'Arrow', 'x UInt64', 'auto')"
    )
    assert "S3_ERROR" in error

    instance.query(f"drop table test_signatures")
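
# Reading one of three columns should pull roughly a third of the bytes from S3
# compared to SELECT * (the ratio is checked via ReadBufferFromS3Bytes).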
def test_select_columns(started_cluster):
    bucket = started_cluster.minio_bucket
    id = uuid.uuid4()
    instance = started_cluster.instances["dummy"]
    name = "test_table2"
    structure = "id UInt32, value1 Int32, value2 Int32"

    instance.query(f"drop table if exists {name}")
    instance.query(
        f"CREATE TABLE {name} ({structure}) ENGINE = S3(s3_conf1, format='Parquet')"
    )

    limit = 10000000
    instance.query(
        f"INSERT INTO {name} SELECT * FROM generateRandom('{structure}') LIMIT {limit} SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(f"SELECT value2, '{id}' FROM {name}")

    instance.query("SYSTEM FLUSH LOGS")
    result1 = instance.query(
        f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT value2, ''{id}'' FROM {name}'"
    )

    instance.query(f"SELECT *, '{id}' FROM {name}")

    instance.query("SYSTEM FLUSH LOGS")
    result2 = instance.query(
        f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE type='QueryFinish' and query LIKE 'SELECT *, ''{id}'' FROM {name}'"
    )

    assert round(int(result2) / int(result1)) == 3

def test_insert_select_schema_inference(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native') select toUInt64(1) as x"
    )
    result = instance.query(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native')"
    )
    assert result.strip() == "x\tUInt64"

    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{id}/test_insert_select.native')"
    )
    assert int(result) == 1

def test_parallel_reading_with_memory_limit(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') select * from numbers(1000000) SETTINGS s3_truncate_on_insert=1"
    )
    result = instance.query_and_get_error(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_memory_limit.native') settings max_memory_usage=1000"
    )
    assert "Memory limit (for query) exceeded" in result

    time.sleep(5)

    # Check that the server didn't crash.
    result = instance.query("select 1")
    assert int(result) == 1

def test_wrong_format_usage(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native') select * from numbers(10e6) SETTINGS s3_truncate_on_insert=1"
    )
    # size(test_wrong_format.native) = 10e6*8 + 16 (header) ~= 76MiB

    # Ensure that the whole file does not have to be loaded into memory.
    result = instance.query_and_get_error(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native', 'Parquet') settings input_format_allow_seeks=0, max_memory_usage='10Mi'"
    )
    assert "Not a Parquet file" in result
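
# Helpers for the schema-cache tests below: flush the query log and read the
# given ProfileEvents counter from the most recent matching query.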
def check_profile_event_for_query(
    instance, file, storage_name, started_cluster, bucket, profile_event, amount
):
    instance.query("system flush logs")
    query_pattern = f"{storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}'".replace(
        "'", "\\'"
    )
    res = int(
        instance.query(
            f"select ProfileEvents['{profile_event}'] from system.query_log where query like '%{query_pattern}%' and query not like '%ProfileEvents%' and type = 'QueryFinish' order by query_start_time_microseconds desc limit 1"
        )
    )
    assert res == amount

def check_cache_misses(instance, file, storage_name, started_cluster, bucket, amount=1):
    check_profile_event_for_query(
        instance,
        file,
        storage_name,
        started_cluster,
        bucket,
        "SchemaInferenceCacheMisses",
        amount,
    )


def check_cache_hits(instance, file, storage_name, started_cluster, bucket, amount=1):
    check_profile_event_for_query(
        instance,
        file,
        storage_name,
        started_cluster,
        bucket,
        "SchemaInferenceCacheHits",
        amount,
    )


def check_cache_invalidations(
    instance, file, storage_name, started_cluster, bucket, amount=1
):
    check_profile_event_for_query(
        instance,
        file,
        storage_name,
        started_cluster,
        bucket,
        "SchemaInferenceCacheInvalidations",
        amount,
    )


def check_cache_evictions(
    instance, file, storage_name, started_cluster, bucket, amount=1
):
    check_profile_event_for_query(
        instance,
        file,
        storage_name,
        started_cluster,
        bucket,
        "SchemaInferenceCacheEvictions",
        amount,
    )


def check_cache_num_rows_hits(
    instance, file, storage_name, started_cluster, bucket, amount=1
):
    check_profile_event_for_query(
        instance,
        file,
        storage_name,
        started_cluster,
        bucket,
        "SchemaInferenceCacheNumRowsHits",
        amount,
    )

def run_describe_query(instance, file, storage_name, started_cluster, bucket):
    query = f"desc {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}')"
    instance.query(query)


def run_count_query(instance, file, storage_name, started_cluster, bucket):
    query = f"select count() from {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file}', auto, 'x UInt64')"
    return instance.query(query)


def check_cache(instance, expected_files):
    sources = instance.query("select source from system.schema_inference_cache")
    assert sorted(map(lambda x: x.strip().split("/")[-1], sources.split())) == sorted(
        expected_files
    )
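
# End-to-end schema-cache behaviour: misses, hits, invalidation on overwrite, and
# LRU eviction (the cache on this instance is small enough that describing a
# third file evicts the least recently used entry).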
def test_schema_inference_cache(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    def test(storage_name):
        instance.query("system drop schema cache")
        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        run_describe_query(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache(instance, ["test_cache0.jsonl"])
        check_cache_misses(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        run_describe_query(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_invalidations(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        run_describe_query(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )
        check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
        check_cache_misses(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache2.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        run_describe_query(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache(instance, ["test_cache1.jsonl", "test_cache2.jsonl"])
        check_cache_misses(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_evictions(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache1.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache(instance, ["test_cache0.jsonl", "test_cache1.jsonl"])
        check_cache_misses(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_evictions(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache(instance, ["test_cache0.jsonl", "test_cache2.jsonl"])
        check_cache_misses(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_evictions(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache2.jsonl", storage_name, started_cluster, bucket
        )

        run_describe_query(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache0.jsonl", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        files = "test_cache{0,1,2,3}.jsonl"
        run_describe_query(instance, files, storage_name, started_cluster, bucket)
        check_cache_hits(instance, files, storage_name, started_cluster, bucket)

        instance.query(f"system drop schema cache for {storage_name}")
        check_cache(instance, [])

        run_describe_query(instance, files, storage_name, started_cluster, bucket)
        check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)

        instance.query("system drop schema cache")
        check_cache(instance, [])

        run_describe_query(instance, files, storage_name, started_cluster, bucket)
        check_cache_misses(instance, files, storage_name, started_cluster, bucket, 4)

        instance.query("system drop schema cache")
        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.csv') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        res = run_count_query(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 100
        check_cache(instance, ["test_cache0.csv"])
        check_cache_misses(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )

        res = run_count_query(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 100
        check_cache_hits(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache0.csv') select * from numbers(200) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        res = run_count_query(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 200
        check_cache_invalidations(
            instance, "test_cache0.csv", storage_name, started_cluster, bucket
        )

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache1.csv') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        res = run_count_query(
            instance, "test_cache1.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 100
        check_cache(instance, ["test_cache0.csv", "test_cache1.csv"])
        check_cache_misses(
            instance, "test_cache1.csv", storage_name, started_cluster, bucket
        )

        res = run_count_query(
            instance, "test_cache1.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 100
        check_cache_hits(
            instance, "test_cache1.csv", storage_name, started_cluster, bucket
        )

        res = run_count_query(
            instance, "test_cache{0,1}.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 300
        check_cache_hits(
            instance, "test_cache{0,1}.csv", storage_name, started_cluster, bucket, 2
        )

        instance.query(f"system drop schema cache for {storage_name}")
        check_cache(instance, [])

        res = run_count_query(
            instance, "test_cache{0,1}.csv", storage_name, started_cluster, bucket
        )
        assert int(res) == 300
        check_cache_misses(
            instance, "test_cache{0,1}.csv", storage_name, started_cluster, bucket, 2
        )

        instance.query(f"system drop schema cache for {storage_name}")
        check_cache(instance, [])

        instance.query(
            f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.parquet') select * from numbers(100) settings s3_truncate_on_insert=1"
        )
        time.sleep(1)

        res = instance.query(
            f"select count() from {storage_name}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache.parquet')"
        )
        assert int(res) == 100
        check_cache_misses(
            instance, "test_cache.parquet", storage_name, started_cluster, bucket
        )
        check_cache_hits(
            instance, "test_cache.parquet", storage_name, started_cluster, bucket
        )
        check_cache_num_rows_hits(
            instance, "test_cache.parquet", storage_name, started_cluster, bucket
        )

    test("s3")
    test("url")
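
# Credentials against a restricted bucket: anonymous access gets a 403, while an
# explicit Authorization header passed via headers(...) in the query succeeds.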
def test_ast_auth_headers(started_cluster):
    bucket = started_cluster.minio_restricted_bucket
    instance = started_cluster.instances["s3_non_default"]  # type: ClickHouseInstance
    filename = "test.csv"

    result = instance.query_and_get_error(
        f"select count() from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', 'dummy String')"
    )
    assert "HTTP response code: 403" in result
    assert "S3_ERROR" in result

    result = instance.query(
        f"select * from s3('http://resolver:8080/{bucket}/{filename}', 'CSV', headers(Authorization=`Bearer TOKEN`))"
    )
    assert result.strip() == "1\t2\t3"

def test_environment_credentials(started_cluster):
    bucket = started_cluster.minio_restricted_bucket
    instance = started_cluster.instances["s3_with_environment_credentials"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl') select * from numbers(100) settings s3_truncate_on_insert=1"
    )
    assert (
        "100"
        == instance.query(
            f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
        ).strip()
    )

    # A manually defined access key should override the one from the environment.
    with pytest.raises(helpers.client.QueryRuntimeException) as ei:
        instance.query(
            f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache4.jsonl', 'aws', 'aws123')"
        )

    assert ei.value.returncode == 243
    assert "HTTP response code: 403" in ei.value.stderr
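
# 'resolver' is a mock S3 endpoint on port 8083 whose reset_counters handler
# makes it start failing requests after a configurable number of successful
# ones, so listing objects fails partway through the query.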
def test_s3_list_objects_failure(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    filename = "test_no_list_{_partition_id}.csv"

    put_query = f"""
        INSERT INTO TABLE FUNCTION
        s3('http://resolver:8083/{bucket}/{filename}', 'CSV', 'c1 UInt32')
        PARTITION BY c1 % 20
        SELECT number FROM numbers(100)
        SETTINGS s3_truncate_on_insert=1
        """

    run_query(instance, put_query)

    T = 10
    for _ in range(0, T):
        started_cluster.exec_in_container(
            started_cluster.get_container_id("resolver"),
            [
                "curl",
                "-X",
                "POST",
                f"http://localhost:8083/reset_counters?max={random.randint(1, 15)}",
            ],
        )

        get_query = """
            SELECT sleep({seconds}) FROM s3('http://resolver:8083/{bucket}/test_no_list_*', 'CSV', 'c1 UInt32')
            SETTINGS s3_list_object_keys_size = 1, max_threads = {max_threads}, enable_s3_requests_logging = 1
            """.format(
            bucket=bucket, seconds=random.random(), max_threads=random.randint(2, 20)
        )

        with pytest.raises(helpers.client.QueryRuntimeException) as ei:
            result = run_query(instance, get_query)
            print(result)

        assert ei.value.returncode == 243
        assert "Could not list objects" in ei.value.stderr
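
# Empty files are not valid Parquet; s3_skip_empty_files and
# engine_url_skip_empty_files control whether they are skipped or raise an error.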
def test_skip_empty_files(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files1.parquet', TSVRaw) select * from numbers(0) settings s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files2.parquet') select * from numbers(1) settings s3_truncate_on_insert=1"
    )

    def test(engine, setting):
        instance.query_and_get_error(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files1.parquet') settings {setting}=0"
        )

        instance.query_and_get_error(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files1.parquet', auto, 'number UInt64') settings {setting}=0"
        )

        instance.query_and_get_error(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files1.parquet') settings {setting}=1"
        )

        res = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files1.parquet', auto, 'number UInt64') settings {setting}=1"
        )
        assert len(res) == 0

        instance.query_and_get_error(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1,2}}.parquet') settings {setting}=0"
        )

        instance.query_and_get_error(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1,2}}.parquet', auto, 'number UInt64') settings {setting}=0"
        )

        res = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1,2}}.parquet') settings {setting}=1"
        )
        assert int(res) == 0

        res = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1,2}}.parquet', auto, 'number UInt64') settings {setting}=1"
        )
        assert int(res) == 0

    test("s3", "s3_skip_empty_files")
    test("url", "engine_url_skip_empty_files")

    res = instance.query(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{1|2}}.parquet') settings engine_url_skip_empty_files=1"
    )
    assert int(res) == 0

    res = instance.query(
        f"select * from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/skip_empty_files{{11|1|22}}.parquet', auto, 'number UInt64') settings engine_url_skip_empty_files=1"
    )
    assert len(res.strip()) == 0
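
# Tuple subcolumn reads plus the _path/_file virtual columns, including a
# mismatched column name falling back to zero defaults and to an explicit
# DEFAULT clause.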
def test_read_subcolumns(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)') select ((1, 2), 3) SETTINGS s3_truncate_on_insert=1"
    )

    res = instance.query(
        f"select a.b.d, _path, a.b, _file, a.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "2\troot/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"

    res = instance.query(
        f"select a.b.d, _path, a.b, _file, a.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "2\troot/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"

    res = instance.query(
        f"select x.b.d, _path, x.b, _file, x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "0\troot/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"

    res = instance.query(
        f"select x.b.d, _path, x.b, _file, x.e from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
    )
    assert res == "42\troot/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"

    res = instance.query(
        f"select a.b.d, _path, a.b, _file, a.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.tsv', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "2\t/root/test_subcolumns.tsv\t(1,2)\ttest_subcolumns.tsv\t3\n"

    res = instance.query(
        f"select a.b.d, _path, a.b, _file, a.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'a Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "2\t/root/test_subcolumns.jsonl\t(1,2)\ttest_subcolumns.jsonl\t3\n"

    res = instance.query(
        f"select x.b.d, _path, x.b, _file, x.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32)')"
    )
    assert res == "0\t/root/test_subcolumns.jsonl\t(0,0)\ttest_subcolumns.jsonl\t0\n"

    res = instance.query(
        f"select x.b.d, _path, x.b, _file, x.e from url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumns.jsonl', auto, 'x Tuple(b Tuple(c UInt32, d UInt32), e UInt32) default ((42, 42), 42)')"
    )
    assert (
        res == "42\t/root/test_subcolumns.jsonl\t(42,42)\ttest_subcolumns.jsonl\t42\n"
    )

def test_read_subcolumn_time(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumn_time.tsv', auto, 'a UInt32') select (42) SETTINGS s3_truncate_on_insert=1"
    )

    res = instance.query(
        f"select a, dateDiff('minute', _time, now()) < 59 from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_subcolumn_time.tsv', auto, 'a UInt32')"
    )
    assert res == "42\t1\n"
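
# A predicate on the _file virtual column must prune the file list: only one of
# the three objects is opened (EngineFileLikeReadFiles == 1).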
def test_filtering_by_file_or_path(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter1.tsv', auto, 'x UInt64') select 1 SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter2.tsv', auto, 'x UInt64') select 2 SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter3.tsv', auto, 'x UInt64') select 3 SETTINGS s3_truncate_on_insert=1"
    )

    instance.query(
        f"select count(), '{id}' from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter*.tsv') where _file = 'test_filter1.tsv'"
    )

    instance.query("SYSTEM FLUSH LOGS")
    result = instance.query(
        f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query like '%{id}%' AND type='QueryFinish'"
    )
    assert int(result) == 1

    assert 0 == int(
        instance.query(
            f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_filter*.tsv') where _file = 'kek'"
        )
    )
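
# schema_inference_mode='union' merges per-file schemas instead of requiring
# them to be identical; a file whose schema cannot be inferred fails the union.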
def test_union_schema_inference_mode(started_cluster):
    id = uuid.uuid4()
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["s3_non_default"]
    file_name_prefix = f"test_union_schema_inference_{id}_"

    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}1.jsonl') select 1 as a SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}2.jsonl') select 2 as b SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}3.jsonl') select 2 as c SETTINGS s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}4.jsonl', TSV) select 'Error' SETTINGS s3_truncate_on_insert=1"
    )

    for engine in ["s3", "url"]:
        instance.query("system drop schema cache for s3")

        result = instance.query(
            f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
        )
        assert result == "a\tNullable(Int64)\nb\tNullable(Int64)\nc\tNullable(Int64)\n"

        result = instance.query(
            f"select schema_inference_mode, splitByChar('/', source)[-1] as file, schema from system.schema_inference_cache where source like '%{file_name_prefix}%' order by file format TSV"
        )
        assert (
            result == f"UNION\t{file_name_prefix}1.jsonl\ta Nullable(Int64)\n"
            f"UNION\t{file_name_prefix}2.jsonl\tb Nullable(Int64)\n"
            f"UNION\t{file_name_prefix}3.jsonl\tc Nullable(Int64)\n"
        )

        result = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') order by tuple(*) settings schema_inference_mode='union', describe_compact_output=1 format TSV"
        )
        assert result == "1\t\\N\t\\N\n" "\\N\t2\t\\N\n" "\\N\t\\N\t2\n"

        instance.query(f"system drop schema cache for {engine}")

        result = instance.query(
            f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}2.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
        )
        assert result == "b\tNullable(Int64)\n"

        result = instance.query(
            f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
        )
        assert (
            result == "a\tNullable(Int64)\n"
            "b\tNullable(Int64)\n"
            "c\tNullable(Int64)\n"
        )

        error = instance.query_and_get_error(
            f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{file_name_prefix}{{1,2,3,4}}.jsonl') settings schema_inference_mode='union', describe_compact_output=1 format TSV"
        )
        assert "CANNOT_EXTRACT_TABLE_STRUCTURE" in error
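
# Content-based format detection: the objects are written without an extension,
# so both the format and the schema must be inferred from the data itself.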
def test_s3_format_detection(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection0', 'JSONEachRow', 'x UInt64, y String') select number, 'str_' || toString(number) from numbers(0) settings s3_truncate_on_insert=1"
    )
    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1', 'JSONEachRow', 'x UInt64, y String') select number, 'str_' || toString(number) from numbers(5) settings s3_truncate_on_insert=1"
    )

    expected_result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1', 'JSONEachRow', 'x UInt64, y String')"
    )
    expected_desc_result = instance.query(
        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1', 'JSONEachRow')"
    )

    for engine in ["s3", "url"]:
        desc_result = instance.query(
            f"desc {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1')"
        )
        assert desc_result == expected_desc_result

        result = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1')"
        )
        assert result == expected_result

        result = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection1', auto, 'x UInt64, y String')"
        )
        assert result == expected_result

        result = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection{{0,1}}', auto, 'x UInt64, y String')"
        )
        assert result == expected_result

        instance.query(f"system drop schema cache for {engine}")

        result = instance.query(
            f"select * from {engine}('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_format_detection{{0,1}}', auto, 'x UInt64, y String')"
        )
        assert result == expected_result
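
# Partitioned writes must respect object existence the same way plain writes do:
# fail without s3_truncate_on_insert, overwrite with it, and write a new
# key.1.csv when s3_create_new_file_on_insert=1 is set.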
def test_respect_object_existence_on_partitioned_write(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv', 'CSV', 'x UInt64') select 42 settings s3_truncate_on_insert=1"
    )
    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv')"
    )
    assert int(result) == 42

    error = instance.query_and_get_error(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 42 settings s3_truncate_on_insert=0"
    )
    assert "BAD_ARGUMENTS" in error

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 43 settings s3_truncate_on_insert=1"
    )
    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.csv')"
    )
    assert int(result) == 43

    instance.query(
        f"insert into table function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write{{_partition_id}}.csv', 'CSV', 'x UInt64') partition by 42 select 44 settings s3_truncate_on_insert=0, s3_create_new_file_on_insert=1"
    )
    result = instance.query(
        f"select * from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_partitioned_write42.1.csv')"
    )
    assert int(result) == 44