import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)
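
# Four instances n1..n4; the "replica" macro (r1..r4) is used by the
# ReplicatedMergeTree test below, and the clusters referenced in the tests
# are expected to be defined in configs/remote_servers.xml.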
nodes = [
    cluster.add_instance(
        f"n{i}",
        main_configs=["configs/remote_servers.xml"],
        with_zookeeper=True,
        macros={"replica": f"r{i}"},
    )
    for i in range(1, 5)
]

@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
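
# Helper: insert row_num rows with key = number % 4, i.e. row_num / 4 rows
# per key. With all_nodes=True the same data is written to every node
# (needed for the non-replicated MergeTree test); otherwise only n1 is used.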
def insert_data(table_name, row_num, all_nodes=False):
    query = (
        f"INSERT INTO {table_name} SELECT number % 4, number FROM numbers({row_num})"
    )
    if all_nodes:
        for n in nodes:
            n.query(query)
    else:
        n1 = nodes[0]
        n1.query(query)
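
# Query a Distributed table with parallel replicas enabled via custom_key;
# each of the 4 keys must be counted exactly 250 times (1000 rows total).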
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize(
    "cluster",
    ["test_multiple_shards_multiple_replicas", "test_single_shard_multiple_replicas"],
)
def test_parallel_replicas_custom_key_distributed(
    start_cluster, cluster, custom_key, filter_type
):
    for node in nodes:
        node.rotate_logs()

    row_num = 1000
    n1 = nodes[0]
    n1.query(f"DROP TABLE IF EXISTS dist_table ON CLUSTER {cluster} SYNC")
    n1.query(f"DROP TABLE IF EXISTS test_table_for_dist ON CLUSTER {cluster} SYNC")
    n1.query(
        f"CREATE TABLE test_table_for_dist ON CLUSTER {cluster} (key UInt32, value String) Engine=MergeTree ORDER BY (key, sipHash64(value))"
    )

    n1.query(
        f"""
        CREATE TABLE dist_table AS test_table_for_dist
        Engine = Distributed(
            {cluster},
            currentDatabase(),
            test_table_for_dist,
            rand()
        )
        """
    )

    insert_data("dist_table", row_num)
    n1.query("SYSTEM FLUSH DISTRIBUTED dist_table")

    expected_result = ""
    for i in range(4):
        expected_result += f"{i}\t250\n"

    assert (
        n1.query(
            "SELECT key, count() FROM dist_table GROUP BY key ORDER BY key",
            settings={
                "max_parallel_replicas": 4,
                "parallel_replicas_custom_key": custom_key,
                "parallel_replicas_custom_key_filter_type": filter_type,
                "prefer_localhost_replica": 0,
            },
        )
        == expected_result
    )

    if cluster == "test_multiple_shards_multiple_replicas":
        # With multiple shards, the query is simply processed on every replica
        # of each shard by appending the custom_key filter on each replica.
        assert all(
            node.contains_in_log("Processing query on a replica using custom_key")
            for node in nodes
        )
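
# Same check against a plain (non-replicated) MergeTree table holding
# identical data on every node; reading it with parallel replicas requires
# the parallel_replicas_for_non_replicated_merge_tree setting.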
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize(
    "cluster",
    ["test_single_shard_multiple_replicas"],
)
def test_parallel_replicas_custom_key_mergetree(
    start_cluster, cluster, custom_key, filter_type
):
    for node in nodes:
        node.rotate_logs()

    row_num = 1000
    n1 = nodes[0]
    n1.query(f"DROP TABLE IF EXISTS test_table_for_mt ON CLUSTER {cluster} SYNC")
    n1.query(
        f"CREATE TABLE test_table_for_mt ON CLUSTER {cluster} (key UInt32, value String) Engine=MergeTree ORDER BY (key, sipHash64(value))"
    )
    insert_data("test_table_for_mt", row_num, all_nodes=True)

    expected_result = ""
    for i in range(4):
        expected_result += f"{i}\t250\n"

    assert (
        n1.query(
            "SELECT key, count() FROM test_table_for_mt GROUP BY key ORDER BY key",
            settings={
                "max_parallel_replicas": 4,
                "parallel_replicas_custom_key": custom_key,
                "parallel_replicas_custom_key_filter_type": filter_type,
                "parallel_replicas_for_non_replicated_merge_tree": 1,
                "cluster_for_parallel_replicas": cluster,
            },
        )
        == expected_result
    )
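
# Same check against a ReplicatedMergeTree table: data is inserted on n1 only
# and reaches the other replicas through replication, so the
# parallel_replicas_for_non_replicated_merge_tree setting is not needed here.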
@pytest.mark.parametrize("custom_key", ["sipHash64(key)", "key"])
@pytest.mark.parametrize("filter_type", ["default", "range"])
@pytest.mark.parametrize(
    "cluster",
    ["test_single_shard_multiple_replicas"],
)
def test_parallel_replicas_custom_key_replicatedmergetree(
    start_cluster, cluster, custom_key, filter_type
):
    for node in nodes:
        node.rotate_logs()

    row_num = 1000
    n1 = nodes[0]
    n1.query(f"DROP TABLE IF EXISTS test_table_for_rmt ON CLUSTER {cluster} SYNC")
    n1.query(
        f"CREATE TABLE test_table_for_rmt ON CLUSTER {cluster} (key UInt32, value String) Engine=ReplicatedMergeTree('/clickhouse/tables', '{{replica}}') ORDER BY (key, sipHash64(value))"
    )
    insert_data("test_table_for_rmt", row_num, all_nodes=False)

    expected_result = ""
    for i in range(4):
        expected_result += f"{i}\t250\n"

    assert (
        n1.query(
            "SELECT key, count() FROM test_table_for_rmt GROUP BY key ORDER BY key",
            settings={
                "max_parallel_replicas": 4,
                "parallel_replicas_custom_key": custom_key,
                "parallel_replicas_custom_key_filter_type": filter_type,
                "cluster_for_parallel_replicas": cluster,
            },
        )
        == expected_result
    )