2021-01-20 16:25:30 +00:00
|
|
|
#include <Coordination/TestKeeperStorageDispatcher.h>
|
2021-01-19 14:22:28 +00:00
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes thrown from this file; they are only declared here (extern).
namespace ErrorCodes
{
    extern const int TIMEOUT_EXCEEDED;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2021-02-01 11:27:26 +00:00
|
|
|
/// Only binds the "TestKeeperDispatcher" logger. The server and the processing
/// thread are created by initialize().
TestKeeperStorageDispatcher::TestKeeperStorageDispatcher()
    : log(&Poco::Logger::get("TestKeeperDispatcher"))
{}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
void TestKeeperStorageDispatcher::processingThread()
|
|
|
|
{
|
|
|
|
setThreadName("TestKeeperSProc");
|
2021-01-26 14:08:31 +00:00
|
|
|
while (!shutdown_called)
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
2021-01-26 07:47:04 +00:00
|
|
|
TestKeeperStorage::RequestForSession request;
|
2021-01-19 14:22:28 +00:00
|
|
|
|
2021-01-26 07:47:04 +00:00
|
|
|
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
|
2021-01-19 14:22:28 +00:00
|
|
|
|
2021-01-26 07:47:04 +00:00
|
|
|
if (requests_queue.tryPop(request, max_wait))
|
|
|
|
{
|
2021-01-26 14:08:31 +00:00
|
|
|
if (shutdown_called)
|
2021-01-26 07:47:04 +00:00
|
|
|
break;
|
2021-01-26 14:08:31 +00:00
|
|
|
|
2021-01-26 07:47:04 +00:00
|
|
|
try
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
2021-01-25 12:29:12 +00:00
|
|
|
auto responses = server->putRequests({request});
|
2021-01-19 14:22:28 +00:00
|
|
|
for (const auto & response_for_session : responses)
|
|
|
|
setResponse(response_for_session.session_id, response_for_session.response);
|
|
|
|
}
|
2021-01-26 07:47:04 +00:00
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Delivers `response` to the callback registered for `session_id`.
/// Silently does nothing when the session has no registered callback.
/// A non-watch Close response also unregisters the session's callback.
void TestKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
    std::lock_guard lock(session_to_response_callback_mutex);

    auto callback_it = session_to_response_callback.find(session_id);
    if (callback_it == session_to_response_callback.end())
        return;

    callback_it->second(response);

    /// Session closed, no more writes
    const bool is_watch = response->xid == Coordination::WATCH_XID;
    if (!is_watch && response->getOpNum() == Coordination::OpNum::Close)
        session_to_response_callback.erase(callback_it);
}
|
|
|
|
|
2021-01-25 12:29:12 +00:00
|
|
|
/// Enqueues `request` on behalf of `session_id` for the processing thread.
///
/// Returns false when the request is not accepted: either the session has no
/// registered response callback, or the dispatcher has already been shut down.
/// Returns true once the request is in the queue.
///
/// Throws Exception(TIMEOUT_EXCEEDED) if a non-Close request cannot be pushed
/// within operation_timeout.
bool TestKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
    {
        std::lock_guard lock(session_to_response_callback_mutex);
        if (session_to_response_callback.count(session_id) == 0)
            return false;
    }

    TestKeeperStorage::RequestForSession request_info;
    request_info.request = request;
    request_info.session_id = session_id;

    std::lock_guard lock(push_request_mutex);

    /// shutdown() sets shutdown_called and joins the processing thread while holding
    /// this same mutex, so once the flag is visible here nothing will ever pop the
    /// queue again. Enqueuing would leave the request unanswered, and the untimed
    /// push() below could block forever on a full queue.
    if (shutdown_called)
        return false;

    /// Put close requests without timeouts
    if (request->getOpNum() == Coordination::OpNum::Close)
        requests_queue.push(std::move(request_info));
    else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
        throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);

    return true;
}
|
|
|
|
|
2021-01-28 12:07:26 +00:00
|
|
|
namespace
{
    /// Decides whether this server is the one that has to assemble the cluster,
    /// i.e. add all other servers itself instead of waiting to be added.
    ///
    /// For a configuration with unique ids and at least one leader-capable server,
    /// exactly one server answers true: the leader-capable server with the highest
    /// priority, ties being broken in favour of the smallest id.
    ///
    /// myid, my_priority, my_can_become_leader describe this server;
    /// server_configs holds (id, hostname, port, can_become_leader, priority) for every other server.
    bool shouldBuildQuorum(int32_t myid, int32_t my_priority, bool my_can_become_leader, const std::vector<std::tuple<int, std::string, int, bool, int32_t>> & server_configs)
    {
        if (!my_can_become_leader)
            return false;

        for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs)
        {
            /// A peer that cannot become leader never builds the quorum itself,
            /// so it must not make us step back, whatever its priority or id is.
            if (!can_become_leader)
                continue;

            if (priority > my_priority)
                return false;

            /// Tie-break only among peers of the same priority. A lower-priority peer
            /// with a smaller id steps back on its own; letting it win the id
            /// comparison would leave the cluster without any builder.
            if (priority == my_priority && id < myid)
                return false;
        }

