Mirror of https://github.com/ClickHouse/ClickHouse.git
RMT: Do not block retries when establishing a new keeper connection
parent: f59ed02acb
commit: 8225acae3a
@@ -221,14 +221,17 @@ void StorageReplicatedMergeTree::setZooKeeper()
     /// strange effects. So we always use only one session for all tables.
     /// (excluding auxiliary zookeepers)
 
-    std::lock_guard lock(current_zookeeper_mutex);
     if (zookeeper_name == default_zookeeper_name)
     {
-        current_zookeeper = getContext()->getZooKeeper();
+        auto new_keeper = getContext()->getZooKeeper();
+        std::lock_guard lock(current_zookeeper_mutex);
+        current_zookeeper = new_keeper;
     }
     else
     {
-        current_zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
+        auto new_keeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
+        std::lock_guard lock(current_zookeeper_mutex);
+        current_zookeeper = new_keeper;
     }
 }
 
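This hunk (in StorageReplicatedMergeTree.cpp, judging by the function signature in the hunk header) moves the connection attempt out of the critical section: the new Keeper session is established first, and current_zookeeper_mutex is taken only long enough to swap the pointer. Below is a minimal standalone sketch of the same pattern, using hypothetical names (Connection, makeConnection, Storage) rather than ClickHouse code, to show why threads that merely read the current session pointer are no longer blocked while a slow (re)connection is in progress:

// Minimal sketch of the locking change (hypothetical names, not ClickHouse code).
// The point: establish the (possibly slow) connection outside the mutex and take
// the lock only to swap the shared pointer, so readers of the pointer never wait
// for a connection attempt.
#include <chrono>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

struct Connection {};

// Stand-in for getContext()->getZooKeeper(): slow when the hosts are unreachable.
std::shared_ptr<Connection> makeConnection()
{
    std::this_thread::sleep_for(std::chrono::seconds(2));
    return std::make_shared<Connection>();
}

struct Storage
{
    std::mutex mutex;
    std::shared_ptr<Connection> current;

    // Old pattern: the whole connection attempt runs under the lock,
    // so anyone who only wants to read `current` waits for it to finish.
    void setBlocking()
    {
        std::lock_guard<std::mutex> lock(mutex);
        current = makeConnection();
    }

    // New pattern (what the commit does): connect first, lock only for the swap.
    void setNonBlocking()
    {
        auto new_conn = makeConnection();
        std::lock_guard<std::mutex> lock(mutex);
        current = new_conn;
    }

    std::shared_ptr<Connection> get()
    {
        std::lock_guard<std::mutex> lock(mutex);
        return current;
    }
};

int main()
{
    Storage storage;
    std::thread writer([&] { storage.setNonBlocking(); });  // try setBlocking() to compare

    std::this_thread::sleep_for(std::chrono::milliseconds(100));  // let the writer start
    auto start = std::chrono::steady_clock::now();
    storage.get();  // returns almost immediately; would wait ~2 s with setBlocking()
    auto waited_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start).count();
    std::cout << "get() waited " << waited_ms << " ms\n";

    writer.join();
    return 0;
}

With the old ordering, get() would sit behind the connection attempt for the full timeout; with the new ordering it only waits for the briefly held lock, which is what lets insert retries proceed while a new global Keeper connection is still being established.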
@@ -365,7 +368,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     bool has_zookeeper = getContext()->hasZooKeeper() || getContext()->hasAuxiliaryZooKeeper(zookeeper_name);
     if (has_zookeeper)
     {
-        /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
+        /// It's possible for getZooKeeper() to timeout if zookeeper host(s) can't
         /// be reached. In such cases Poco::Exception is thrown after a connection
         /// timeout - refer to src/Common/ZooKeeper/ZooKeeperImpl.cpp:866 for more info.
         ///
@@ -3,6 +3,7 @@
 import pytest
 import time
 import threading
+import uuid
 from helpers.cluster import ClickHouseCluster
 from multiprocessing.dummy import Pool
 from helpers.network import PartitionManager
@@ -118,3 +119,35 @@ def test_query_timeout_with_zk_down(started_cluster):
     finally:
         cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
         node1.query("DROP TABLE IF EXISTS zk_down SYNC")
+
+
+def test_retries_should_not_wait_for_global_connection(started_cluster):
+    pm = PartitionManager()
+    try:
+        node1.query(
+            "CREATE TABLE zk_down_retries (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/zk_down', '0') ORDER BY tuple()"
+        )
+
+        cluster.stop_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
+        # Apart from stopping keepers, we introduce a network delay to make connection retries slower.
+        # We want to check that retries are not blocked during that time.
+        pm.add_network_delay(node1, 1000)
+
+        query_id = uuid.uuid4()
+
+        with pytest.raises(QueryRuntimeException):
+            node1.query(
+                "INSERT INTO zk_down_retries SELECT number, toString(number) FROM numbers(10) SETTINGS insert_keeper_max_retries=10, insert_keeper_retry_max_backoff_ms=100",
+                query_id=str(query_id)
+            )
+        pm.heal_all()
+        # Use query_log for the execution time since we want to ignore the network delay introduced (also in the client).
+        node1.query("SYSTEM FLUSH LOGS")
+        res = node1.query(f"SELECT query_duration_ms FROM system.query_log WHERE type != 'QueryStart' AND query_id = '{query_id}'")
+        query_duration = int(res)
+        # It should be around 1 second. 5 seconds is being generous (debug builds and so on). It used to take 35 seconds without the fix.
+        assert query_duration < 5000
+    finally:
+        pm.heal_all()
+        cluster.start_zookeeper_nodes(["zoo1", "zoo2", "zoo3"])
+        node1.query("DROP TABLE IF EXISTS zk_down_retries SYNC")
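The timing assertion in the new test leans on the retry settings bounding the time spent in backoff: with insert_keeper_max_retries=10 and insert_keeper_retry_max_backoff_ms=100, each retry sleeps at most 100 ms, so the backoff alone accounts for roughly one second, which is what the "around 1 second" comment refers to. A rough back-of-the-envelope sketch of that bound (the exact backoff shape is an assumption here, not taken from ClickHouse internals):

// Back-of-the-envelope bound for the test's timing assertion (assumption: the
// per-retry sleep starts at some initial value and is capped by the configured
// maximum backoff; exact ClickHouse internals may differ).
#include <algorithm>
#include <iostream>

int main()
{
    const int max_retries = 10;        // insert_keeper_max_retries in the test
    const int max_backoff_ms = 100;    // insert_keeper_retry_max_backoff_ms in the test
    int backoff_ms = 100;              // assumed initial backoff

    int total_sleep_ms = 0;
    for (int retry = 0; retry < max_retries; ++retry)
    {
        total_sleep_ms += backoff_ms;
        backoff_ms = std::min(backoff_ms * 2, max_backoff_ms);  // assumed doubling, capped
    }
    // Prints 1000: about one second of backoff, well under the test's 5000 ms limit.
    std::cout << "worst-case backoff sleep: " << total_sleep_ms << " ms\n";
    return 0;
}

The generous 5000 ms limit leaves headroom for the Keeper operation attempts themselves and for slow debug builds.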