import httplib
import json
import logging
import os
import time
import traceback

import pytest

from helpers.cluster import ClickHouseCluster

logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())


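# The ClickHouse instance runs test_server.py, a mock S3 server started by the
# fixture below. These helpers talk to its control ("communication") port:
# GET fetches the server's published ports and captured requests as JSON,
# PUT sends it a marker string for the test that is about to run.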
def get_communication_data(started_cluster):
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
    conn.request("GET", "/")
    r = conn.getresponse()
    raw_data = r.read()
    conn.close()
    return json.loads(raw_data)


def put_communication_data(started_cluster, body):
    conn = httplib.HTTPConnection(started_cluster.instances["dummy"].ip_address, started_cluster.communication_port)
    conn.request("PUT", "/", body)
    conn.getresponse()
    conn.close()


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        instance = cluster.add_instance("dummy")
        cluster.start()

        # Start the mock S3 server (test_server.py) inside the instance container.
        cluster.communication_port = 10000
        instance.copy_file_to_container(os.path.join(os.path.dirname(__file__), "test_server.py"), "test_server.py")
        cluster.bucket = "abc"
        instance.exec_in_container(["python", "test_server.py", str(cluster.communication_port), cluster.bucket], detach=True)
        cluster.mock_host = instance.ip_address

        # Wait until the mock server is up and has published the ports it listens on.
        for i in range(10):
            try:
                data = get_communication_data(cluster)
                cluster.redirecting_to_http_port = data["redirecting_to_http_port"]
                cluster.preserving_data_port = data["preserving_data_port"]
                cluster.multipart_preserving_data_port = data["multipart_preserving_data_port"]
                cluster.redirecting_preserving_data_port = data["redirecting_preserving_data_port"]
            except Exception:
                logging.error(traceback.format_exc())
                time.sleep(0.5)
            else:
                break
        else:
            assert False, "Could not initialize mock server"

        yield cluster
    finally:
        cluster.shutdown()


def run_query(instance, query, stdin=None):
    logging.info("Running query '{}'...".format(query))
    result = instance.query(query, stdin=stdin)
    logging.info("Query finished")
    return result


def test_get_with_redirect(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    put_communication_data(started_cluster, "=== Get with redirect test ===")
    query = "select *, column1*column2*column3 from s3('http://{}:{}/', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.redirecting_to_http_port, format)
    stdout = run_query(instance, query)

    data = get_communication_data(started_cluster)
    expected = [[str(row[0]), str(row[1]), str(row[2]), str(row[0] * row[1] * row[2])] for row in data["redirect_csv_data"]]
    assert list(map(str.split, stdout.splitlines())) == expected
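

# The put tests below verify both the CSV body ClickHouse uploaded and the
# CompleteMultipartUpload request it sent to finalize the upload; the ETag and
# uploadId values ("hello-etag", "TEST") are the ones handed out by the mock server.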
def test_put(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    logging.info("Phase 3")
    put_communication_data(started_cluster, "=== Put test ===")
    values = "(1, 2, 3), (3, 2, 1), (78, 43, 45)"
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format, values)
    run_query(instance, put_query)

    data = get_communication_data(started_cluster)
    received_data_completed = data["received_data_completed"]
    received_data = data["received_data"]
    finalize_data = data["finalize_data"]
    finalize_data_query = data["finalize_data_query"]
    assert received_data[-1].decode() == "1,2,3\n3,2,1\n78,43,45\n"
    assert received_data_completed
    assert finalize_data == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_data_query == "uploadId=TEST"


def test_put_csv(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    put_communication_data(started_cluster, "=== Put test CSV ===")
    put_query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
    csv_data = "8,9,16\n11,18,13\n22,14,2\n"
    run_query(instance, put_query, stdin=csv_data)

    data = get_communication_data(started_cluster)
    received_data_completed = data["received_data_completed"]
    received_data = data["received_data"]
    finalize_data = data["finalize_data"]
    finalize_data_query = data["finalize_data_query"]
    assert received_data[-1].decode() == csv_data
    assert received_data_completed
    assert finalize_data == "<CompleteMultipartUpload><Part><PartNumber>1</PartNumber><ETag>hello-etag</ETag></Part></CompleteMultipartUpload>"
    assert finalize_data_query == "uploadId=TEST"


def test_put_with_redirect(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    put_communication_data(started_cluster, "=== Put with redirect test ===")
    other_values = "(1, 1, 1), (1, 1, 1), (11, 11, 11)"
    query = "insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') values {}".format(started_cluster.mock_host, started_cluster.redirecting_preserving_data_port, started_cluster.bucket, format, other_values)
    run_query(instance, query)

    query = "select *, column1*column2*column3 from s3('http://{}:{}/{}/test.csv', 'CSV', '{}')".format(started_cluster.mock_host, started_cluster.preserving_data_port, started_cluster.bucket, format)
    stdout = run_query(instance, query)
    assert list(map(str.split, stdout.splitlines())) == [
        ["1", "1", "1", "1"],
        ["1", "1", "1", "1"],
        ["11", "11", "11", "1331"],
    ]

    data = get_communication_data(started_cluster)
    received_data = data["received_data"]
    assert received_data[-1].decode() == "1,1,1\n1,1,1\n11,11,11\n"
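

# With s3_min_upload_part_size lowered to 1000000 bytes, the roughly 1.8 MB of CSV
# generated below should be sent as a multipart upload with at least two parts
# (the assert only requires 1 < parts < 10000, so the exact count is not pinned down).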
def test_multipart_put(started_cluster):
    instance = started_cluster.instances["dummy"]
    format = "column1 UInt32, column2 UInt32, column3 UInt32"

    put_communication_data(started_cluster, "=== Multipart test ===")
    long_data = [[i, i + 1, i + 2] for i in range(100000)]
    long_values = "".join(["{},{},{}\n".format(x, y, z) for x, y, z in long_data])
    put_query = "set s3_min_upload_part_size = 1000000; insert into table function s3('http://{}:{}/{}/test.csv', 'CSV', '{}') format CSV".format(started_cluster.mock_host, started_cluster.multipart_preserving_data_port, started_cluster.bucket, format)
    run_query(instance, put_query, stdin=long_values)

    data = get_communication_data(started_cluster)
    assert "multipart_received_data" in data
    received_data = data["multipart_received_data"]
    assert received_data[-1].decode() == "".join(["{},{},{}\n".format(x, y, z) for x, y, z in long_data])
    assert 1 < data["multipart_parts"] < 10000