        return true;
    }
}
|
2021-01-25 12:29:12 +00:00
|
|
|
|
|
|
|
/// Reads the cluster layout from `config`, starts the local server, forms or joins
/// the cluster and finally launches the request processing thread.
///
/// Configuration keys read:
///   test_keeper_server.server_id                         - id of this server;
///   test_keeper_server.raft_configuration.<key>.id       - id of a cluster member;
///   ...<key>.hostname, ...<key>.port                     - its endpoint;
///   ...<key>.can_become_leader (default true), ...<key>.priority (default 1).
///
/// Throws Poco::NotFoundException when server_id is not listed in raft_configuration.
/// Any exception raised while starting the server or building the cluster is logged
/// and rethrown.
void TestKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
{
    LOG_DEBUG(log, "Initializing storage dispatcher");

    const int myid = config.getInt("test_keeper_server.server_id");
    std::string myhostname;
    int myport = 0;
    int32_t my_priority = 1;
    bool my_can_become_leader = true;
    bool my_config_found = false;

    const std::string raft_section = "test_keeper_server.raft_configuration";
    Poco::Util::AbstractConfiguration::Keys keys;
    config.keys(raft_section, keys);

    /// (id, hostname, port, can_become_leader, priority) of every server except this one.
    std::vector<std::tuple<int, std::string, int, bool, int32_t>> server_configs;
    /// Ids of all servers, including this one.
    std::vector<int32_t> ids;
    ids.reserve(keys.size());

    for (const auto & server_key : keys)
    {
        const std::string server_prefix = raft_section + "." + server_key;

        int server_id = config.getInt(server_prefix + ".id");
        std::string hostname = config.getString(server_prefix + ".hostname");
        int port = config.getInt(server_prefix + ".port");
        bool can_become_leader = config.getBool(server_prefix + ".can_become_leader", true);
        int32_t priority = config.getInt(server_prefix + ".priority", 1);

        if (server_id == myid)
        {
            myhostname = hostname;
            myport = port;
            my_can_become_leader = can_become_leader;
            my_priority = priority;
            my_config_found = true;
        }
        else
        {
            server_configs.emplace_back(server_id, hostname, port, can_become_leader, priority);
        }
        ids.push_back(server_id);
    }

    /// Without this check the server below would be created with an empty hostname and
    /// a port that was never assigned. Fail the same way a missing configuration key does.
    if (!my_config_found)
        throw Poco::NotFoundException(
            "Server id " + std::to_string(myid) + " is not listed in test_keeper_server.raft_configuration");

    server = std::make_unique<NuKeeperServer>(myid, myhostname, myport);

    try
    {
        server->startup();

        if (shouldBuildQuorum(myid, my_priority, my_can_become_leader, server_configs))
        {
            for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs)
            {
                LOG_DEBUG(log, "Adding server with id {} ({}:{})", id, hostname, port);

                /// Repeat the add request until the server is reported as present.
                do
                {
                    server->addServer(id, hostname + ":" + std::to_string(port), can_become_leader, priority);
                }
                while (!server->waitForServer(id));

                LOG_DEBUG(log, "Server with id {} ({}:{}) added to cluster", id, hostname, port);
            }
        }
        else
        {
            LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size());
            server->waitForServers(ids);
            server->waitForCatchUp();
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
        throw;
    }

    /// The thread is joined in shutdown(), so capturing `this` is safe.
    processing_thread = ThreadFromGlobalPool([this] { processingThread(); });

    LOG_DEBUG(log, "Dispatcher initialized");
}
|
|
|
|
|
2021-01-26 14:08:31 +00:00
|
|
|
void TestKeeperStorageDispatcher::shutdown()
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
|
|
|
try
|
|
|
|
{
|
2021-01-26 14:08:31 +00:00
|
|
|
{
|
|
|
|
std::lock_guard lock(push_request_mutex);
|
|
|
|
|
|
|
|
if (shutdown_called)
|
|
|
|
return;
|
|
|
|
|
2021-02-01 11:27:26 +00:00
|
|
|
LOG_DEBUG(log, "Shutting down storage dispatcher");
|
2021-01-26 14:08:31 +00:00
|
|
|
shutdown_called = true;
|
|
|
|
|
|
|
|
if (processing_thread.joinable())
|
|
|
|
processing_thread.join();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (server)
|
|
|
|
{
|
|
|
|
TestKeeperStorage::RequestsForSessions expired_requests;
|
2021-01-29 12:39:04 +00:00
|
|
|
TestKeeperStorage::RequestForSession request;
|
|
|
|
while (requests_queue.tryPop(request))
|
|
|
|
expired_requests.push_back(TestKeeperStorage::RequestForSession{request});
|
2021-01-26 14:08:31 +00:00
|
|
|
|
|
|
|
auto expired_responses = server->shutdown(expired_requests);
|
|
|
|
|
|
|
|
for (const auto & response_for_session : expired_responses)
|
|
|
|
setResponse(response_for_session.session_id, response_for_session.response);
|
|
|
|
}
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(__PRETTY_FUNCTION__);
|
|
|
|
}
|
2021-02-01 11:27:26 +00:00
|
|
|
|
|
|
|
LOG_DEBUG(log, "Dispatcher shut down");
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|
|
|
|
|
2021-01-26 14:08:31 +00:00
|
|
|
/// Stops the dispatcher on destruction; shutdown() returns early if it has already run.
TestKeeperStorageDispatcher::~TestKeeperStorageDispatcher()
{
    this->shutdown();
}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
void TestKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
|
|
|
if (!session_to_response_callback.try_emplace(session_id, callback).second)
|
|
|
|
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
|
|
|
|
}
|
|
|
|
|
2021-01-21 11:37:20 +00:00
|
|
|
void TestKeeperStorageDispatcher::finishSession(int64_t session_id)
|
|
|
|
{
|
|
|
|
std::lock_guard lock(session_to_response_callback_mutex);
|
|
|
|
auto session_it = session_to_response_callback.find(session_id);
|
|
|
|
if (session_it != session_to_response_callback.end())
|
|
|
|
session_to_response_callback.erase(session_it);
|
|
|
|
}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
}
|