Wait on startup for Keeper

This commit is contained in:
Antonio Andelic 2022-09-21 15:12:16 +00:00
parent aa700836b8
commit 6798b500e9
22 changed files with 37 additions and 40 deletions

View File

@ -1282,8 +1282,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config().has("keeper_server"))
{
#if USE_NURAFT
//// If we don't have a configured connection, someone is probably trying to use clickhouse-server instead
//// of clickhouse-keeper, so start synchronously.
bool can_initialize_keeper_async = false;
if (has_zookeeper) /// We have configured connection to some zookeeper cluster
{
/// If we cannot connect to some other node from our cluster, then we have to wait for our Keeper to start
/// synchronously.
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
}
/// Initialize keeper RAFT.
global_context->initializeKeeperDispatcher(/* start_async */ true);
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
auto config_getter = [this] () -> const Poco::Util::AbstractConfiguration &

View File

@ -705,7 +705,7 @@ void KeeperServer::waitInit()
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
LOG_WARNING(log, "Failed to wait for RAFT initialization in {}ms, will continue in background", timeout);
}
std::vector<int64_t> KeeperServer::getDeadSessions()

View File

@ -3,7 +3,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
cluster = ClickHouseCluster(__file__)
@ -16,7 +15,6 @@ node1 = cluster.add_instance(
def started_cluster():
try:
cluster.start()
keeper_utils.wait_until_connected(cluster, node1)
yield cluster
finally:

View File

@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
from kazoo.client import KazooClient, KazooState
from kazoo.security import ACL, make_digest_acl, make_acl
from kazoo.exceptions import (
@ -26,7 +25,6 @@ SUPERAUTH = "super:admin"
def started_cluster():
try:
cluster.start()
keeper_utils.wait_until_connected(cluster, node)
yield cluster
@ -457,7 +455,6 @@ def test_auth_snapshot(started_cluster):
)
node.restart_clickhouse()
keeper_utils.wait_until_connected(cluster, node)
connection = get_fake_zk()

View File

@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
@ -62,7 +61,6 @@ def stop_zk(zk):
def started_cluster():
try:
cluster.start()
keeper_utils.wait_until_connected(cluster, node)
yield cluster

View File

@ -2,7 +2,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
@ -225,6 +224,5 @@ def test_invalid_configs(started_cluster):
"/etc/clickhouse-server/config.d/enable_keeper1.xml", NORMAL_CONFIG
)
node1.start_clickhouse()
keeper_utils.wait_until_connected(cluster, node1)
assert node1.query("SELECT 1") == "1\n"

View File

@ -2,7 +2,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
@ -48,8 +47,6 @@ def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster
finally:

View File

@ -31,7 +31,6 @@ NOT_SERVING_REQUESTS_ERROR_MSG = "This instance is not currently serving request
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster

View File

