import os
import sys
import time
import uuid
import gzip

import grpc
import pytest
import pytz
import lz4.frame
from threading import Thread

from helpers.cluster import ClickHouseCluster, run_and_check

GRPC_PORT = 9100
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
DEFAULT_ENCODING = 'utf-8'

# Use grpcio-tools to generate *pb2.py files from *.proto.

proto_dir = os.path.join(SCRIPT_DIR, './protos')
gen_dir = os.path.join(SCRIPT_DIR, './_gen')
os.makedirs(gen_dir, exist_ok=True)

run_and_check(
    'python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \
    {proto_dir}/clickhouse_grpc.proto'.format(proto_dir=proto_dir, gen_dir=gen_dir), shell=True)

sys.path.append(gen_dir)
import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc
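# The generated clickhouse_grpc_pb2 module provides the protobuf message classes
# (QueryInfo, ExternalTable, NameAndType, ...), and clickhouse_grpc_pb2_grpc
# provides the ClickHouseStub used for the actual RPC calls below.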

# Utilities

config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/grpc_config.xml'])
main_channel = None

def create_channel():
    node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(GRPC_PORT)
    channel = grpc.insecure_channel(node_ip_with_grpc_port)
    grpc.channel_ready_future(channel).result(timeout=10)
    global main_channel
    if not main_channel:
        main_channel = channel
    return channel

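# query_common() dispatches to one of the four RPCs exposed by the ClickHouse
# gRPC server: ExecuteQuery (single request / single result),
# ExecuteQueryWithStreamInput (client streaming), ExecuteQueryWithStreamOutput
# (server streaming), and ExecuteQueryWithStreamIO (bidirectional), depending on
# whether the input data is split into several parts and whether streamed output
# was requested.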
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', send_output_columns=False,
                 external_tables=[], user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None):
    if type(input_data) is not list:
        input_data = [input_data]
    if type(input_data_delimiter) is str:
        input_data_delimiter = input_data_delimiter.encode(DEFAULT_ENCODING)
    if not channel:
        channel = main_channel
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)

    def query_info():
        input_data_part = input_data.pop(0) if input_data else b''
        if type(input_data_part) is str:
            input_data_part = input_data_part.encode(DEFAULT_ENCODING)
        return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part,
                                             input_data_delimiter=input_data_delimiter, output_format=output_format,
                                             send_output_columns=send_output_columns, external_tables=external_tables,
                                             user_name=user_name, password=password, query_id=query_id,
                                             session_id=session_id, next_query_info=bool(input_data))

    def send_query_info():
        yield query_info()
        while input_data:
            input_data_part = input_data.pop(0)
            if type(input_data_part) is str:
                input_data_part = input_data_part.encode(DEFAULT_ENCODING)
            yield clickhouse_grpc_pb2.QueryInfo(input_data=input_data_part, next_query_info=bool(input_data))

    stream_input = len(input_data) > 1
    if stream_input and stream_output:
        return list(stub.ExecuteQueryWithStreamIO(send_query_info()))
    elif stream_input:
        return [stub.ExecuteQueryWithStreamInput(send_query_info())]
    elif stream_output:
        return list(stub.ExecuteQueryWithStreamOutput(query_info()))
    else:
        return [stub.ExecuteQuery(query_info())]

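# Thin wrappers over query_common(). A sketch of typical usage:
#   query("SELECT 1")          -> "1\n"
#   query_and_get_error("")    -> the exception message (display_text is checked)
#   query_and_get_logs(...)    -> the server log lines joined into one string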
def query_no_errors(*args, **kwargs):
    results = query_common(*args, **kwargs)
    if results and results[-1].HasField('exception'):
        raise Exception(results[-1].exception.display_text)
    return results

def query(*args, **kwargs):
    output = b''
    for result in query_no_errors(*args, **kwargs):
        output += result.output
    return output.decode(DEFAULT_ENCODING)

def query_and_get_error(*args, **kwargs):
    results = query_common(*args, **kwargs)
    if not results or not results[-1].HasField('exception'):
        raise Exception("Expected to fail but succeeded!")
    return results[-1].exception

def query_and_get_totals(*args, **kwargs):
    totals = b''
    for result in query_no_errors(*args, **kwargs):
        totals += result.totals
    return totals.decode(DEFAULT_ENCODING)

def query_and_get_extremes(*args, **kwargs):
    extremes = b''
    for result in query_no_errors(*args, **kwargs):
        extremes += result.extremes
    return extremes.decode(DEFAULT_ENCODING)

def query_and_get_logs(*args, **kwargs):
    logs = ""
    for result in query_no_errors(*args, **kwargs):
        for log_entry in result.logs:
            #print(log_entry)
            logs += log_entry.text + "\n"
    return logs

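# Runs one query in a background thread and asserts on its output; used by the
# simultaneous-query tests to issue many queries over one shared channel or
# over separate channels.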
class QueryThread(Thread):
    def __init__(self, query_text, expected_output, query_id, use_separate_channel=False):
        Thread.__init__(self)
        self.query_text = query_text
        self.expected_output = expected_output
        self.use_separate_channel = use_separate_channel
        self.query_id = query_id

    def run(self):
        if self.use_separate_channel:
            with create_channel() as channel:
                assert query(self.query_text, query_id=self.query_id, channel=channel) == self.expected_output
        else:
            assert query(self.query_text, query_id=self.query_id) == self.expected_output

@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    cluster.start()
    try:
        with create_channel() as channel:
            yield cluster
    finally:
        cluster.shutdown()

@pytest.fixture(autouse=True)
def reset_after_test():
    yield
    node.query_with_retry("DROP TABLE IF EXISTS t")

# Actual tests

def test_select_one():
    assert query("SELECT 1") == "1\n"

def test_ordinary_query():
    assert query("SELECT count() FROM numbers(100)") == "100\n"

def test_insert_query():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1),(2),(3)")
    query("INSERT INTO t FORMAT TabSeparated 4\n5\n6\n")
    query("INSERT INTO t VALUES", input_data="(7),(8)")
    query("INSERT INTO t FORMAT TabSeparated", input_data="9\n10\n")
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n"

def test_insert_query_streaming():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES", input_data=["(1),(2),(3),", "(5),(4),(6),", "(7),(8),(9)"])
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n7\n8\n9\n"

def test_insert_query_delimiter():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t FORMAT CSV 1\n2", input_data=["3", "4\n5"], input_data_delimiter='\n')
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n"
    query("DROP TABLE t")
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t FORMAT CSV 1\n2", input_data=["3", "4\n5"])
    # Without a delimiter the parts are concatenated as-is ("1\n2" + "3" + "4\n5" = "1\n234\n5"),
    # so the parsed rows are 1, 234 and 5.
    assert query("SELECT a FROM t ORDER BY a") == "1\n5\n234\n"

def test_insert_default_column():
    query("CREATE TABLE t (a UInt8, b Int32 DEFAULT 100 - a, c String DEFAULT 'c') ENGINE = Memory")
    query("INSERT INTO t (c, a) VALUES ('x',1),('y',2)")
    query("INSERT INTO t (a) FORMAT TabSeparated", input_data="3\n4\n")
    assert query("SELECT * FROM t ORDER BY a") == "1\t99\tx\n" \
                                                  "2\t98\ty\n" \
                                                  "3\t97\tc\n" \
                                                  "4\t96\tc\n"

def test_insert_splitted_row():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES", input_data=["(1),(2),(", "3),(5),(4),(6)"])
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"

def test_output_format():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1),(2),(3)")
    assert query("SELECT a FROM t ORDER BY a FORMAT JSONEachRow") == '{"a":1}\n{"a":2}\n{"a":3}\n'
    assert query("SELECT a FROM t ORDER BY a", output_format="JSONEachRow") == '{"a":1}\n{"a":2}\n{"a":3}\n'

def test_totals_and_extremes():
    query("CREATE TABLE t (x UInt8, y UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4)")
    assert query("SELECT sum(x), y FROM t GROUP BY y WITH TOTALS") == "4\t2\n3\t3\n5\t4\n"
    assert query_and_get_totals("SELECT sum(x), y FROM t GROUP BY y WITH TOTALS") == "12\t0\n"
    assert query("SELECT x, y FROM t") == "1\t2\n2\t4\n3\t2\n3\t3\n3\t4\n"
    assert query_and_get_extremes("SELECT x, y FROM t", settings={"extremes": "1"}) == "1\t2\n3\t4\n"

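# Besides the output itself, the result messages carry metadata: query_id,
# time_zone, output_format and (when send_output_columns is set) the output
# column names and types. test_get_query_details() checks all of these.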
def test_get_query_details():
    result = list(query_no_errors("CREATE TABLE t (a UInt8) ENGINE = Memory", query_id='123'))[0]
    assert result.query_id == '123'
    pytz.timezone(result.time_zone)  # throws if the reported time zone is invalid
    assert result.output_format == ''
    assert len(result.output_columns) == 0
    assert result.output == b''
    #
    result = list(query_no_errors("SELECT 'a', 1", query_id='', output_format='TabSeparated'))[0]
    uuid.UUID(result.query_id)  # throws if the generated query_id is not a valid UUID
    pytz.timezone(result.time_zone)
    assert result.output_format == 'TabSeparated'
    assert len(result.output_columns) == 0
    assert result.output == b'a\t1\n'
    #
    result = list(query_no_errors("SELECT 'a' AS x, 1 FORMAT JSONEachRow", query_id='', send_output_columns=True))[0]
    uuid.UUID(result.query_id)
    pytz.timezone(result.time_zone)
    assert result.output_format == 'JSONEachRow'
    assert ([(col.name, col.type) for col in result.output_columns]) == [('x', 'String'), ('1', 'UInt8')]
    assert result.output == b'{"x":"a","1":1}\n'

def test_errors_handling():
    e = query_and_get_error("")
    #print(e)
    assert "Empty query" in e.display_text
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    e = query_and_get_error("CREATE TABLE t (a UInt8) ENGINE = Memory")
    assert "Table default.t already exists" in e.display_text

def test_authentication():
    query("CREATE USER OR REPLACE john IDENTIFIED BY 'qwe123'")
    assert query("SELECT currentUser()", user_name="john", password="qwe123") == "john\n"
    query("DROP USER john")

def test_logs():
    logs = query_and_get_logs("SELECT 1", settings={'send_logs_level': 'debug'})
    assert "SELECT 1" in logs
    assert "Read 1 rows" in logs
    assert "Peak memory usage" in logs

def test_progress():
    results = query_no_errors("SELECT number, sleep(0.31) FROM numbers(8) SETTINGS max_block_size=2, interactive_delay=100000", stream_output=True)
    # Erase the fields that vary from run to run before comparing.
    for result in results:
        result.time_zone = ''
        result.query_id = ''
    #print(results)
    assert str(results) == \
"""[progress {
  read_rows: 2
  read_bytes: 16
  total_rows_to_read: 8
}
output_format: "TabSeparated"
, output: "0\\t0\\n1\\t0\\n"
, progress {
  read_rows: 2
  read_bytes: 16
}
, output: "2\\t0\\n3\\t0\\n"
, progress {
  read_rows: 2
  read_bytes: 16
}
, output: "4\\t0\\n5\\t0\\n"
, progress {
  read_rows: 2
  read_bytes: 16
}
, output: "6\\t0\\n7\\t0\\n"
, stats {
  rows: 8
  blocks: 4
  allocated_bytes: 324
  applied_limit: true
  rows_before_limit: 8
}
]"""

def test_session_settings():
    session_a = "session A"
    session_b = "session B"
    query("SET custom_x=1", session_id=session_a)
    query("SET custom_y=2", session_id=session_a)
    query("SET custom_x=3", session_id=session_b)
    query("SET custom_y=4", session_id=session_b)
    assert query("SELECT getSetting('custom_x'), getSetting('custom_y')", session_id=session_a) == "1\t2\n"
    assert query("SELECT getSetting('custom_x'), getSetting('custom_y')", session_id=session_b) == "3\t4\n"

def test_session_temp_tables():
    session_a = "session A"
    session_b = "session B"
    query("CREATE TEMPORARY TABLE my_temp_table(a Int8)", session_id=session_a)
    query("INSERT INTO my_temp_table VALUES (10)", session_id=session_a)
    assert query("SELECT * FROM my_temp_table", session_id=session_a) == "10\n"
    query("CREATE TEMPORARY TABLE my_temp_table(a Int8)", session_id=session_b)
    query("INSERT INTO my_temp_table VALUES (20)", session_id=session_b)
    assert query("SELECT * FROM my_temp_table", session_id=session_b) == "20\n"
    assert query("SELECT * FROM my_temp_table", session_id=session_a) == "10\n"

def test_no_session():
    e = query_and_get_error("SET custom_x=1")
    assert "There is no session" in e.display_text
    e = query_and_get_error("CREATE TEMPORARY TABLE my_temp_table(a Int8)")
    assert "There is no session" in e.display_text

def test_input_function():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV", input_data=["5,4\n", "8,11\n", "10,12\n"])
    assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n"
    query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 11,13")
    assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n143\n"
    query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 20,10\n", input_data="15,15\n")
    assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n143\n200\n225\n"

def test_external_table():
    columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
    ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
    assert query("SELECT * FROM ext1 ORDER BY UserID", external_tables=[ext1]) == "1\tAlex\n" \
                                                                                  "2\tBen\n" \
                                                                                  "3\tCarl\n"
    ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data=b'4,Daniel\n5,Ethan\n', format='CSV')
    assert query("SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID", external_tables=[ext1, ext2]) == "1\tAlex\n" \
                                                                                                                                     "2\tBen\n" \
                                                                                                                                     "3\tCarl\n" \
                                                                                                                                     "4\tDaniel\n" \
                                                                                                                                     "5\tEthan\n"
    unnamed_columns = [clickhouse_grpc_pb2.NameAndType(type='UInt64'), clickhouse_grpc_pb2.NameAndType(type='String')]
    unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data=b'6\tGeorge\n7\tFred\n')
    assert query("SELECT * FROM _data ORDER BY _2", external_tables=[unnamed_table]) == "7\tFred\n" \
                                                                                        "6\tGeorge\n"

def test_external_table_streaming():
    columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
    def send_query_info():
        yield clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM exts ORDER BY UserID",
                                            external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n')],
                                            next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data=b'4\tDaniel\n5\tEthan\n')])
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())
    assert result.output == b'1\tAlex\n' \
                            b'2\tBen\n' \
                            b'3\tCarl\n' \
                            b'4\tDaniel\n' \
                            b'5\tEthan\n'

def test_simultaneous_queries_same_channel():
    threads = []
    try:
        for i in range(0, 100):
            thread = QueryThread("SELECT sum(number) FROM numbers(10)", expected_output="45\n", query_id='sqA' + str(i))
            threads.append(thread)
            thread.start()
    finally:
        for thread in threads:
            thread.join()

def test_simultaneous_queries_multiple_channels():
    threads = []
    try:
        for i in range(0, 100):
            thread = QueryThread("SELECT sum(number) FROM numbers(10)", expected_output="45\n", query_id='sqB' + str(i), use_separate_channel=True)
            threads.append(thread)
            thread.start()
    finally:
        for thread in threads:
            thread.join()

def test_cancel_while_processing_input():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    def send_query_info():
        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data=b'1\n2\n3\n', next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(input_data=b'4\n5\n6\n', next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())
    assert result.cancelled == True
    assert result.progress.written_rows == 6
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"

def test_cancel_while_generating_output():
    def send_query_info():
        yield clickhouse_grpc_pb2.QueryInfo(query="SELECT number, sleep(0.2) FROM numbers(10) SETTINGS max_block_size=2")
        time.sleep(0.5)
        yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
    assert len(results) >= 1
    assert results[-1].cancelled == True
    output = b''
    for result in results:
        output += result.output
    assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n'

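# The tests below cover the two compression layers of the gRPC protocol:
# output_compression_type / input_compression_type / compression_type compress
# the data fields inside the protobuf messages (so the client must decompress
# result.output itself), while transport_compression_type compresses the whole
# gRPC messages and is transparent to the caller.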
def test_compressed_output():
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000)", output_compression_type="lz4")
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQuery(query_info)
    assert lz4.frame.decompress(result.output) == (b'0\n') * 1000

def test_compressed_output_streaming():
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(100000)", output_compression_type="lz4")
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    d_context = lz4.frame.create_decompression_context()
    data = b''
    for result in stub.ExecuteQueryWithStreamOutput(query_info):
        d1, _, _ = lz4.frame.decompress_chunk(d_context, result.output)
        data += d1
    assert data == (b'0\n') * 100000

def test_compressed_output_gzip():
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000)", output_compression_type="gzip", output_compression_level=6)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQuery(query_info)
    assert gzip.decompress(result.output) == (b'0\n') * 1000

def test_compressed_totals_and_extremes():
    query("CREATE TABLE t (x UInt8, y UInt8) ENGINE = Memory")
    query("INSERT INTO t VALUES (1, 2), (2, 4), (3, 2), (3, 3), (3, 4)")
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT sum(x), y FROM t GROUP BY y WITH TOTALS", output_compression_type="lz4")
    result = stub.ExecuteQuery(query_info)
    assert lz4.frame.decompress(result.totals) == b'12\t0\n'
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT x, y FROM t", settings={"extremes": "1"}, output_compression_type="lz4")
    result = stub.ExecuteQuery(query_info)
    assert lz4.frame.decompress(result.extremes) == b'1\t2\n3\t4\n'

def test_compressed_insert_query_streaming():
    query("CREATE TABLE t (a UInt8) ENGINE = Memory")
    data = lz4.frame.compress(b'(1),(2),(3),(5),(4),(6),(7),(8),(9)')
    sz1 = len(data) // 3
    sz2 = len(data) // 3
    d1 = data[:sz1]
    d2 = data[sz1:sz1+sz2]
    d3 = data[sz1+sz2:]
    def send_query_info():
        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t VALUES", input_data=d1, input_compression_type="lz4", next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(input_data=d2, next_query_info=True)
        yield clickhouse_grpc_pb2.QueryInfo(input_data=d3)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    stub.ExecuteQueryWithStreamInput(send_query_info())
    assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n7\n8\n9\n"

def test_compressed_external_table():
    columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
    d1 = lz4.frame.compress(b'1\tAlex\n2\tBen\n3\tCarl\n')
    d2 = gzip.compress(b'4,Daniel\n5,Ethan\n')
    ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data=d1, format='TabSeparated', compression_type="lz4")
    ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data=d2, format='CSV', compression_type="gzip")
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID", external_tables=[ext1, ext2])
    result = stub.ExecuteQuery(query_info)
    assert result.output == b"1\tAlex\n" \
                            b"2\tBen\n" \
                            b"3\tCarl\n" \
                            b"4\tDaniel\n" \
                            b"5\tEthan\n"

def test_transport_compression():
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 0 FROM numbers(1000000)", transport_compression_type='gzip', transport_compression_level=3)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    result = stub.ExecuteQuery(query_info)
    assert result.output == (b'0\n') * 1000000

def test_opentelemetry_context_propagation():
    trace_id = "80c190b5-9dc1-4eae-82b9-6c261438c817"
    parent_span_id = 123
    trace_state = "some custom state"
    trace_id_hex = trace_id.replace("-", "")
    parent_span_id_hex = f'{parent_span_id:0>16X}'
    metadata = [("traceparent", f"00-{trace_id_hex}-{parent_span_id_hex}-01"), ("tracestate", trace_state)]
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
    query_info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 1")
    result = stub.ExecuteQuery(query_info, metadata=metadata)
    assert result.output == b"1\n"
    node.query("SYSTEM FLUSH LOGS")
    assert node.query(f"SELECT attribute['db.statement'], attribute['clickhouse.tracestate'] FROM system.opentelemetry_span_log "
                      f"WHERE trace_id='{trace_id}' AND parent_span_id={parent_span_id}") == "SELECT 1\tsome custom state\n"