import json
import logging

import pytest

import helpers.client
from helpers.cluster import ClickHouseCluster, ClickHouseInstance

# Configure the root logger so test progress is visible on stderr
# (StreamHandler defaults to sys.stderr) when running under pytest.
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Creates S3 buckets for tests and allows anonymous read-write access to the
# main one.
def prepare_s3_bucket(cluster):
    # type: (ClickHouseCluster) -> None
    minio_client = cluster.minio_client

    def recreate_bucket(name):
        # Drop any leftover bucket from a previous run so every test session
        # starts from a clean state, then create it afresh.
        if minio_client.bucket_exists(name):
            minio_client.remove_bucket(name)
        minio_client.make_bucket(name)

    recreate_bucket(cluster.minio_bucket)

    # Allows read-write access for bucket without authorization.
    # NOTE(review): the policy resource is hard-coded to "root" — presumably
    # that is the value of cluster.minio_bucket; confirm against the cluster
    # fixture configuration.
    bucket_read_write_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "s3:GetBucketLocation",
                "Resource": "arn:aws:s3:::root"
            },
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "s3:ListBucket",
                "Resource": "arn:aws:s3:::root"
            },
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "s3:GetObject",
                "Resource": "arn:aws:s3:::root/*"
            },
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": "*"},
                "Action": "s3:PutObject",
                "Resource": "arn:aws:s3:::root/*"
            }
        ]
    }
    minio_client.set_bucket_policy(cluster.minio_bucket, json.dumps(bucket_read_write_policy))

    # A second bucket with no anonymous policy: requests against it must carry
    # valid credentials. Tests use it via cluster.minio_restricted_bucket.
    cluster.minio_restricted_bucket = "{}-with-auth".format(cluster.minio_bucket)
    recreate_bucket(cluster.minio_restricted_bucket)
# Returns content of given S3 file as string.
def get_s3_file_content(cluster, bucket, filename):
    # type: (ClickHouseCluster, str, str) -> str
    data = cluster.minio_client.get_object(bucket, filename)
    # Join the streamed chunks in one pass instead of quadratic `+=`
    # concatenation.
    return "".join(data.stream())
# Returns nginx access log lines.
def get_nginx_access_logs():
    # type: () -> list
    # `with` guarantees the handle is closed even if readlines() raises.
    # The log path is fixed by the test container layout — TODO confirm it
    # matches the nginx config mounted by the minio/redirect setup.
    with open("/nginx/access.log", "r") as log_file:
        return log_file.readlines()
@pytest.fixture(scope="module")
def cluster():
    """Module-scoped cluster: two ClickHouse instances backed by MinIO.

    Yields the started ClickHouseCluster and shuts it down afterwards.
    """
    # Construct the cluster BEFORE entering the try block: if the constructor
    # itself raises, the original code's `finally` hit a NameError on
    # `cluster.shutdown()`, masking the real failure.
    cluster = ClickHouseCluster(__file__)
    try:
        # "restricted_dummy" carries a remote-host-filter config used by
        # test_remote_host_filter; "dummy" is unrestricted.
        cluster.add_instance(
            "restricted_dummy",
            main_configs=["configs/config_for_test_remote_host_filter.xml"],
            with_minio=True)
        cluster.add_instance("dummy", with_minio=True)

        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        prepare_s3_bucket(cluster)
        logging.info("S3 bucket created")

        yield cluster
    finally:
        cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
    # type: (ClickHouseInstance, str, object, dict) -> str
    """Run *query* on *instance*, logging start/finish, and return its output."""
    logging.info("Running query '{}'...".format(query))
    output = instance.query(query, stdin=stdin, settings=settings)
    logging.info("Query finished")
    return output
# Test simple put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    # Fixed the outdated type comment: this test takes the parametrized
    # credentials string and the expected-success flag as well.
    # Anonymous puts target the open bucket; authenticated ones target the
    # bucket that requires credentials.
    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    values_csv = "1,2,3\n3,2,1\n78,43,45\n"
    filename = "test.csv"
    # maybe_auth already ends with a comma, so it splices cleanly before 'CSV'.
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') values {}".format(
        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format, values)

    try:
        run_query(instance, put_query)
    except helpers.client.QueryRuntimeException:
        # Wrong credentials must make the query fail.
        assert not positive
    else:
        assert positive
        assert values_csv == get_s3_file_content(cluster, bucket, filename)