@ -45,7 +45,6 @@ TODO remove this when jepsen tests will be written.
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster
@ -65,10 +64,15 @@ def get_fake_zk(nodename, timeout=30.0):
return _fake_zk_instance
def wait_nodes():
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
# In extremely rare cases it can take more than 5 minutes in a debug build with a sanitizer.
@pytest.mark.timeout(600)
def test_blocade_leader(started_cluster):
for i in range(100):
wait_nodes()
try:
for i, node in enumerate([node1, node2, node3]):
node.query(
@ -272,6 +276,7 @@ def restart_replica_for_sure(node, table_name, zk_replica_path):
@pytest.mark.timeout(600)
def test_blocade_leader_twice(started_cluster):
for i in range(100):
wait_nodes()
try:
for i, node in enumerate([node1, node2, node3]):
node.query(

View File

@ -33,7 +33,6 @@ from kazoo.client import KazooClient, KazooState
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster
@ -45,6 +44,10 @@ def smaller_exception(ex):
return "\n".join(str(ex).split("\n")[0:2])
def wait_nodes():
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
@ -55,6 +58,7 @@ def get_fake_zk(nodename, timeout=30.0):
def test_read_write_multinode(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
@ -96,6 +100,7 @@ def test_read_write_multinode(started_cluster):
def test_watch_on_follower(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
@ -152,6 +157,7 @@ def test_watch_on_follower(started_cluster):
def test_session_expiration(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3", timeout=3.0)
@ -193,6 +199,7 @@ def test_session_expiration(started_cluster):
def test_follower_restart(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_restart_node", b"hello")
@ -217,6 +224,7 @@ def test_follower_restart(started_cluster):
def test_simple_replicated_table(started_cluster):
wait_nodes()
for i, node in enumerate([node1, node2, node3]):
node.query(
"CREATE TABLE t (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t', '{}') ORDER BY tuple()".format(

View File

@ -34,8 +34,6 @@ def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster
finally:

View File

@ -2,7 +2,6 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import time
import os
from kazoo.client import KazooClient, KazooState
@ -25,7 +24,6 @@ node3 = cluster.add_instance(
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
@ -33,8 +32,6 @@ def started_cluster():
try:
cluster.start()
keeper_utils.wait_until_connected(cluster, node)
yield cluster
finally:
@ -51,7 +48,6 @@ def get_connection_zk(nodename, timeout=30.0):
def restart_clickhouse():
node.restart_clickhouse(kill=True)
keeper_utils.wait_until_connected(cluster, node)
def test_state_after_restart(started_cluster):

View File

@ -25,7 +25,6 @@ from kazoo.client import KazooClient, KazooState
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import string
import os
import time

View File

@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import os
from kazoo.client import KazooClient
@ -28,7 +27,6 @@ def get_fake_zk(node, timeout=30.0):
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2])
yield cluster

View File

@ -36,7 +36,6 @@ def create_random_path(prefix="", depth=1):
def started_cluster():
try:
cluster.start()
keeper_utils.wait_until_connected(cluster, node)
yield cluster

View File

@ -29,7 +29,6 @@ def wait_nodes():
def started_cluster():
try:
cluster.start()
wait_nodes()
yield cluster

View File

@ -3,7 +3,6 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import helpers.keeper_utils as keeper_utils
import random
import string
import os
@ -32,7 +31,6 @@ def get_fake_zk(nodename, timeout=30.0):
def test_smoke():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2])
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_alive", b"aaaa")

View File

@ -40,7 +40,6 @@ def get_fake_zk(nodename, timeout=30.0):
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2, node3])
yield cluster
@ -77,10 +76,10 @@ def test_start_offline(started_cluster):
p.map(start, [node2, node3])
assert node2.contains_in_log(
"Connected to ZooKeeper (or Keeper) before internal Keeper start"
"Cannot connect to ZooKeeper (or Keeper) before internal Keeper start"
)
assert node3.contains_in_log(
"Connected to ZooKeeper (or Keeper) before internal Keeper start"
"Cannot connect to ZooKeeper (or Keeper) before internal Keeper start"
)
node2_zk = get_fake_zk("node2")
@ -113,10 +112,10 @@ def test_start_non_existing(started_cluster):
p.map(start, [node2, node1])
assert node1.contains_in_log(
"Connected to ZooKeeper (or Keeper) before internal Keeper start"
"Cannot connect to ZooKeeper (or Keeper) before internal Keeper start"
)
assert node2.contains_in_log(
"Connected to ZooKeeper (or Keeper) before internal Keeper start"
"Cannot connect to ZooKeeper (or Keeper) before internal Keeper start"
)
node2_zk = get_fake_zk("node2")

View File

@ -30,7 +30,6 @@ from kazoo.client import KazooClient, KazooState
def started_cluster():
try:
cluster.start()
keeper_utils.wait_nodes(cluster, [node1, node2])
yield cluster
@ -42,6 +41,10 @@ def smaller_exception(ex):
return "\n".join(str(ex).split("\n")[0:2])
def wait_nodes():
keeper_utils.wait_nodes(cluster, [node1, node2])
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(
hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout
@ -52,6 +55,7 @@ def get_fake_zk(nodename, timeout=30.0):
def test_read_write_two_nodes(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
@ -83,6 +87,7 @@ def test_read_write_two_nodes(started_cluster):
def test_read_write_two_nodes_with_blocade(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1", timeout=5.0)
node2_zk = get_fake_zk("node2", timeout=5.0)

View File

@ -12,7 +12,6 @@ from kazoo.exceptions import (
)
import os
import time
import socket
cluster = ClickHouseCluster(__file__)