Start keeper asynchronously if it has a connection to other nodes

alesapin 2021-10-14 13:21:41 +03:00
parent 111c0672be
commit 90ff7f05fd
21 changed files with 357 additions and 14 deletions

View File

@ -359,7 +359,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
global_context->initializeKeeperDispatcher(/* start_async = */false);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper

View File

@ -986,8 +986,9 @@ if (ThreadFuzzer::instance().isEffective())
if (config().has("keeper_server"))
{
#if USE_NURAFT
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeKeeperDispatcher();
bool has_connection = has_zookeeper && global_context->tryCheckZooKeeperConnection();
/// Initialize keeper RAFT.
global_context->initializeKeeperDispatcher(has_connection);
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
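
The hunk above is the core of the commit: the embedded Keeper is started asynchronously only when the server already has a working connection to some other ZooKeeper/Keeper node, otherwise it keeps the old blocking behaviour (and the standalone clickhouse-keeper binary in the first hunk always passes false, since it has nothing else to serve while it waits). A minimal, self-contained C++ sketch of that decision follows; try_check_connection stands in for Context::tryCheckZooKeeperConnection(), and none of these names are part of the commit itself.

    #include <iostream>

    // Stand-in for Context::tryCheckZooKeeperConnection(): probe an already
    // configured ZooKeeper/Keeper endpoint and report reachability as a bool.
    static bool try_check_connection()
    {
        return false;   // pretend no other node is reachable yet
    }

    int main()
    {
        bool has_zookeeper = true;   // the config has a <zookeeper> section
        // Start the embedded Keeper asynchronously only if another coordination
        // node is already reachable; otherwise wait for the local quorum as before.
        bool start_async = has_zookeeper && try_check_connection();
        std::cout << "start_async = " << std::boolalpha << start_async << '\n';
    }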

View File

@ -28,7 +28,7 @@ struct Settings;
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(Milliseconds, startup_timeout, 180000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \

View File

@ -241,7 +241,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return true;
}
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("keeper_server.server_id");
@ -262,8 +262,16 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
server->startup();
LOG_DEBUG(log, "Server initialized, waiting for quorum");
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
if (!start_async)
{
server->waitInit();
LOG_DEBUG(log, "Quorum initialized");
}
else
{
LOG_INFO(log, "Starting Keeper asynchronously, server will accept connections to Keeper when it will be ready");
}
}
catch (...)
{
@ -363,7 +371,7 @@ void KeeperDispatcher::sessionCleanerTask()
try
{
/// Only leader node must check dead sessions
if (isLeader())
if (server->checkInit() && isLeader())
{
auto dead_sessions = server->getDeadSessions();
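
With the new start_async flag the dispatcher either blocks in waitInit() until the quorum forms (bounded by startup_timeout) or just logs and returns, letting the Raft server finish initialization in the background; background jobs such as the session cleaner must then check checkInit() before touching the server. Below is a compact, runnable sketch of that pattern built on std::condition_variable; FakeRaftServer and the concrete timeouts are illustrative, not the real KeeperServer API.

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <stdexcept>
    #include <thread>

    struct FakeRaftServer
    {
        std::mutex mutex;
        std::condition_variable cv;
        bool initialized = false;
        std::thread bg;

        void startup()
        {
            // Simulate the quorum forming some time after startup() returns.
            bg = std::thread([this]
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
                { std::lock_guard lock(mutex); initialized = true; }
                cv.notify_all();
            });
        }

        ~FakeRaftServer() { if (bg.joinable()) bg.join(); }

        bool waitInit(std::chrono::milliseconds timeout)
        {
            std::unique_lock lock(mutex);
            return cv.wait_for(lock, timeout, [this] { return initialized; });
        }

        bool checkInit()
        {
            std::lock_guard lock(mutex);
            return initialized;
        }
    };

    void initialize(FakeRaftServer & server, bool start_async)
    {
        server.startup();
        if (!start_async)
        {
            // Synchronous path: block until the quorum forms or the timeout expires.
            if (!server.waitInit(std::chrono::seconds(180)))
                throw std::runtime_error("Failed to wait RAFT initialization");
            std::cout << "Quorum initialized\n";
        }
        else
        {
            // Asynchronous path: return immediately; callers poll checkInit() later.
            std::cout << "Starting Keeper asynchronously\n";
        }
    }

    int main()
    {
        FakeRaftServer server;
        initialize(server, /* start_async = */ true);
        // A periodic job (like sessionCleanerTask) only acts once init is done.
        while (!server.checkInit())
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::cout << "initialized, safe to use the server\n";
    }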

View File

@ -100,7 +100,12 @@ public:
/// Initialization from config.
/// standalone_keeper -- we are standalone keeper application (not inside clickhouse server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper);
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper, bool start_async);
bool checkInit() const
{
return server && server->checkInit();
}
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();

View File

@ -353,6 +353,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
if (!initialized_cv.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return initialized_flag.load(); }))
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");

View File

@ -80,6 +80,12 @@ public:
/// Wait server initialization (see callbackFunc)
void waitInit();
/// Return true if KeeperServer initialized
bool checkInit() const
{
return initialized_flag;
}
void shutdown();
int getServerID() const { return server_id; }
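
checkInit() reads initialized_flag without taking the initialization mutex, which is only safe because the flag is atomic, as the initialized_flag.load() call in waitInit() above implies; a plain bool would have to be read under the same mutex. A tiny illustration with an assumed struct (not the real KeeperServer):

    #include <atomic>
    #include <iostream>

    struct InitState
    {
        std::atomic<bool> initialized_flag{false};

        void markInitialized() { initialized_flag.store(true); }    // set once by the Raft callback
        bool checkInit() const { return initialized_flag.load(); }  // lock-free poll from any thread
    };

    int main()
    {
        InitState state;
        std::cout << state.checkInit();           // prints 0
        state.markInitialized();
        std::cout << state.checkInit() << '\n';   // prints 1
    }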

View File

@ -1749,6 +1749,20 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
bool Context::tryCheckZooKeeperConnection() const
{
try
{
getZooKeeper();
return true;
}
catch (...)
{
return false;
}
}
UInt32 Context::getZooKeeperSessionUptime() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@ -1776,19 +1790,24 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperDispatcher() const
void Context::initializeKeeperDispatcher(bool start_async) const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (shared->keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
bool is_standalone_app = getApplicationType() == ApplicationType::KEEPER;
if (start_async && !is_standalone_app)
LOG_INFO(shared->log, "Connected to ZooKeeper (or Keeper) before internal Keeper start, will wait for Keeper asynchronously");
shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_storage_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
shared->keeper_storage_dispatcher->initialize(config, is_standalone_app, start_async);
}
#endif
}
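
tryCheckZooKeeperConnection() deliberately collapses any connection failure into false, so the startup code in Server.cpp can branch on reachability instead of handling exceptions. The generic shape of that probe, sketched under hypothetical names:

    #include <functional>
    #include <iostream>
    #include <stdexcept>

    // Hypothetical helper in the spirit of Context::tryCheckZooKeeperConnection():
    // run an action that may throw and reduce the outcome to a reachability flag.
    static bool try_probe(const std::function<void()> & attempt)
    {
        try
        {
            attempt();
            return true;
        }
        catch (...)
        {
            return false;   // not fatal during startup, just "not reachable yet"
        }
    }

    int main()
    {
        bool reachable = try_probe([] { throw std::runtime_error("connection refused"); });
        std::cout << std::boolalpha << reachable << '\n';   // false
    }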

View File

@ -666,12 +666,17 @@ public:
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
/// Try to connect to ZooKeeper using getZooKeeper. Useful for internal
/// Keeper start (check connection to some other node). Return true if
/// connected successfully (without exception).
bool tryCheckZooKeeperConnection() const;
UInt32 getZooKeeperSessionUptime() const;
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif
void initializeKeeperDispatcher() const;
void initializeKeeperDispatcher(bool start_async) const;
void shutdownKeeperDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.

View File

@ -286,7 +286,7 @@ void KeeperTCPHandler::runImpl()
return;
}
if (keeper_dispatcher->hasLeader())
if (keeper_dispatcher->checkInit() && keeper_dispatcher->hasLeader())
{
try
{
@ -306,7 +306,8 @@ void KeeperTCPHandler::runImpl()
}
else
{
LOG_WARNING(log, "Ignoring user request, because no alive leader exist");
String reason = keeper_dispatcher->checkInit() ? "no alive leader exists" : "server is not initialized yet";
LOG_WARNING(log, "Ignoring user request, because {}", reason);
sendHandshake(false);
return;
}
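
Since Keeper may now still be initializing when a client connects, the handshake is refused both when no leader is known and when the local server has not finished initialization, and the warning names the actual reason. A compact sketch of that gate under illustrative names (not the real handler):

    #include <iostream>
    #include <string>

    // Illustrative stand-ins for KeeperDispatcher::checkInit() and hasLeader().
    static bool check_init() { return true; }
    static bool has_leader() { return false; }

    // Returns true if the session handshake may proceed.
    static bool accept_connection()
    {
        if (check_init() && has_leader())
            return true;

        std::string reason = check_init() ? "no alive leader exists"
                                          : "server is not initialized yet";
        std::cout << "Ignoring user request, because " << reason << '\n';
        return false;   // the real handler replies with sendHandshake(false)
    }

    int main()
    {
        accept_connection();
    }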

View File

@ -2320,6 +2320,9 @@ class ClickHouseInstance:
def replace_config(self, path_to_config, replacement):
self.exec_in_container(["bash", "-c", "echo '{}' > {}".format(replacement, path_to_config)])
def replace_in_config(self, path_to_config, replace, replacement):
self.exec_in_container(["bash", "-c", f"sed -i 's/{replace}/{replacement}/g' {path_to_config}"])
def create_dir(self, destroy_dir=True):
"""Create the instance directory and all the needed files there."""

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>non_existing_node</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>non_existing_node</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml'], stay_alive=True)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def test_smoke():
try:
cluster.start()
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_alive", b"aaaa")
finally:
cluster.shutdown()

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,32 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>44444</port>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>44444</port>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>44444</port>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,84 @@
#!/usr/bin/env python3
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/keeper_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/keeper_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/keeper_conf.xml'], stay_alive=True)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def start(node):
node.start_clickhouse()
def test_start_offline(started_cluster):
p = Pool(3)
try:
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_alive", b"aaaa")
node1.stop_clickhouse()
node2.stop_clickhouse()
node3.stop_clickhouse()
time.sleep(3)
p.map(start, [node2, node3])
node2_zk = get_fake_zk("node2")
node2_zk.create("/test_dead", b"data")
finally:
p.map(start, [node1, node2, node3])
def test_start_non_existing(started_cluster):
p = Pool(3)
try:
node1.stop_clickhouse()
node2.stop_clickhouse()
node3.stop_clickhouse()
node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'node3', 'non_existing_node')
node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'node3', 'non_existing_node')
time.sleep(3)
p.map(start, [node2, node1])
node2_zk = get_fake_zk("node2")
node2_zk.create("/test_non_exising", b"data")
finally:
node1.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper1.xml', 'non_existing_node', 'node3')
node2.replace_in_config('/etc/clickhouse-server/config.d/enable_keeper2.xml', 'non_existing_node', 'node3')
p.map(start, [node1, node2, node3])
def test_restart_third_node(started_cluster):
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_restart", b"aaaa")
node3.restart_clickhouse()
assert node3.contains_in_log("Connected to ZooKeeper (or Keeper) before internal Keeper start")