# Test put values in CSV format.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_put_csv(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    # Fixed the outdated type comment: the parametrized credentials string and
    # expected-success flag are parameters too.
    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    filename = "test.csv"
    # Data is fed through stdin ("format CSV") rather than inline VALUES.
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        cluster.minio_host, cluster.minio_port, bucket, filename, maybe_auth, table_format)
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"

    try:
        run_query(instance, put_query, stdin=csv_data)
    except helpers.client.QueryRuntimeException:
        # Wrong credentials must make the query fail.
        assert not positive
    else:
        assert positive
        assert csv_data == get_s3_file_content(cluster, bucket, filename)
# Test put and get with S3 server redirect.
def test_put_get_with_redirect(cluster):
    # type: (ClickHouseCluster) -> None
    bucket = cluster.minio_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
    values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    values_csv = "1,1,1\n1,1,1\n11,11,11\n"
    filename = "test.csv"

    # Write through the redirecting endpoint, then verify the object landed.
    query = "insert into table function s3('http://{}:{}/{}/{}', 'CSV', '{}') values {}".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format, values)
    run_query(instance, query)
    assert values_csv == get_s3_file_content(cluster, bucket, filename)

    # Read back through the same endpoint with a computed extra column.
    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/{}', 'CSV', '{}')".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, table_format)
    stdout = run_query(instance, query)

    assert [line.split() for line in stdout.splitlines()] == [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]
# Test multipart put.
@pytest.mark.parametrize("maybe_auth,positive", [
    ("", True),
    ("'minio','minio123',", True),
    ("'wrongid','wrongkey',", False)
])
def test_multipart_put(cluster, maybe_auth, positive):
    # type: (ClickHouseCluster, str, bool) -> None
    bucket = cluster.minio_bucket if not maybe_auth else cluster.minio_restricted_bucket
    instance = cluster.instances["dummy"]  # type: ClickHouseInstance
    table_format = "column1 UInt32, column2 UInt32, column3 UInt32"

    # Minimum size of part is 5 Mb for Minio.
    # See: https://github.com/minio/minio/blob/master/docs/minio-limits.md
    min_part_size_bytes = 5 * 1024 * 1024
    csv_size_bytes = int(min_part_size_bytes * 1.5)  # To have 2 parts.
    one_line_length = 6  # 3 digits, 2 commas, 1 line separator.

    # Generate data having size more than one part.
    # Integer division (//): plain `/` yields a float on Python 3 and
    # range(float) raises TypeError.
    int_data = [[1, 2, 3] for i in range(csv_size_bytes // one_line_length)]
    csv_data = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in int_data])
    assert len(csv_data) > min_part_size_bytes

    filename = "test_multipart.csv"
    put_query = "insert into table function s3('http://{}:{}/{}/{}', {}'CSV', '{}') format CSV".format(
        cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)

    try:
        run_query(instance, put_query, stdin=csv_data,
                  settings={'s3_min_upload_part_size': min_part_size_bytes})
    except helpers.client.QueryRuntimeException:
        assert not positive
    else:
        assert positive

        # Use Nginx access logs to count number of parts uploaded to Minio.
        # BUGFIX: the original asserted `filter(...) > 1`, which compares a
        # list (Py2: vacuously true) or a filter object (Py3: TypeError) to an
        # int — it never actually checked the part count. Count the matching
        # lines instead.
        nginx_logs = get_nginx_access_logs()
        uploaded_parts = [log_line for log_line in nginx_logs
                          if log_line.find(filename) >= 0 and log_line.find("PUT") >= 0]
        assert len(uploaded_parts) > 1

        assert csv_data == get_s3_file_content(cluster, bucket, filename)
2019-12-03 17:36:02 +00:00
def test_remote_host_filter ( cluster ) :
instance = cluster . instances [ " restricted_dummy " ]
2019-11-06 17:06:50 +00:00
format = " column1 UInt32, column2 UInt32, column3 UInt32 "
2019-12-03 17:36:02 +00:00
query = " select *, column1*column2*column3 from s3( ' http:// {} : {} / ' , ' CSV ' , ' {} ' ) " . format ( " invalid_host " , cluster . minio_redirect_port , format )
2019-11-06 17:06:50 +00:00
assert " not allowed in config.xml " in instance . query_and_get_error ( query )
other_values = " (1, 1, 1), (1, 1, 1), (11, 11, 11) "
2019-12-03 17:36:02 +00:00
query = " insert into table function s3( ' http:// {} : {} / {} /test.csv ' , ' CSV ' , ' {} ' ) values {} " . format ( " invalid_host " , cluster . minio_port , cluster . minio_bucket , format , other_values )
2019-11-06 17:06:50 +00:00
assert " not allowed in config.xml " in instance . query_and_get_error ( query )