Fix some races and better reaction to leader change

This commit is contained in:
alesapin 2021-02-01 14:27:26 +03:00
parent 67412bd529
commit eb5c77f558
7 changed files with 54 additions and 19 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit 9eb76db3ff1a78f672303b5b51dcbe0f9b22cf96
Subproject commit c6f8528ead61f7e4565164c6f15afef221235aa8

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit b2368f43f37c4a592b17b1e9a474b93749c47319
Subproject commit 48f40ebb539220d328958f8823b094c0b07a4e79

View File

@ -33,7 +33,11 @@ void NuKeeperServer::addServer(int server_id_, const std::string & server_uri_,
{
nuraft::srv_config config(server_id_, 0, server_uri_, "", /* follower= */ !can_become_leader_, priority);
auto ret1 = raft_instance->add_srv(config);
if (ret1->get_result_code() != nuraft::cmd_result_code::OK)
auto code = ret1->get_result_code();
if (code == nuraft::cmd_result_code::TIMEOUT
|| code == nuraft::cmd_result_code::BAD_REQUEST
|| code == nuraft::cmd_result_code::NOT_LEADER
|| code == nuraft::cmd_result_code::FAILED)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot add server to RAFT quorum with code {}, message '{}'", ret1->get_result_code(), ret1->get_result_str());
}
@ -41,9 +45,9 @@ void NuKeeperServer::addServer(int server_id_, const std::string & server_uri_,
void NuKeeperServer::startup()
{
nuraft::raft_params params;
params.heart_beat_interval_ = 100;
params.election_timeout_lower_bound_ = 200;
params.election_timeout_upper_bound_ = 400;
params.heart_beat_interval_ = 1000;
params.election_timeout_lower_bound_ = 3000;
params.election_timeout_upper_bound_ = 6000;
params.reserved_log_items_ = 5000;
params.snapshot_distance_ = 5000;
params.client_req_timeout_ = 10000;
@ -59,7 +63,7 @@ void NuKeeperServer::startup()
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
static constexpr auto MAX_RETRY = 30;
static constexpr auto MAX_RETRY = 100;
for (size_t i = 0; i < MAX_RETRY; ++i)
{
if (raft_instance->is_initialized())
@ -169,6 +173,8 @@ TestKeeperStorage::ResponsesForSessions NuKeeperServer::putRequests(const TestKe
entries.push_back(getZooKeeperLogEntry(session_id, request));
}
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
{
@ -215,6 +221,8 @@ int64_t NuKeeperServer::getSessionID()
nuraft::buffer_serializer bs(entry);
bs.put_i64(0);
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries({entry});
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT");

View File

@ -37,6 +37,8 @@ private:
TestKeeperStorage::ResponsesForSessions readZooKeeperResponses(nuraft::ptr<nuraft::buffer> & buffer);
std::mutex append_entries_mutex;
public:
NuKeeperServer(int server_id_, const std::string & hostname_, int port_);

View File

@ -11,6 +11,11 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
: log(&Poco::Logger::get("TestKeeperDispatcher"))
{
}
void TestKeeperStorageDispatcher::processingThread()
{
setThreadName("TestKeeperSProc");
@ -101,6 +106,7 @@ namespace
void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("test_keeper_server.server_id");
std::string myhostname;
int myport;
@ -134,26 +140,39 @@ void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigura
}
server = std::make_unique<NuKeeperServer>(myid, myhostname, myport);
try
{
server->startup();
if (shouldBuildQuorum(myid, my_priority, my_can_become_leader, server_configs))
{
for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs)
{
LOG_DEBUG(log, "Adding server with id {} ({}:{})", id, hostname, port);
do
{
server->addServer(id, hostname + ":" + std::to_string(port), can_become_leader, priority);
}
while (!server->waitForServer(id));
LOG_DEBUG(log, "Server with id {} ({}:{}) added to cluster", id, hostname, port);
}
}
else
{
LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size());
server->waitForServers(ids);
server->waitForCatchUp();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
processing_thread = ThreadFromGlobalPool([this] { processingThread(); });
LOG_DEBUG(log, "Dispatcher initialized");
}
void TestKeeperStorageDispatcher::shutdown()
@ -166,6 +185,7 @@ void TestKeeperStorageDispatcher::shutdown()
if (shutdown_called)
return;
LOG_DEBUG(log, "Shutting down storage dispatcher");
shutdown_called = true;
if (processing_thread.joinable())
@ -189,6 +209,8 @@ void TestKeeperStorageDispatcher::shutdown()
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
LOG_DEBUG(log, "Dispatcher shut down");
}
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()

View File

@ -5,6 +5,7 @@
#include <functional>
#include <Coordination/NuKeeperServer.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <common/logger_useful.h>
namespace DB
{
@ -30,14 +31,15 @@ private:
ThreadFromGlobalPool processing_thread;
std::unique_ptr<NuKeeperServer> server;
std::mutex session_id_mutex;
Poco::Logger * log;
private:
void processingThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
TestKeeperStorageDispatcher() = default;
TestKeeperStorageDispatcher();
void initialize(const Poco::Util::AbstractConfiguration & config);
@ -59,7 +61,6 @@ public:
int64_t getSessionID()
{
std::lock_guard lock(session_id_mutex);
return server->getSessionID();
}

View File

@ -328,6 +328,7 @@ void TestKeeperTCPHandler::runImpl()
catch (const Exception & e)
{
LOG_WARNING(log, "Cannot receive session id {}", e.displayText());
sendHandshake(false);
return;
}
@ -336,6 +337,7 @@ void TestKeeperTCPHandler::runImpl()
}
else
{
LOG_WARNING(log, "Ignoring user request, because no alive leader exist");
sendHandshake(false);
return;
}