2024-08-15 08:58:36 +00:00
import uuid
2024-09-06 15:27:46 +00:00
2024-09-27 10:19:39 +00:00
import pytest
2024-03-26 12:30:16 +00:00
from helpers . cluster import ClickHouseCluster
cluster = ClickHouseCluster ( __file__ )
2024-03-26 12:48:01 +00:00
node1 = cluster . add_instance (
" node1 " , main_configs = [ " configs/remote_servers.xml " ] , with_zookeeper = True
)
node2 = cluster . add_instance (
" node2 " , main_configs = [ " configs/remote_servers.xml " ] , with_zookeeper = True
)
node3 = cluster . add_instance (
" node3 " , main_configs = [ " configs/remote_servers.xml " ] , with_zookeeper = True
)
2024-03-26 12:30:16 +00:00
@pytest.fixture ( scope = " module " )
def start_cluster ( ) :
try :
cluster . start ( )
yield cluster
finally :
cluster . shutdown ( )
2024-04-24 21:13:07 +00:00
def create_tables ( cluster , table_name , skip_last_replica ) :
2024-03-26 12:30:16 +00:00
node1 . query (
2024-08-15 08:58:36 +00:00
f " CREATE TABLE { table_name } (key Int64, value String) Engine=ReplicatedMergeTree( ' /test_parallel_replicas/shard1/ { table_name } ' , ' r1 ' ) ORDER BY (key) "
2024-03-26 12:30:16 +00:00
)
node2 . query (
2024-08-15 08:58:36 +00:00
f " CREATE TABLE { table_name } (key Int64, value String) Engine=ReplicatedMergeTree( ' /test_parallel_replicas/shard1/ { table_name } ' , ' r2 ' ) ORDER BY (key) "
2024-03-26 12:30:16 +00:00
)
2024-04-24 21:13:07 +00:00
if not skip_last_replica :
node3 . query (
2024-08-15 08:58:36 +00:00
f " CREATE TABLE { table_name } (key Int64, value String) Engine=ReplicatedMergeTree( ' /test_parallel_replicas/shard1/ { table_name } ' , ' r3 ' ) ORDER BY (key) "
2024-04-24 21:13:07 +00:00
)
2024-03-26 12:30:16 +00:00
# populate data
node1 . query (
f " INSERT INTO { table_name } SELECT number % 4, number FROM numbers(1000) "
)
node1 . query (
f " INSERT INTO { table_name } SELECT number % 4, number FROM numbers(1000, 1000) "
)
node1 . query (
f " INSERT INTO { table_name } SELECT number % 4, number FROM numbers(2000, 1000) "
)
node1 . query (
f " INSERT INTO { table_name } SELECT number % 4, number FROM numbers(3000, 1000) "
)
node2 . query ( f " SYSTEM SYNC REPLICA { table_name } " )
2024-04-24 21:13:07 +00:00
if not skip_last_replica :
node3 . query ( f " SYSTEM SYNC REPLICA { table_name } " )
def test_skip_replicas_without_table ( start_cluster ) :
cluster_name = " test_1_shard_3_replicas "
table_name = " tt "
create_tables ( cluster_name , table_name , skip_last_replica = True )
expected_result = " "
for i in range ( 4 ) :
expected_result + = f " { i } \t 1000 \n "
2024-08-15 08:58:36 +00:00
log_comment = uuid . uuid4 ( )
2024-04-24 21:13:07 +00:00
assert (
node1 . query (
f " SELECT key, count() FROM { table_name } GROUP BY key ORDER BY key " ,
settings = {
2024-09-06 15:27:46 +00:00
" enable_parallel_replicas " : 2 ,
2024-04-24 21:13:07 +00:00
" max_parallel_replicas " : 3 ,
" cluster_for_parallel_replicas " : cluster_name ,
" log_comment " : log_comment ,
} ,
)
== expected_result
)
node1 . query ( " SYSTEM FLUSH LOGS " )
2024-04-24 21:23:57 +00:00
assert (
node1 . query (
2024-09-06 15:27:46 +00:00
f " SELECT ProfileEvents[ ' DistributedConnectionMissingTable ' ], ProfileEvents[ ' ParallelReplicasUnavailableCount ' ] FROM system.query_log WHERE type = ' QueryFinish ' AND query_id IN (SELECT query_id FROM system.query_log WHERE current_database = currentDatabase() AND log_comment = ' { log_comment } ' AND type = ' QueryFinish ' AND initial_query_id = query_id) SETTINGS enable_parallel_replicas=0 "
2024-04-24 21:23:57 +00:00
)
2024-04-30 14:25:19 +00:00
== " 1 \t 1 \n "
2024-04-24 21:23:57 +00:00
)
2024-08-15 08:58:36 +00:00
node1 . query ( f " DROP TABLE { table_name } SYNC " )
node2 . query ( f " DROP TABLE { table_name } SYNC " )
2024-03-26 12:30:16 +00:00
def test_skip_unresponsive_replicas ( start_cluster ) :
cluster_name = " test_1_shard_3_replicas "
table_name = " tt "
2024-04-24 21:13:07 +00:00
create_tables ( cluster_name , table_name , skip_last_replica = False )
2024-03-26 12:30:16 +00:00
expected_result = " "
for i in range ( 4 ) :
expected_result + = f " { i } \t 1000 \n "
node1 . query ( " SYSTEM ENABLE FAILPOINT receive_timeout_on_table_status_response " )
assert (
node1 . query (
f " SELECT key, count() FROM { table_name } GROUP BY key ORDER BY key " ,
settings = {
2024-09-06 15:27:46 +00:00
" enable_parallel_replicas " : 2 ,
2024-03-26 12:30:16 +00:00
" max_parallel_replicas " : 3 ,
" cluster_for_parallel_replicas " : cluster_name ,
} ,
)
== expected_result
)
2024-08-15 08:58:36 +00:00
node1 . query ( f " DROP TABLE { table_name } SYNC " )
node2 . query ( f " DROP TABLE { table_name } SYNC " )
node3 . query ( f " DROP TABLE { table_name } SYNC " )