Add coordination settings

This commit is contained in:
alesapin 2021-02-09 17:47:18 +03:00
parent 9667bdcbd2
commit b130fbfd78
12 changed files with 139 additions and 55 deletions

View File

@ -19,12 +19,16 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
NuKeeperServer::NuKeeperServer(int server_id_, const std::string & hostname_, int port_, ResponsesQueue & responses_queue_)
NuKeeperServer::NuKeeperServer(
int server_id_, const std::string & hostname_, int port_,
const CoordinationSettingsPtr & coordination_settings_,
ResponsesQueue & responses_queue_)
: server_id(server_id_)
, hostname(hostname_)
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_))
, coordination_settings(coordination_settings_)
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, coordination_settings))
, state_manager(nuraft::cs_new<InMemoryStateManager>(server_id, endpoint))
, responses_queue(responses_queue_)
{
@ -43,17 +47,18 @@ void NuKeeperServer::addServer(int server_id_, const std::string & server_uri_,
}
void NuKeeperServer::startup(int64_t operation_timeout_ms)
void NuKeeperServer::startup()
{
nuraft::raft_params params;
params.heart_beat_interval_ = 500;
params.election_timeout_lower_bound_ = 1000;
params.election_timeout_upper_bound_ = 2000;
params.reserved_log_items_ = 5000;
params.snapshot_distance_ = 5000;
params.client_req_timeout_ = operation_timeout_ms;
params.auto_forwarding_ = true;
params.auto_forwarding_req_timeout_ = operation_timeout_ms * 2;
params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds();
params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds();
params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds();
params.reserved_log_items_ = coordination_settings->reserved_log_items;
params.snapshot_distance_ = coordination_settings->snapshot_distance;
params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds();
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
params.return_method_ = nuraft::raft_params::blocking;
nuraft::asio_service::options asio_opts{};
@ -65,6 +70,7 @@ void NuKeeperServer::startup(int64_t operation_timeout_ms)
if (!raft_instance)
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
/// FIXME
static constexpr auto MAX_RETRY = 100;
for (size_t i = 0; i < MAX_RETRY; ++i)
{
@ -80,7 +86,7 @@ void NuKeeperServer::startup(int64_t operation_timeout_ms)
void NuKeeperServer::shutdown()
{
state_machine->shutdownStorage();
if (!launcher.shutdown(5))
if (!launcher.shutdown(coordination_settings->shutdown_timeout.totalSeconds()))
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5);
}
@ -173,6 +179,7 @@ bool NuKeeperServer::isLeaderAlive() const
bool NuKeeperServer::waitForServer(int32_t id) const
{
/// FIXME
for (size_t i = 0; i < 50; ++i)
{
if (raft_instance->get_srv_config(id) != nullptr)
@ -180,17 +187,22 @@ bool NuKeeperServer::waitForServer(int32_t id) const
LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Waiting for server {} to join the cluster", id);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Cannot wait for server {}", id);
return false;
}
void NuKeeperServer::waitForServers(const std::vector<int32_t> & ids) const
bool NuKeeperServer::waitForServers(const std::vector<int32_t> & ids) const
{
for (int32_t id : ids)
waitForServer(id);
if (!waitForServer(id))
return false;
return true;
}
void NuKeeperServer::waitForCatchUp() const
{
/// FIXME
while (raft_instance->is_catching_up() || raft_instance->is_receiving_snapshot() || raft_instance->is_leader())
{
LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Waiting current RAFT instance to catch up");

View File

@ -5,6 +5,7 @@
#include <Coordination/InMemoryStateManager.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
#include <unordered_map>
namespace DB
@ -21,6 +22,8 @@ private:
std::string endpoint;
CoordinationSettingsPtr coordination_settings;
nuraft::ptr<NuKeeperStateMachine> state_machine;
nuraft::ptr<nuraft::state_mgr> state_manager;
@ -34,9 +37,12 @@ private:
ResponsesQueue & responses_queue;
public:
NuKeeperServer(int server_id_, const std::string & hostname_, int port_, ResponsesQueue & responses_queue_);
NuKeeperServer(
int server_id_, const std::string & hostname_, int port_,
const CoordinationSettingsPtr & coordination_settings_,
ResponsesQueue & responses_queue_);
void startup(int64_t operation_timeout_ms);
void startup();
void putRequest(const NuKeeperStorage::RequestForSession & request);
@ -51,7 +57,7 @@ public:
bool isLeaderAlive() const;
bool waitForServer(int32_t server_id) const;
void waitForServers(const std::vector<int32_t> & ids) const;
bool waitForServers(const std::vector<int32_t> & ids) const;
void waitForCatchUp() const;
void shutdown();

View File

@ -8,8 +8,6 @@
namespace DB
{
static constexpr int MAX_SNAPSHOTS = 3;
NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{
ReadBufferFromNuraftBuffer buffer(data);
@ -43,8 +41,9 @@ nuraft::ptr<nuraft::buffer> writeResponses(NuKeeperStorage::ResponsesForSessions
}
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, int64_t tick_time)
: storage(tick_time)
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, const CoordinationSettingsPtr & coordination_settings_)
: coordination_settings(coordination_settings_)
, storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, responses_queue(responses_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuRaftStateMachine"))
@ -129,7 +128,7 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura
NuKeeperStorageSerializer serializer;
ReadBufferFromNuraftBuffer reader(in);
NuKeeperStorage new_storage(500 /*FIXME*/);
NuKeeperStorage new_storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
serializer.deserialize(new_storage, reader);
return std::make_shared<StorageSnapshot>(ss, new_storage);
}
@ -153,15 +152,19 @@ void NuKeeperStateMachine::create_snapshot(
{
std::lock_guard<std::mutex> lock(snapshots_lock);
snapshots[s.get_last_log_idx()] = snapshot;
int num = snapshots.size();
auto entry = snapshots.begin();
for (int i = 0; i < num - MAX_SNAPSHOTS; ++i)
size_t num = snapshots.size();
if (num > coordination_settings->max_stored_snapshots)
{
if (entry == snapshots.end())
break;
entry = snapshots.erase(entry);
auto entry = snapshots.begin();
for (size_t i = 0; i < num - coordination_settings->max_stored_snapshots; ++i)
{
if (entry == snapshots.end())
break;
entry = snapshots.erase(entry);
}
}
}
nuraft::ptr<std::exception> except(nullptr);
bool ret = true;

View File

@ -4,6 +4,7 @@
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <common/logger_useful.h>
#include <Coordination/ThreadSafeQueue.h>
#include <Coordination/CoordinationSettings.h>
namespace DB
{
@ -13,7 +14,7 @@ using ResponsesQueue = ThreadSafeQueue<NuKeeperStorage::ResponseForSession>;
class NuKeeperStateMachine : public nuraft::state_machine
{
public:
NuKeeperStateMachine(ResponsesQueue & responses_queue_, int64_t tick_time = 500);
NuKeeperStateMachine(ResponsesQueue & responses_queue_, const CoordinationSettingsPtr & coordination_settings_);
nuraft::ptr<nuraft::buffer> pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
@ -72,10 +73,12 @@ private:
StorageSnapshotPtr createSnapshotInternal(nuraft::snapshot & s);
static StorageSnapshotPtr readSnapshot(nuraft::snapshot & s, nuraft::buffer & in);
StorageSnapshotPtr readSnapshot(nuraft::snapshot & s, nuraft::buffer & in);
static void writeSnapshot(const StorageSnapshotPtr & snapshot, nuraft::ptr<nuraft::buffer> & out);
CoordinationSettingsPtr coordination_settings;
NuKeeperStorage storage;
ResponsesQueue & responses_queue;

View File

@ -12,7 +12,8 @@ namespace ErrorCodes
}
NuKeeperStorageDispatcher::NuKeeperStorageDispatcher()
: log(&Poco::Logger::get("NuKeeperDispatcher"))
: coordination_settings(std::make_shared<CoordinationSettings>())
, log(&Poco::Logger::get("NuKeeperDispatcher"))
{
}
@ -23,7 +24,7 @@ void NuKeeperStorageDispatcher::requestThread()
{
NuKeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
if (requests_queue.tryPop(request, max_wait))
{
@ -49,7 +50,7 @@ void NuKeeperStorageDispatcher::responseThread()
{
NuKeeperStorage::ResponseForSession response_for_session;
UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds());
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
if (responses_queue.tryPop(response_for_session, max_wait))
{
@ -97,7 +98,7 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds()))
else if (!requests_queue.tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
@ -134,8 +135,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
std::string myhostname;
int myport;
int32_t my_priority = 1;
coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config);
operation_timeout = Poco::Timespan(0, config.getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000);
Poco::Util::AbstractConfiguration::Keys keys;
config.keys("test_keeper_server.raft_configuration", keys);
bool my_can_become_leader = true;
@ -163,10 +164,10 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
ids.push_back(server_id);
}
server = std::make_unique<NuKeeperServer>(myid, myhostname, myport, responses_queue);
server = std::make_unique<NuKeeperServer>(myid, myhostname, myport, coordination_settings, responses_queue);
try
{
server->startup(operation_timeout.totalMilliseconds());
server->startup();
if (shouldBuildQuorum(myid, my_priority, my_can_become_leader, server_configs))
{
for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs)
@ -183,8 +184,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
}
else
{
LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size());
server->waitForServers(ids);
while (!server->waitForServers(ids))
LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size());
server->waitForCatchUp();
}
}
@ -283,8 +284,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/*FIXME*/
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::this_thread::sleep_for(std::chrono::milliseconds(coordination_settings->dead_session_check_period_ms.totalMilliseconds()));
}
}

View File

@ -14,6 +14,7 @@
#include <common/logger_useful.h>
#include <functional>
#include <Coordination/NuKeeperServer.h>
#include <Coordination/CoordinationSettings.h>
namespace DB
@ -25,10 +26,9 @@ class NuKeeperStorageDispatcher
{
private:
Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000};
std::mutex push_request_mutex;
CoordinationSettingsPtr coordination_settings;
using RequestsQueue = ConcurrentBoundedQueue<NuKeeperStorage::RequestForSession>;
RequestsQueue requests_queue{1};
ResponsesQueue responses_queue;

View File

@ -1,9 +1,13 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<server_id>1</server_id>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>

View File

@ -1,9 +1,13 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>10000</operation_timeout_ms>
<session_timeout_ms>30000</session_timeout_ms>
<server_id>1</server_id>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>

View File

@ -1,9 +1,13 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>1</server_id>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>

View File

@ -1,9 +1,13 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>2</server_id>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>

View File

@ -1,9 +1,13 @@
<yandex>
<test_keeper_server>
<tcp_port>9181</tcp_port>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<server_id>3</server_id>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>

View File

@ -52,6 +52,47 @@ def get_fake_zk(nodename):
_fake_zk_instance.start()
return _fake_zk_instance
def test_read_write_multinode(started_cluster):
try:
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
node1_zk.create("/test_read_write_multinode_node1", b"somedata1")
node2_zk.create("/test_read_write_multinode_node2", b"somedata2")
node3_zk.create("/test_read_write_multinode_node3", b"somedata3")
# stale reads are allowed
while node1_zk.exists("/test_read_write_multinode_node2") is None:
time.sleep(0.1)
while node1_zk.exists("/test_read_write_multinode_node3") is None:
time.sleep(0.1)
while node2_zk.exists("/test_read_write_multinode_node3") is None:
time.sleep(0.1)
assert node3_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1"
assert node3_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2"
assert node3_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3"
assert node2_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3"
assert node1_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3"
finally:
try:
for zk_conn in [node1_zk, node2_zk, node3_zk]:
zk_conn.stop()
zk_conn.close()
except:
pass
def test_watch_on_follower(started_cluster):
try:
node1_zk = get_fake_zk("node1")
@ -105,7 +146,6 @@ def test_watch_on_follower(started_cluster):
pass
# in extremely rare case it can take more than 5 minutes in debug build with sanitizer
@pytest.mark.timeout(600)
def test_blocade_leader(started_cluster):