diff --git a/src/Coordination/NuKeeperServer.cpp b/src/Coordination/NuKeeperServer.cpp index cbd52b98377..40508b08761 100644 --- a/src/Coordination/NuKeeperServer.cpp +++ b/src/Coordination/NuKeeperServer.cpp @@ -19,12 +19,16 @@ namespace ErrorCodes extern const int RAFT_ERROR; } -NuKeeperServer::NuKeeperServer(int server_id_, const std::string & hostname_, int port_, ResponsesQueue & responses_queue_) +NuKeeperServer::NuKeeperServer( + int server_id_, const std::string & hostname_, int port_, + const CoordinationSettingsPtr & coordination_settings_, + ResponsesQueue & responses_queue_) : server_id(server_id_) , hostname(hostname_) , port(port_) , endpoint(hostname + ":" + std::to_string(port)) - , state_machine(nuraft::cs_new(responses_queue_)) + , coordination_settings(coordination_settings_) + , state_machine(nuraft::cs_new(responses_queue_, coordination_settings)) , state_manager(nuraft::cs_new(server_id, endpoint)) , responses_queue(responses_queue_) { @@ -43,17 +47,18 @@ void NuKeeperServer::addServer(int server_id_, const std::string & server_uri_, } -void NuKeeperServer::startup(int64_t operation_timeout_ms) +void NuKeeperServer::startup() { nuraft::raft_params params; - params.heart_beat_interval_ = 500; - params.election_timeout_lower_bound_ = 1000; - params.election_timeout_upper_bound_ = 2000; - params.reserved_log_items_ = 5000; - params.snapshot_distance_ = 5000; - params.client_req_timeout_ = operation_timeout_ms; - params.auto_forwarding_ = true; - params.auto_forwarding_req_timeout_ = operation_timeout_ms * 2; + params.heart_beat_interval_ = coordination_settings->heart_beat_interval_ms.totalMilliseconds(); + params.election_timeout_lower_bound_ = coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(); + params.election_timeout_upper_bound_ = coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(); + params.reserved_log_items_ = coordination_settings->reserved_log_items; + params.snapshot_distance_ = coordination_settings->snapshot_distance; + params.client_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds(); + params.auto_forwarding_ = coordination_settings->auto_forwarding; + params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2; + params.return_method_ = nuraft::raft_params::blocking; nuraft::asio_service::options asio_opts{}; @@ -65,6 +70,7 @@ void NuKeeperServer::startup(int64_t operation_timeout_ms) if (!raft_instance) throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance"); + /// FIXME static constexpr auto MAX_RETRY = 100; for (size_t i = 0; i < MAX_RETRY; ++i) { @@ -80,7 +86,7 @@ void NuKeeperServer::startup(int64_t operation_timeout_ms) void NuKeeperServer::shutdown() { state_machine->shutdownStorage(); - if (!launcher.shutdown(5)) + if (!launcher.shutdown(coordination_settings->shutdown_timeout.totalSeconds())) LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", 5); } @@ -173,6 +179,7 @@ bool NuKeeperServer::isLeaderAlive() const bool NuKeeperServer::waitForServer(int32_t id) const { + /// FIXME for (size_t i = 0; i < 50; ++i) { if (raft_instance->get_srv_config(id) != nullptr) @@ -180,17 +187,22 @@ bool NuKeeperServer::waitForServer(int32_t id) const LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Waiting for server {} to join the cluster", id); std::this_thread::sleep_for(std::chrono::milliseconds(100)); } + + LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Cannot wait for server {}", id); return false; } -void NuKeeperServer::waitForServers(const std::vector & ids) const +bool NuKeeperServer::waitForServers(const std::vector & ids) const { for (int32_t id : ids) - waitForServer(id); + if (!waitForServer(id)) + return false; + return true; } void NuKeeperServer::waitForCatchUp() const { + /// FIXME while (raft_instance->is_catching_up() || raft_instance->is_receiving_snapshot() || raft_instance->is_leader()) { LOG_DEBUG(&Poco::Logger::get("NuRaftInit"), "Waiting current RAFT instance to catch up"); diff --git a/src/Coordination/NuKeeperServer.h b/src/Coordination/NuKeeperServer.h index 6fa2ae44ce2..bb5870fe89a 100644 --- a/src/Coordination/NuKeeperServer.h +++ b/src/Coordination/NuKeeperServer.h @@ -5,6 +5,7 @@ #include #include #include +#include #include namespace DB @@ -21,6 +22,8 @@ private: std::string endpoint; + CoordinationSettingsPtr coordination_settings; + nuraft::ptr state_machine; nuraft::ptr state_manager; @@ -34,9 +37,12 @@ private: ResponsesQueue & responses_queue; public: - NuKeeperServer(int server_id_, const std::string & hostname_, int port_, ResponsesQueue & responses_queue_); + NuKeeperServer( + int server_id_, const std::string & hostname_, int port_, + const CoordinationSettingsPtr & coordination_settings_, + ResponsesQueue & responses_queue_); - void startup(int64_t operation_timeout_ms); + void startup(); void putRequest(const NuKeeperStorage::RequestForSession & request); @@ -51,7 +57,7 @@ public: bool isLeaderAlive() const; bool waitForServer(int32_t server_id) const; - void waitForServers(const std::vector & ids) const; + bool waitForServers(const std::vector & ids) const; void waitForCatchUp() const; void shutdown(); diff --git a/src/Coordination/NuKeeperStateMachine.cpp b/src/Coordination/NuKeeperStateMachine.cpp index 9be8e889fa3..d282f57ce73 100644 --- a/src/Coordination/NuKeeperStateMachine.cpp +++ b/src/Coordination/NuKeeperStateMachine.cpp @@ -8,8 +8,6 @@ namespace DB { -static constexpr int MAX_SNAPSHOTS = 3; - NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) { ReadBufferFromNuraftBuffer buffer(data); @@ -43,8 +41,9 @@ nuraft::ptr writeResponses(NuKeeperStorage::ResponsesForSessions } -NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, int64_t tick_time) - : storage(tick_time) +NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, const CoordinationSettingsPtr & coordination_settings_) + : coordination_settings(coordination_settings_) + , storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds()) , responses_queue(responses_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("NuRaftStateMachine")) @@ -129,7 +128,7 @@ NuKeeperStateMachine::StorageSnapshotPtr NuKeeperStateMachine::readSnapshot(nura NuKeeperStorageSerializer serializer; ReadBufferFromNuraftBuffer reader(in); - NuKeeperStorage new_storage(500 /*FIXME*/); + NuKeeperStorage new_storage(coordination_settings->dead_session_check_period_ms.totalMilliseconds()); serializer.deserialize(new_storage, reader); return std::make_shared(ss, new_storage); } @@ -153,15 +152,19 @@ void NuKeeperStateMachine::create_snapshot( { std::lock_guard lock(snapshots_lock); snapshots[s.get_last_log_idx()] = snapshot; - int num = snapshots.size(); - auto entry = snapshots.begin(); - - for (int i = 0; i < num - MAX_SNAPSHOTS; ++i) + size_t num = snapshots.size(); + if (num > coordination_settings->max_stored_snapshots) { - if (entry == snapshots.end()) - break; - entry = snapshots.erase(entry); + auto entry = snapshots.begin(); + + for (size_t i = 0; i < num - coordination_settings->max_stored_snapshots; ++i) + { + if (entry == snapshots.end()) + break; + entry = snapshots.erase(entry); + } } + } nuraft::ptr except(nullptr); bool ret = true; diff --git a/src/Coordination/NuKeeperStateMachine.h b/src/Coordination/NuKeeperStateMachine.h index 5f3065ee144..87748db20a5 100644 --- a/src/Coordination/NuKeeperStateMachine.h +++ b/src/Coordination/NuKeeperStateMachine.h @@ -4,6 +4,7 @@ #include // Y_IGNORE #include #include +#include namespace DB { @@ -13,7 +14,7 @@ using ResponsesQueue = ThreadSafeQueue; class NuKeeperStateMachine : public nuraft::state_machine { public: - NuKeeperStateMachine(ResponsesQueue & responses_queue_, int64_t tick_time = 500); + NuKeeperStateMachine(ResponsesQueue & responses_queue_, const CoordinationSettingsPtr & coordination_settings_); nuraft::ptr pre_commit(const size_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; } @@ -72,10 +73,12 @@ private: StorageSnapshotPtr createSnapshotInternal(nuraft::snapshot & s); - static StorageSnapshotPtr readSnapshot(nuraft::snapshot & s, nuraft::buffer & in); + StorageSnapshotPtr readSnapshot(nuraft::snapshot & s, nuraft::buffer & in); static void writeSnapshot(const StorageSnapshotPtr & snapshot, nuraft::ptr & out); + CoordinationSettingsPtr coordination_settings; + NuKeeperStorage storage; ResponsesQueue & responses_queue; diff --git a/src/Coordination/NuKeeperStorageDispatcher.cpp b/src/Coordination/NuKeeperStorageDispatcher.cpp index 86bdae9cc37..914985ee534 100644 --- a/src/Coordination/NuKeeperStorageDispatcher.cpp +++ b/src/Coordination/NuKeeperStorageDispatcher.cpp @@ -12,7 +12,8 @@ namespace ErrorCodes } NuKeeperStorageDispatcher::NuKeeperStorageDispatcher() - : log(&Poco::Logger::get("NuKeeperDispatcher")) + : coordination_settings(std::make_shared()) + , log(&Poco::Logger::get("NuKeeperDispatcher")) { } @@ -23,7 +24,7 @@ void NuKeeperStorageDispatcher::requestThread() { NuKeeperStorage::RequestForSession request; - UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); + UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds()); if (requests_queue.tryPop(request, max_wait)) { @@ -49,7 +50,7 @@ void NuKeeperStorageDispatcher::responseThread() { NuKeeperStorage::ResponseForSession response_for_session; - UInt64 max_wait = UInt64(operation_timeout.totalMilliseconds()); + UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds()); if (responses_queue.tryPop(response_for_session, max_wait)) { @@ -97,7 +98,7 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP /// Put close requests without timeouts if (request->getOpNum() == Coordination::OpNum::Close) requests_queue.push(std::move(request_info)); - else if (!requests_queue.tryPush(std::move(request_info), operation_timeout.totalMilliseconds())) + else if (!requests_queue.tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds())) throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); return true; } @@ -134,8 +135,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati std::string myhostname; int myport; int32_t my_priority = 1; + coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config); - operation_timeout = Poco::Timespan(0, config.getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000); Poco::Util::AbstractConfiguration::Keys keys; config.keys("test_keeper_server.raft_configuration", keys); bool my_can_become_leader = true; @@ -163,10 +164,10 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati ids.push_back(server_id); } - server = std::make_unique(myid, myhostname, myport, responses_queue); + server = std::make_unique(myid, myhostname, myport, coordination_settings, responses_queue); try { - server->startup(operation_timeout.totalMilliseconds()); + server->startup(); if (shouldBuildQuorum(myid, my_priority, my_can_become_leader, server_configs)) { for (const auto & [id, hostname, port, can_become_leader, priority] : server_configs) @@ -183,8 +184,8 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati } else { - LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size()); - server->waitForServers(ids); + while (!server->waitForServers(ids)) + LOG_DEBUG(log, "Waiting for {} servers to build cluster", ids.size()); server->waitForCatchUp(); } } @@ -283,8 +284,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask() tryLogCurrentException(__PRETTY_FUNCTION__); } - /*FIXME*/ - std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::this_thread::sleep_for(std::chrono::milliseconds(coordination_settings->dead_session_check_period_ms.totalMilliseconds())); } } diff --git a/src/Coordination/NuKeeperStorageDispatcher.h b/src/Coordination/NuKeeperStorageDispatcher.h index 6820247a5af..62144b92a7a 100644 --- a/src/Coordination/NuKeeperStorageDispatcher.h +++ b/src/Coordination/NuKeeperStorageDispatcher.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace DB @@ -25,10 +26,9 @@ class NuKeeperStorageDispatcher { private: - Poco::Timespan operation_timeout{0, Coordination::DEFAULT_OPERATION_TIMEOUT_MS * 1000}; - std::mutex push_request_mutex; + CoordinationSettingsPtr coordination_settings; using RequestsQueue = ConcurrentBoundedQueue; RequestsQueue requests_queue{1}; ResponsesQueue responses_queue; diff --git a/tests/config/config.d/test_keeper_port.xml b/tests/config/config.d/test_keeper_port.xml index fff60d749f6..6ca00a972d4 100644 --- a/tests/config/config.d/test_keeper_port.xml +++ b/tests/config/config.d/test_keeper_port.xml @@ -1,9 +1,13 @@ 9181 - 10000 - 30000 1 + + + 10000 + 30000 + + 1 diff --git a/tests/integration/test_testkeeper_back_to_back/configs/enable_test_keeper.xml b/tests/integration/test_testkeeper_back_to_back/configs/enable_test_keeper.xml index fff60d749f6..00a593051f9 100644 --- a/tests/integration/test_testkeeper_back_to_back/configs/enable_test_keeper.xml +++ b/tests/integration/test_testkeeper_back_to_back/configs/enable_test_keeper.xml @@ -1,9 +1,13 @@ 9181 - 10000 - 30000 1 + + + 5000 + 10000 + + 1 diff --git a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper1.xml b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper1.xml index e1b6da40338..75065bb2a7a 100644 --- a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper1.xml +++ b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper1.xml @@ -1,9 +1,13 @@ 9181 - 5000 - 10000 1 + + + 5000 + 10000 + + 1 diff --git a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper2.xml b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper2.xml index 7622aa164da..18937dd4910 100644 --- a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper2.xml +++ b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper2.xml @@ -1,9 +1,13 @@ 9181 - 5000 - 10000 2 + + + 5000 + 10000 + + 1 diff --git a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper3.xml b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper3.xml index 1edbfa7271e..5330367cd89 100644 --- a/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper3.xml +++ b/tests/integration/test_testkeeper_multinode/configs/enable_test_keeper3.xml @@ -1,9 +1,13 @@ 9181 - 5000 - 10000 3 + + + 5000 + 10000 + + 1 diff --git a/tests/integration/test_testkeeper_multinode/test.py b/tests/integration/test_testkeeper_multinode/test.py index ff001fb75ee..05879613ba6 100644 --- a/tests/integration/test_testkeeper_multinode/test.py +++ b/tests/integration/test_testkeeper_multinode/test.py @@ -52,6 +52,47 @@ def get_fake_zk(nodename): _fake_zk_instance.start() return _fake_zk_instance +def test_read_write_multinode(started_cluster): + try: + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3") + + node1_zk.create("/test_read_write_multinode_node1", b"somedata1") + node2_zk.create("/test_read_write_multinode_node2", b"somedata2") + node3_zk.create("/test_read_write_multinode_node3", b"somedata3") + + # stale reads are allowed + while node1_zk.exists("/test_read_write_multinode_node2") is None: + time.sleep(0.1) + + while node1_zk.exists("/test_read_write_multinode_node3") is None: + time.sleep(0.1) + + while node2_zk.exists("/test_read_write_multinode_node3") is None: + time.sleep(0.1) + + assert node3_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1" + assert node2_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1" + assert node1_zk.get("/test_read_write_multinode_node1")[0] == b"somedata1" + + assert node3_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2" + assert node2_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2" + assert node1_zk.get("/test_read_write_multinode_node2")[0] == b"somedata2" + + assert node3_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3" + assert node2_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3" + assert node1_zk.get("/test_read_write_multinode_node3")[0] == b"somedata3" + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass + + def test_watch_on_follower(started_cluster): try: node1_zk = get_fake_zk("node1") @@ -105,7 +146,6 @@ def test_watch_on_follower(started_cluster): pass - # in extremely rare case it can take more than 5 minutes in debug build with sanitizer @pytest.mark.timeout(600) def test_blocade_leader(started_cluster):