2020-10-08 13:14:44 +00:00
#!/usr/bin/env python3
import pytest
from helpers . cluster import ClickHouseCluster
from multiprocessing . dummy import Pool
from helpers . network import PartitionManager
from helpers . client import QueryRuntimeException
from helpers . test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)

# Three instances registered on the same cluster, each with ZooKeeper enabled;
# the tests below create one replica of each table on every instance.
node1, node2, node3 = [
    cluster.add_instance(instance_name, with_zookeeper=True)
    for instance_name in (" node1 ", " node2 ", " node3 ")
]
2022-03-22 16:39:58 +00:00
2020-10-08 13:14:44 +00:00
@pytest.fixture(scope=" module ")
def started_cluster():
    """Start the module-level cluster, yield it, and always shut it down.

    The shutdown runs in ``finally`` so it also happens when start-up or a
    test using the fixture raises.
    """
    # ``cluster`` is only read here, so no ``global`` declaration is required.
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def test_parallel_quorum_actually_parallel(started_cluster):
    """Check that short quorum inserts finish while a slow one is in flight.

    A slow five-row insert is started on node1 in a worker thread. Two
    single-row inserts are then issued and the row count is checked on every
    replica before the slow insert is waited for.
    """
    settings = {
        " insert_quorum ": " 3 ",
        " insert_quorum_parallel ": " 1 ",
        " function_sleep_max_microseconds_per_block ": " 0 ",
    }
    replicas = (node1, node2, node3)

    create_query = " CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree( ' /test/r ' , ' {num} ' ) ORDER BY tuple() "
    for replica_index, replica in enumerate(replicas):
        replica.query(create_query.format(num=replica_index))

    def assert_row_count_everywhere(expected):
        # Query the replicas one after another, in node1, node2, node3 order.
        for replica in replicas:
            assert replica.query(" SELECT COUNT() FROM r ") == expected

    def long_insert(node):
        # Presumably sleepEachRow(1) makes this insert take several seconds;
        # the test relies on it still running during the short inserts.
        node.query(
            " INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0 ",
            settings=settings,
        )

    workers = Pool(10)
    pending_insert = workers.apply_async(long_insert, (node1,))

    # First short insert, issued through node2.
    node2.query(" INSERT INTO r VALUES (6, ' 6 ' ) ", settings=settings)
    assert_row_count_everywhere(" 1 \n ")

    # Second short insert, issued through node1.
    node1.query(" INSERT INTO r VALUES (7, ' 7 ' ) ", settings=settings)
    assert_row_count_everywhere(" 2 \n ")

    # Wait for the slow insert; .get() re-raises anything it raised.
    pending_insert.get()
    assert_row_count_everywhere(" 7 \n ")

    workers.close()
    workers.join()
def test_parallel_quorum_actually_quorum(started_cluster):
    """Check that parallel quorum inserts still enforce the quorum size.

    node2 is partitioned from node1 and node3. While the partition is in
    place the test verifies that:

    * a quorum-3 insert through node1 raises ``QueryRuntimeException``, yet
      the row is visible on node1 and node3;
    * a quorum-2 insert through node1 succeeds;
    * a further quorum-3 insert reports a quorum timeout both through node1
      (from a worker thread) and through node2.

    After the partition is lifted, node2 is synced and must hold all 3 rows.
    """
    for i, node in enumerate([node1, node2, node3]):
        node.query(
            " CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree( ' /test/q ' , ' {num} ' ) ORDER BY tuple() ".format(
                num=i
            )
        )

    def insert_fail_quorum_timeout(node, settings):
        """Insert row (3, 'Hi') via *node* and assert it ends in a quorum timeout."""
        # Work on a copy: the caller's dict may be shared with another thread
        # and must not be modified just to fill in a default timeout.
        effective_settings = dict(settings)
        effective_settings.setdefault(" insert_quorum_timeout ", " 1000 ")

        error = node.query_and_get_error(
            " INSERT INTO q VALUES(3, ' Hi ' ) ", settings=effective_settings
        )
        assert (
            " DB::Exception: Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: Timeout while waiting for quorum "
            in error
        ), error

    with PartitionManager() as pm:
        # Cut node2 off from the other two replicas on port 9009
        # (presumably the inter-server replication port -- TODO confirm).
        pm.partition_instances(node2, node1, port=9009)
        pm.partition_instances(node2, node3, port=9009)

        # A quorum of 3 cannot be confirmed, so the insert must raise.
        with pytest.raises(QueryRuntimeException):
            node1.query(
                " INSERT INTO q VALUES(1, ' Hello ' ) ",
                settings={
                    " insert_quorum ": " 3 ",
                    " insert_quorum_parallel ": " 1 ",
                    " insert_quorum_timeout ": " 3000 ",
                },
            )

        # Despite the error, the row is expected on node1 and node3.
        assert_eq_with_retry(node1, " SELECT COUNT() FROM q ", " 1 ")
        assert_eq_with_retry(node2, " SELECT COUNT() FROM q ", " 0 ")
        assert_eq_with_retry(node3, " SELECT COUNT() FROM q ", " 1 ")

        # A quorum of 2 is expected to succeed with node1 and node3 alone.
        node1.query(
            " INSERT INTO q VALUES(2, ' wlrd ' ) ",
            settings={
                " insert_quorum ": " 2 ",
                " insert_quorum_parallel ": " 1 ",
                " insert_quorum_timeout ": " 3000 ",
            },
        )

        assert_eq_with_retry(node1, " SELECT COUNT() FROM q ", " 2 ")
        assert_eq_with_retry(node2, " SELECT COUNT() FROM q ", " 0 ")
        assert_eq_with_retry(node3, " SELECT COUNT() FROM q ", " 2 ")

        p = Pool(2)
        # Run the third quorum-3 insert through node1 in the background.
        res = p.apply_async(
            insert_fail_quorum_timeout,
            (
                node1,
                {
                    " insert_quorum ": " 3 ",
                    " insert_quorum_parallel ": " 1 ",
                    " insert_quorum_timeout ": " 1000 ",
                },
            ),
        )

        # Wait until the third part is active on node1 and node3 (and still
        # absent on node2).
        assert_eq_with_retry(
            node1,
            " SELECT COUNT() FROM system.parts WHERE table == ' q ' and active == 1 ",
            " 3 ",
        )
        assert_eq_with_retry(
            node3,
            " SELECT COUNT() FROM system.parts WHERE table == ' q ' and active == 1 ",
            " 3 ",
        )
        assert_eq_with_retry(
            node2,
            " SELECT COUNT() FROM system.parts WHERE table == ' q ' and active == 1 ",
            " 0 ",
        )

        # Issue the same insert through node2; the helper asserts that it
        # also ends with a quorum timeout.
        insert_fail_quorum_timeout(
            node2,
            {
                " insert_quorum ": " 3 ",
                " insert_quorum_parallel ": " 1 ",
                " insert_quorum_timeout ": " 1000 ",
            },
        )

        # Re-raises any assertion failure from the background insert.
        res.get()

        assert_eq_with_retry(node1, " SELECT COUNT() FROM q ", " 3 ")
        assert_eq_with_retry(node2, " SELECT COUNT() FROM q ", " 0 ")
        assert_eq_with_retry(node3, " SELECT COUNT() FROM q ", " 3 ")

        p.close()
        p.join()

    # Partition lifted: node2 is synced and must then hold every row.
    node2.query(" SYSTEM SYNC REPLICA q ", timeout=10)
    assert_eq_with_retry(node2, " SELECT COUNT() FROM q ", " 3 ")