diff --git a/contrib/NuRaft b/contrib/NuRaft index 1be805e7cb2..e15858f8ad0 160000 --- a/contrib/NuRaft +++ b/contrib/NuRaft @@ -1 +1 @@ -Subproject commit 1be805e7cb2494aa8170015493474379b0362dfc +Subproject commit e15858f8ad0ce8aba85cf74e3763874c76bf927c diff --git a/src/Coordination/CoordinationSettings.cpp b/src/Coordination/CoordinationSettings.cpp index 3e03ee0d6f4..f634bcbb281 100644 --- a/src/Coordination/CoordinationSettings.cpp +++ b/src/Coordination/CoordinationSettings.cpp @@ -189,6 +189,9 @@ KeeperConfigurationAndSettings::loadFromConfig(const Poco::Util::AbstractConfigu ret->coordination_settings->loadFromConfig("keeper_server.coordination_settings", config); + if (ret->coordination_settings->quorum_reads) + LOG_WARNING(&Poco::Logger::get("KeeperConfigurationAndSettings"), "Setting 'quorum_reads' is deprecated. Please use 'read_mode'"); + return ret; } diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h index c436c1b6635..d6b0977b4fa 100644 --- a/src/Coordination/CoordinationSettings.h +++ b/src/Coordination/CoordinationSettings.h @@ -26,6 +26,7 @@ struct Settings; M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \ M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \ M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Upper bound of election timer (avoid too often leader elections)", 0) \ + M(Milliseconds, leadership_expiry, 0, "How often the leader node will check if it still has a majority. Set it to a value less than or equal to election_timeout_lower_bound_ms to get linearizable reads.", 0) \ M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \ M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \ M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \ @@ -38,11 +39,12 @@ struct Settings; M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \ M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \ M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \ - M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \ + M(Bool, quorum_reads, false, "Deprecated - use read_mode. Execute read requests as writes through whole RAFT consensus with similar speed", 0) \ M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0) \ M(Bool, compress_logs, true, "Write compressed coordination logs in ZSTD format", 0) \ M(Bool, compress_snapshots_with_zstd_format, true, "Write compressed snapshots in ZSTD format (instead of custom LZ4)", 0) \ - M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) + M(UInt64, configuration_change_tries_count, 20, "How many times we will try to apply configuration change (add/remove server) to the cluster", 0) \ + M(String, read_mode, "nonlinear", "How should reads be processed. Valid values: 'nonlinear', 'fastlinear', 'quorum'. 'nonlinear' is the fastest option because there are no consistency requirements", 0)
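A minimal sketch of what the three read_mode values are meant to do, assuming a hypothetical ReadMode enum and parseReadMode helper; the patch itself keeps read_mode as a plain String setting and compares it with toString():

enum class ReadMode
{
    Nonlinear,  /// serve reads from local state immediately, no consistency guarantee
    FastLinear, /// ask the leader for its last committed (term, index) and wait locally before reading
    Quorum      /// push reads through the whole RAFT consensus, like writes
};

ReadMode parseReadMode(const String & value)
{
    if (value == "nonlinear")
        return ReadMode::Nonlinear;
    if (value == "fastlinear")
        return ReadMode::FastLinear;
    if (value == "quorum")
        return ReadMode::Quorum;
    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown read_mode '{}'", value);
}

'quorum' keeps the behaviour of the old quorum_reads setting, which is why that boolean is only deprecated rather than removed.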
DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 261e43d80e4..3445ef5ea23 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -6,6 +7,8 @@ #include #include #include +#include +#include #include #include @@ -30,22 +33,83 @@ namespace ErrorCodes KeeperDispatcher::KeeperDispatcher() : responses_queue(std::numeric_limits::max()) + , read_requests_queue(std::numeric_limits::max()) + , finalize_requests_queue(std::numeric_limits::max()) , configuration_and_settings(std::make_shared()) , log(&Poco::Logger::get("KeeperDispatcher")) { } +/// ZooKeeper has two requirements: +/// - writes need to be linearizable +/// - all requests from a single session need to be processed in the order of their arrival +/// +/// Because of that, we cannot process read and write requests from the SAME session at the same time. +/// To be able to process read and write requests in parallel we need to make sure that only 1 type +/// of request is being processed from a single session. +/// Multiple types from different sessions can be processed at the same time. +/// +/// We do some in-session housekeeping to make sure that the multithreaded request processing is correct. +/// When a request is received from a client, we check if there are requests being processed from that same +/// session, and if yes, of what type. If the types are the same, and there are no requests of a different +/// type in between, we can instantly add it to the active request queue. Otherwise, we need to wait until +/// all requests of the other type are processed. +/// +/// There are multiple threads used for processing the requests, each of them communicating with a queue. +/// Assumption: only one type of request is being processed from the same session at any point in time (read or write). +/// +/// requestThread -> thread for processing the requests from the active request queue +/// readRequestThread -> thread for processing read requests +/// finalizeRequestsThread -> thread for finalizing requests: +/// - in-session housekeeping, add requests to the active request queue if there are any +/// +/// If reads are linearizable without quorum, a request can possibly wait for a certain log to be committed. +/// In that case we add it to the waiting queue for that log. +/// When that log is committed, the committing thread will send that read request to readRequestThread so it can be processed. +/// void KeeperDispatcher::requestThread() { setThreadName("KeeperReqT"); /// Result of requests batch from previous iteration - RaftAppendResult prev_result = nullptr; - /// Requests from previous iteration. We store them to be able - /// to send errors to the client.
- KeeperStorage::RequestsForSessions prev_batch; + RaftResult prev_result = nullptr; + const auto previous_quorum_done = [&] { return !prev_result || prev_result->has_result() || prev_result->get_result_code() != nuraft::cmd_result_code::OK; }; + const auto needs_quorum = [](const auto & coordination_settings, const auto & request) + { + return coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum" || !request.request->isReadRequest(); + }; + + KeeperStorage::RequestsForSessions quorum_requests; + KeeperStorage::RequestsForSessions read_requests; + + auto process_quorum_requests = [&, this]() mutable + { + /// Forcefully process all previous pending requests + if (prev_result) + forceWaitAndProcessResult(prev_result); + + prev_result = server->putRequestBatch(quorum_requests); + + if (prev_result) + { + prev_result->when_ready([&, requests_for_sessions = std::move(quorum_requests)](nuraft::cmd_result> & result, nuraft::ptr &) mutable + { + if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) + addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); + else if (result.get_result_code() != nuraft::cmd_result_code::OK) + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + }); + } + + quorum_requests.clear(); + }; + + /// ZooKeeper requires that the requests inside a single session are processed in a strict order + /// (we cannot process later requests before all the previous ones are processed) + /// By making sure that at this point we can either have just read requests or just write requests + /// from a single session, we can process them independently while (!shutdown_called) { KeeperStorage::RequestForSession request; @@ -54,94 +118,67 @@ void KeeperDispatcher::requestThread() uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_batch_size = coordination_settings->max_requests_batch_size; - /// The code below do a very simple thing: batch all write (quorum) requests into vector until - /// previous write batch is not finished or max_batch size achieved. The main complexity goes from - /// the ability to process read requests without quorum (from local state). So when we are collecting - /// requests into a batch we must check that the new request is not read request. Otherwise we have to - /// process all already accumulated write requests, wait them synchronously and only after that process - /// read request. So reads are some kind of "separator" for writes. try { - if (requests_queue->tryPop(request, max_wait)) + if (active_requests_queue->tryPop(request, max_wait)) { CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); if (shutdown_called) break; - KeeperStorage::RequestsForSessions current_batch; + if (needs_quorum(coordination_settings, request)) + quorum_requests.emplace_back(request); + else + read_requests.emplace_back(request); - bool has_read_request = false; - - /// If new request is not read request or we must to process it through quorum. - /// Otherwise we will process it locally. - if (coordination_settings->quorum_reads || !request.request->isReadRequest()) + /// Wait until the previous append is successful, or the batch is big enough + /// has_result == false && get_result_code == OK means that our request is still not processed. + /// Sometimes NuRaft sets the error code without setting the result, so we check both here.
+ while (true) { - current_batch.emplace_back(request); + if (quorum_requests.size() > max_batch_size) + break; - /// Waiting until previous append will be successful, or batch is big enough - /// has_result == false && get_result_code == OK means that our request still not processed. - /// Sometimes NuRaft set errorcode without setting result, so we check both here. - while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size) + if (read_requests.size() > max_batch_size) { - /// Trying to get batch requests as fast as possible - if (requests_queue->tryPop(request, 1)) - { - CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); - /// Don't append read request into batch, we have to process them separately - if (!coordination_settings->quorum_reads && request.request->isReadRequest()) - { - has_read_request = true; - break; - } - else - { + processReadRequests(coordination_settings, read_requests); - current_batch.emplace_back(request); - } - } - - if (shutdown_called) + if (previous_quorum_done()) break; } + + /// Trying to get batch requests as fast as possible + if (active_requests_queue->tryPop(request, 1)) + { + CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); + if (needs_quorum(coordination_settings, request)) + quorum_requests.emplace_back(request); + else + read_requests.emplace_back(request); + } + else + { + /// a batch of read requests can send at most one request, + /// so we don't care if the previous batch hasn't received a response + if (!read_requests.empty()) + processReadRequests(coordination_settings, read_requests); + + /// if we still didn't process the previous batch we can + /// increase our current batch even more + if (previous_quorum_done()) + break; + } + + if (shutdown_called) + break; } - else - has_read_request = true; if (shutdown_called) break; - /// Forcefully process all previous pending requests - if (prev_result) - forceWaitAndProcessResult(prev_result, prev_batch); + if (!quorum_requests.empty()) + process_quorum_requests(); - /// Process collected write requests batch - if (!current_batch.empty()) - { - auto result = server->putRequestBatch(current_batch); - - if (result) - { - if (has_read_request) /// If we will execute read request next, than we have to process result now - forceWaitAndProcessResult(result, current_batch); - } - else - { - addErrorResponses(current_batch, Coordination::Error::ZCONNECTIONLOSS); - current_batch.clear(); - } - - prev_batch = std::move(current_batch); - prev_result = result; - } - - /// Read request always goes after write batch (last request) - if (has_read_request) - { - if (server->isLeaderAlive()) - server->putLocalReadRequest(request); - else - addErrorResponses({request}, Coordination::Error::ZCONNECTIONLOSS); - } } } catch (...)
@@ -151,6 +188,72 @@ void KeeperDispatcher::requestThread() } } +void KeeperDispatcher::processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests) +{ + if (coordination_settings->read_mode.toString() == "fastlinear") + { + // we just want to know what's the current latest committed log on Leader node + auto leader_info_result = server->getLeaderInfo(); + if (leader_info_result) + { + leader_info_result->when_ready([&, requests_for_sessions = std::move(read_requests)](nuraft::cmd_result> & result, nuraft::ptr & exception) mutable + { + if (!result.get_accepted() || result.get_result_code() == nuraft::cmd_result_code::TIMEOUT) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); + return; + } + + if (result.get_result_code() != nuraft::cmd_result_code::OK) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + if (exception) + { + LOG_INFO(log, "Got exception while waiting for read results {}", exception->what()); + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + auto & leader_info_ctx = result.get(); + + if (!leader_info_ctx) + { + addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); + return; + } + + KeeperServer::NodeInfo leader_info; + leader_info.term = leader_info_ctx->get_ulong(); + leader_info.last_committed_index = leader_info_ctx->get_ulong(); + std::lock_guard lock(leader_waiter_mutex); + auto node_info = server->getNodeInfo(); + + /// we're behind, we need to wait + if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index) + { + auto & leader_waiter = leader_waiters[leader_info]; + leader_waiter.insert(leader_waiter.end(), requests_for_sessions.begin(), requests_for_sessions.end()); + LOG_TRACE(log, "waiting for term {}, idx {}", leader_info.term, leader_info.last_committed_index); + } + /// process it in background thread + else if (!read_requests_queue.push(std::move(requests_for_sessions))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + }); + } + } + else + { + assert(coordination_settings->read_mode.toString() == "nonlinear"); + if (!read_requests_queue.push(std::move(read_requests))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + } + + read_requests.clear(); +} + void KeeperDispatcher::responseThread() { setThreadName("KeeperRspT"); @@ -200,6 +303,65 @@ void KeeperDispatcher::snapshotThread() } } +/// Background thread for processing read requests +void KeeperDispatcher::readRequestThread() +{ + setThreadName("KeeperReadT"); + while (!shutdown_called) + { + KeeperStorage::RequestsForSessions requests; + if (!read_requests_queue.pop(requests)) + break; + + if (shutdown_called) + break; + + try + { + for (const auto & request_info : requests) + { + if (server->isLeaderAlive()) + server->putLocalReadRequest(request_info); + else + addErrorResponses({request_info}, Coordination::Error::ZCONNECTIONLOSS); + } + + if (!finalize_requests_queue.push(std::move(requests))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + +/// We finalize requests every time we commit a single log with request +/// or process a batch of read requests. +/// Because it can get heavy, we do it in background thread. 
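Taken together, processReadRequests above and the readRequestThread/finalize queues implement the 'fastlinear' path. A minimal sketch of the decision it boils down to, reusing the member names from this patch (leader_waiters, read_requests_queue, KeeperServer::NodeInfo) but with a hypothetical helper name serveOrPark:

void KeeperDispatcher::serveOrPark(KeeperStorage::RequestsForSessions requests, KeeperServer::NodeInfo leader_info, KeeperServer::NodeInfo node_info)
{
    std::lock_guard lock(leader_waiter_mutex);
    if (node_info.term < leader_info.term || node_info.last_committed_index < leader_info.last_committed_index)
    {
        /// we are behind: park the reads until the log at (leader term, leader index) is committed locally
        auto & waiters = leader_waiters[leader_info];
        waiters.insert(waiters.end(), requests.begin(), requests.end());
    }
    else
    {
        /// we already have everything the leader had when the read arrived: safe to serve from local state
        if (!read_requests_queue.push(std::move(requests)))
            throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue");
    }
}

When the awaited log is committed, or a snapshot catches the node up, onRequestCommit and onApplySnapshot below move the parked requests back to read_requests_queue.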
+void KeeperDispatcher::finalizeRequestsThread() +{ + setThreadName("KeeperFinalT"); + while (!shutdown_called) + { + KeeperStorage::RequestsForSessions requests; + if (!finalize_requests_queue.pop(requests)) + break; + + if (shutdown_called) + break; + + try + { + finalizeRequests(requests); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) { std::lock_guard lock(session_to_response_callback_mutex); @@ -255,6 +417,30 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; + { + std::lock_guard lock{unprocessed_request_mutex}; + auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id); + if (unprocessed_requests_it == unprocessed_requests_for_session.end()) + { + auto & unprocessed_requests = unprocessed_requests_for_session[session_id]; + unprocessed_requests.unprocessed_num = 1; + unprocessed_requests.is_read = request->isReadRequest(); + } + else + { + auto & unprocessed_requests = unprocessed_requests_it->second; + + /// queue is not empty, or the request types don't match, put it in the waiting queue + if (!unprocessed_requests.request_queue.empty() || unprocessed_requests.is_read != request->isReadRequest()) + { + unprocessed_requests.request_queue.push_back(std::move(request_info)); + return true; + } + + ++unprocessed_requests.unprocessed_num; + } + } + std::lock_guard lock(push_request_mutex); if (shutdown_called) @@ -263,10 +449,10 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ /// Put close requests without timeouts if (request->getOpNum() == Coordination::OpNum::Close) { - if (!requests_queue->push(std::move(request_info))) + if (!active_requests_queue->push(std::move(request_info))) throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); } - else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) + else if (!active_requests_queue->tryPush(std::move(request_info), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) { throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); } @@ -279,13 +465,23 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf LOG_DEBUG(log, "Initializing storage dispatcher"); configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper); - requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_requests_batch_size); + active_requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_requests_batch_size); request_thread = ThreadFromGlobalPool([this] { requestThread(); }); responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); + read_request_thread = ThreadFromGlobalPool([this] { readRequestThread(); }); + finalize_requests_thread = ThreadFromGlobalPool([this] { finalizeRequestsThread(); }); - server = std::make_unique(configuration_and_settings, config, responses_queue, snapshots_queue); + server = std::make_unique( + configuration_and_settings, + config, + responses_queue, + snapshots_queue, + 
[this](const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx) + { onRequestCommit(request_for_session, log_term, log_idx); }, + [this](uint64_t term, uint64_t last_idx) + { onApplySnapshot(term, last_idx); }); try { @@ -333,9 +529,9 @@ void KeeperDispatcher::shutdown() if (session_cleaner_thread.joinable()) session_cleaner_thread.join(); - if (requests_queue) + if (active_requests_queue) { - requests_queue->finish(); + active_requests_queue->finish(); if (request_thread.joinable()) request_thread.join(); @@ -349,6 +545,14 @@ void KeeperDispatcher::shutdown() if (snapshot_thread.joinable()) snapshot_thread.join(); + read_requests_queue.finish(); + if (read_request_thread.joinable()) + read_request_thread.join(); + + finalize_requests_queue.finish(); + if (finalize_requests_thread.joinable()) + finalize_requests_thread.join(); + update_configuration_queue.finish(); if (update_configuration_thread.joinable()) update_configuration_thread.join(); @@ -357,7 +561,7 @@ void KeeperDispatcher::shutdown() KeeperStorage::RequestForSession request_for_session; /// Set session expired for all pending requests - while (requests_queue && requests_queue->tryPop(request_for_session)) + while (active_requests_queue && active_requests_queue->tryPop(request_for_session)) { CurrentMetrics::sub(CurrentMetrics::KeeperOutstandingRequets); auto response = request_for_session.request->makeResponse(); @@ -474,7 +678,7 @@ void KeeperDispatcher::sessionCleanerTask() }; { std::lock_guard lock(push_request_mutex); - if (!requests_queue->push(std::move(request_info))) + if (!active_requests_queue->push(std::move(request_info))) LOG_INFO(log, "Cannot push close request to queue while cleaning outdated sessions"); CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } @@ -524,19 +728,12 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession } } -void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions) +void KeeperDispatcher::forceWaitAndProcessResult(RaftResult & result) { if (!result->has_result()) result->get(); - /// If we get some errors, than send them to clients - if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) - addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT); - else if (result->get_result_code() != nuraft::cmd_result_code::OK) - addErrorResponses(requests_for_sessions, Coordination::Error::ZCONNECTIONLOSS); - result = nullptr; - requests_for_sessions.clear(); } int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) @@ -584,7 +781,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) /// Push new session request to queue { std::lock_guard lock(push_request_mutex); - if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms)) + if (!active_requests_queue->tryPush(std::move(request_info), session_timeout_ms)) throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED); CurrentMetrics::add(CurrentMetrics::KeeperOutstandingRequets); } @@ -657,6 +854,122 @@ void KeeperDispatcher::updateConfigurationThread() } } +// Used to update the state for a session based on the requests +// - update the number of current unprocessed requests for the session +// - if the number of unprocessed requests is 0, we can start adding next type of requests +// from unprocessed requests queue to the active queue 
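Before the implementation, a small self-contained sketch of this per-session gating with a simplified stand-in for UnprocessedRequests; it ignores locking, timeouts and Close handling, and assumes the whole in-flight batch finishes at once:

#include <cstddef>
#include <cstdio>
#include <list>

/// Simplified stand-in for KeeperDispatcher::UnprocessedRequests: bookkeeping for one session.
struct SessionState
{
    size_t unprocessed_num = 0; /// requests of the active type currently in the active queue
    bool is_read = false;       /// type currently being processed
    std::list<bool> waiting;    /// parked requests (true = read), kept in arrival order
};

/// putRequest(): admit the request immediately or park it behind requests of the other type.
void submit(SessionState & s, bool is_read)
{
    if (s.unprocessed_num == 0 && s.waiting.empty())
    {
        s.is_read = is_read;
        ++s.unprocessed_num;
        std::printf("admit %s\n", is_read ? "READ" : "WRITE");
    }
    else if (s.waiting.empty() && s.is_read == is_read)
    {
        ++s.unprocessed_num;
        std::printf("admit %s\n", is_read ? "READ" : "WRITE");
    }
    else
    {
        s.waiting.push_back(is_read);
        std::printf("park %s\n", is_read ? "READ" : "WRITE");
    }
}

/// finalizeRequests(): once the in-flight requests finish, flip the type and admit the next parked run.
void finishAll(SessionState & s)
{
    s.unprocessed_num = 0;
    if (s.waiting.empty())
        return;
    s.is_read = !s.is_read;
    while (!s.waiting.empty() && s.waiting.front() == s.is_read)
    {
        ++s.unprocessed_num;
        std::printf("admit parked %s\n", s.is_read ? "READ" : "WRITE");
        s.waiting.pop_front();
    }
}

int main()
{
    SessionState session;
    submit(session, true);  /// READ  -> admitted
    submit(session, true);  /// READ  -> admitted, same type and nothing parked
    submit(session, false); /// WRITE -> parked, different type
    submit(session, true);  /// READ  -> parked, must stay behind the WRITE
    finishAll(session);     /// both READs done -> the WRITE is admitted
    finishAll(session);     /// WRITE done -> the trailing READ is admitted
}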
+void KeeperDispatcher::finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions) +{ + std::unordered_map counts_for_session; + + for (const auto & request_for_session : requests_for_sessions) + { + ++counts_for_session[request_for_session.session_id]; + } + + std::lock_guard lock{unprocessed_request_mutex}; + for (const auto [session_id, count] : counts_for_session) + { + auto unprocessed_requests_it = unprocessed_requests_for_session.find(session_id); + if (unprocessed_requests_it == unprocessed_requests_for_session.end()) + continue; + + auto & unprocessed_requests = unprocessed_requests_it->second; + unprocessed_requests.unprocessed_num -= count; + + if (unprocessed_requests.unprocessed_num == 0) + { + if (!unprocessed_requests.request_queue.empty()) + { + auto & unprocessed_requests_queue = unprocessed_requests.request_queue; + unprocessed_requests.is_read = !unprocessed_requests.is_read; + // start adding next type of requests + while (!unprocessed_requests_queue.empty() && unprocessed_requests_queue.front().request->isReadRequest() == unprocessed_requests.is_read) + { + auto & front_request = unprocessed_requests_queue.front(); + + /// Put close requests without timeouts + if (front_request.request->getOpNum() == Coordination::OpNum::Close) + { + if (!active_requests_queue->push(std::move(front_request))) + throw Exception("Cannot push request to queue", ErrorCodes::SYSTEM_ERROR); + } + else if (!active_requests_queue->tryPush(std::move(front_request), configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds())) + { + throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED); + } + + ++unprocessed_requests.unprocessed_num; + unprocessed_requests_queue.pop_front(); + } + } + else + { + unprocessed_requests_for_session.erase(unprocessed_requests_it); + } + } + } +} + +// Finalize request +// Process read requests that were waiting for this commit +void KeeperDispatcher::onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx) +{ + if (!finalize_requests_queue.push({request_for_session})) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); + + KeeperStorage::RequestsForSessions requests; + { + std::lock_guard lock(leader_waiter_mutex); + auto request_queue_it = leader_waiters.find(KeeperServer::NodeInfo{.term = log_term, .last_committed_index = log_idx}); + if (request_queue_it != leader_waiters.end()) + { + requests = std::move(request_queue_it->second); + leader_waiters.erase(request_queue_it); + } + } + + if (requests.empty()) + return; + + if (!read_requests_queue.push(std::move(requests))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); +} + +/// Process all read request that are waiting for lower or currently last processed log index +void KeeperDispatcher::onApplySnapshot(uint64_t term, uint64_t last_idx) +{ + KeeperServer::NodeInfo current_node_info{term, last_idx}; + KeeperStorage::RequestsForSessions requests; + { + std::lock_guard lock(leader_waiter_mutex); + for (auto leader_waiter_it = leader_waiters.begin(); leader_waiter_it != leader_waiters.end();) + { + auto waiting_node_info = leader_waiter_it->first; + if (waiting_node_info.term <= current_node_info.term + && waiting_node_info.last_committed_index <= current_node_info.last_committed_index) + { + for (auto & request : leader_waiter_it->second) + { + requests.push_back(std::move(request)); + } + 
+ leader_waiter_it = leader_waiters.erase(leader_waiter_it); + } + else + { + ++leader_waiter_it; + } + } + } + + if (requests.empty()) + return; + + if (!read_requests_queue.push(std::move(requests))) + throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push read requests to queue"); +} + bool KeeperDispatcher::isServerActive() const { return checkInit() && hasLeader() && !server->isRecovering(); @@ -721,7 +1034,7 @@ Keeper4LWInfo KeeperDispatcher::getKeeper4LWInfo() const Keeper4LWInfo result = server->getPartiallyFilled4LWInfo(); { std::lock_guard lock(push_request_mutex); - result.outstanding_requests_count = requests_queue->size(); + result.outstanding_requests_count = active_requests_queue->size(); } { std::lock_guard lock(session_to_response_callback_mutex); diff --git a/src/Coordination/KeeperDispatcher.h b/src/Coordination/KeeperDispatcher.h index 5e2701299f4..6421db87793 100644 --- a/src/Coordination/KeeperDispatcher.h +++ b/src/Coordination/KeeperDispatcher.h @@ -32,9 +32,12 @@ private: using UpdateConfigurationQueue = ConcurrentBoundedQueue; /// Size depends on coordination settings - std::unique_ptr requests_queue; + /// Requests currently being processed + std::unique_ptr active_requests_queue; ResponsesQueue responses_queue; SnapshotsQueue snapshots_queue{1}; + ConcurrentBoundedQueue read_requests_queue; + ConcurrentBoundedQueue finalize_requests_queue; /// More than 1k updates is definitely misconfiguration. UpdateConfigurationQueue update_configuration_queue{1000}; @@ -64,6 +67,8 @@ private: ThreadFromGlobalPool snapshot_thread; /// Apply or wait for configuration changes ThreadFromGlobalPool update_configuration_thread; + ThreadFromGlobalPool read_request_thread; + ThreadFromGlobalPool finalize_requests_thread; /// RAFT wrapper. std::unique_ptr server; @@ -77,6 +82,34 @@ private: /// Counter for new session_id requests. std::atomic internal_session_id_counter{0}; + /// A read request needs to see at least the log that was the last committed log on the leader + /// at the time the request was made. + /// If the node is stale, we need to wait until that log is committed before doing local read requests to achieve + /// linearizability. + std::unordered_map leader_waiters; + std::mutex leader_waiter_mutex; + + /// We can be actively processing one type of request (either read or write) from a single session. + /// If we receive a request of a type that is not currently being processed, we put it in the waiting queue. + /// Also, we want to process them in arriving order, so if we have a different type in the queue, we cannot process that request + /// but wait for all the previous requests to finish. + /// E.g. READ -> WRITE -> READ, the last READ will go to the waiting queue even though we are currently processing the first READ + /// because we have a WRITE request before it that needs to be processed. + struct UnprocessedRequests + { + /// how many requests are currently in the active request queue + size_t unprocessed_num{0}; + /// whether the requests currently being processed are reads + bool is_read{false}; + std::list request_queue; + }; + + // Called every time a batch of requests is processed.
+ void finalizeRequests(const KeeperStorage::RequestsForSessions & requests_for_sessions); + + std::unordered_map unprocessed_requests_for_session; + std::mutex unprocessed_request_mutex; + /// Thread put requests to raft void requestThread(); /// Thread put responses for subscribed sessions @@ -88,6 +121,12 @@ private: /// Thread apply or wait configuration changes from leader void updateConfigurationThread(); + void readRequestThread(); + + void finalizeRequestsThread(); + + void processReadRequests(const CoordinationSettingsPtr & coordination_settings, KeeperStorage::RequestsForSessions & read_requests); + void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); /// Add error responses for requests to responses queue. @@ -96,7 +135,7 @@ private: /// Forcefully wait for result and sets errors if something when wrong. /// Clears both arguments - void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions); + static void forceWaitAndProcessResult(RaftResult & result); public: /// Just allocate some objects, real initialization is done by `intialize method` @@ -116,6 +155,12 @@ public: return server && server->checkInit(); } + /// Called when a single log with request is committed. + void onRequestCommit(const KeeperStorage::RequestForSession & request_for_session, uint64_t log_term, uint64_t log_idx); + + /// Called when a snapshot is applied + void onApplySnapshot(uint64_t term, uint64_t last_idx); + /// Is server accepting requests, i.e. connected to the cluster /// and achieved quorum bool isServerActive() const; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 8186ddd0c00..b708c5a51ba 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -105,7 +105,9 @@ KeeperServer::KeeperServer( const KeeperConfigurationAndSettingsPtr & configuration_and_settings_, const Poco::Util::AbstractConfiguration & config, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_) + SnapshotsQueue & snapshots_queue_, + KeeperStateMachine::CommitCallback commit_callback, + KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback) : server_id(configuration_and_settings_->server_id) , coordination_settings(configuration_and_settings_->coordination_settings) , log(&Poco::Logger::get("KeeperServer")) @@ -113,7 +115,7 @@ KeeperServer::KeeperServer( , keeper_context{std::make_shared()} , create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true)) { - if (coordination_settings->quorum_reads) + if (coordination_settings->quorum_reads || coordination_settings->read_mode.toString() == "quorum") LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower."); keeper_context->digest_enabled = config.getBool("keeper_server.digest_enabled", false); @@ -125,7 +127,9 @@ KeeperServer::KeeperServer( configuration_and_settings_->snapshot_storage_path, coordination_settings, keeper_context, - checkAndGetSuperdigest(configuration_and_settings_->super_digest)); + checkAndGetSuperdigest(configuration_and_settings_->super_digest), + std::move(commit_callback), + std::move(apply_snapshot_callback)); state_manager = nuraft::cs_new( server_id, @@ -176,6 +180,13 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server reconfigure(new_config); } + RaftResult getLeaderInfo() + { + nuraft::ptr req + = nuraft::cs_new(0ull, nuraft::msg_type::leader_status_request, 0, 0, 0ull, 0ull, 0ull); + return 
send_msg_to_leader(req); + } + void commit_in_bg() override { // For NuRaft, if any commit fails (uncaught exception) the whole server aborts as a safety @@ -269,6 +280,20 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co coordination_settings->election_timeout_lower_bound_ms.totalMilliseconds(), "election_timeout_lower_bound_ms", log); params.election_timeout_upper_bound_ = getValueOrMaxInt32AndLogWarning( coordination_settings->election_timeout_upper_bound_ms.totalMilliseconds(), "election_timeout_upper_bound_ms", log); + + params.leadership_expiry_ = getValueOrMaxInt32AndLogWarning(coordination_settings->leadership_expiry.totalMilliseconds(), "leadership_expiry", log); + + if (coordination_settings->read_mode.toString() == "fastlinear") + { + if (params.leadership_expiry_ == 0) + params.leadership_expiry_ = params.election_timeout_lower_bound_; + else if (params.leadership_expiry_ > params.election_timeout_lower_bound_) + { + LOG_WARNING(log, "To use fast linearizable reads, leadership_expiry should be set to a value that is less or equal to the election_timeout_upper_bound_ms. " + "Based on current settings, there are no guarantees for linearizability of reads."); + } + } + params.reserved_log_items_ = getValueOrMaxInt32AndLogWarning(coordination_settings->reserved_log_items, "reserved_log_items", log); params.snapshot_distance_ = getValueOrMaxInt32AndLogWarning(coordination_settings->snapshot_distance, "snapshot_distance", log); @@ -487,7 +512,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & state_machine->processReadRequest(request_for_session); } -RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions) +RaftResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions) { std::vector> entries; for (const auto & request_for_session : requests_for_sessions) @@ -713,6 +738,20 @@ std::vector KeeperServer::getDeadSessions() return state_machine->getDeadSessions(); } +RaftResult KeeperServer::getLeaderInfo() +{ + std::lock_guard lock{server_write_mutex}; + if (is_recovering) + return nullptr; + + return raft_instance->getLeaderInfo(); +} + +KeeperServer::NodeInfo KeeperServer::getNodeInfo() +{ + return { .term = raft_instance->get_term(), .last_committed_index = state_machine->last_commit_index() }; +} + ConfigUpdateActions KeeperServer::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) { auto diff = state_manager->getConfigurationDiff(config); diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 6873ef2a01e..02ab643044a 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -14,7 +14,7 @@ namespace DB { -using RaftAppendResult = nuraft::ptr>>; +using RaftResult = nuraft::ptr>>; class KeeperServer { @@ -71,7 +71,9 @@ public: const KeeperConfigurationAndSettingsPtr & settings_, const Poco::Util::AbstractConfiguration & config_, ResponsesQueue & responses_queue_, - SnapshotsQueue & snapshots_queue_); + SnapshotsQueue & snapshots_queue_, + KeeperStateMachine::CommitCallback commit_callback, + KeeperStateMachine::ApplySnapshotCallback apply_snapshot_callback); /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); @@ -84,7 +86,7 @@ public: /// Put batch of requests into Raft and get result of put. 
Responses will be set separately into /// responses_queue. - RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); + RaftResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); /// Return set of the non-active sessions std::vector getDeadSessions(); @@ -119,6 +121,17 @@ public: int getServerID() const { return server_id; } + struct NodeInfo + { + uint64_t term; + uint64_t last_committed_index; + + bool operator==(const NodeInfo &) const = default; + }; + + RaftResult getLeaderInfo(); + NodeInfo getNodeInfo(); + /// Get configuration diff between current configuration in RAFT and in XML file ConfigUpdateActions getConfigurationDiff(const Poco::Util::AbstractConfiguration & config); @@ -126,10 +139,23 @@ public: /// Synchronously check for update results with retries. void applyConfigurationUpdate(const ConfigUpdateAction & task); - /// Wait configuration update for action. Used by followers. /// Return true if update was successfully received. bool waitConfigurationUpdate(const ConfigUpdateAction & task); }; } +namespace std +{ + template <> + struct hash + { + size_t operator()(const DB::KeeperServer::NodeInfo & info) const + { + SipHash hash_state; + hash_state.update(info.term); + hash_state.update(info.last_committed_index); + return hash_state.get64(); + } + }; +} diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index c5a66ce29ca..477d8104796 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -44,7 +44,9 @@ KeeperStateMachine::KeeperStateMachine( const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, const KeeperContextPtr & keeper_context_, - const std::string & superdigest_) + const std::string & superdigest_, + CommitCallback commit_callback_, + ApplySnapshotCallback apply_snapshot_callback_) : coordination_settings(coordination_settings_) , snapshot_manager( snapshots_path_, @@ -58,6 +60,8 @@ KeeperStateMachine::KeeperStateMachine( , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) , superdigest(superdigest_) + , commit_callback(std::move(commit_callback_)) + , apply_snapshot_callback(std::move(apply_snapshot_callback_)) , keeper_context(keeper_context_) { } @@ -223,11 +227,11 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req return true; } -nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) +nuraft::ptr KeeperStateMachine::commit_ext(const ext_op_params & params) { - auto request_for_session = parseRequest(data); + auto request_for_session = parseRequest(*params.data); if (!request_for_session.zxid) - request_for_session.zxid = log_idx; + request_for_session.zxid = params.log_idx; /// Special processing of session_id request if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) @@ -272,8 +276,9 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true); } + last_committed_idx = params.log_idx; + commit_callback(request_for_session, params.log_term, params.log_idx); ProfileEvents::increment(ProfileEvents::KeeperCommits); - last_committed_idx = log_idx; return nullptr; } @@ -306,6 +311,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) ProfileEvents::increment(ProfileEvents::KeeperSnapshotApplys); last_committed_idx = s.get_last_log_idx(); + 
apply_snapshot_callback(s.get_last_log_term(), s.get_last_log_idx()); return true; } @@ -320,6 +326,10 @@ void KeeperStateMachine::commit_config(const uint64_t /* log_idx */, nuraft::ptr void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data) { auto request_for_session = parseRequest(data); + + if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) + return; + // If we received a log from an older node, use the log_idx as the zxid // log_idx will always be larger or equal to the zxid so we can safely do this // (log_idx is increased for all logs, while zxid is only increased for requests) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index fbd4fdc5ac2..f44dfd503b0 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -20,13 +20,18 @@ using SnapshotsQueue = ConcurrentBoundedQueue; class KeeperStateMachine : public nuraft::state_machine { public: + using CommitCallback = std::function; + using ApplySnapshotCallback = std::function; + KeeperStateMachine( ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, const KeeperContextPtr & keeper_context_, - const std::string & superdigest_ = ""); + const std::string & superdigest_ = "", + CommitCallback commit_callback_ = [](const KeeperStorage::RequestForSession &, uint64_t, uint64_t){}, + ApplySnapshotCallback apply_snapshot_callback_ = [](uint64_t, uint64_t){}); /// Read state from the latest snapshot void init(); @@ -37,7 +42,7 @@ public: nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; - nuraft::ptr commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT + nuraft::ptr commit_ext(const ext_op_params & params) override; /// NOLINT /// Save new cluster config to our snapshot (copy of the config stored in StateManager) void commit_config(const uint64_t log_idx, nuraft::ptr & new_conf) override; /// NOLINT @@ -145,6 +150,11 @@ private: /// Special part of ACL system -- superdigest specified in server config. 
const std::string superdigest; + /// call when a request is committed + const CommitCallback commit_callback; + /// call when snapshot is applied + const ApplySnapshotCallback apply_snapshot_callback; + KeeperContextPtr keeper_context; }; diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index fa6bfca7c7a..b98fd0e56e8 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1330,8 +1330,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint changelog.append(entry); changelog.end_of_append_batch(0, 0); - state_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); - state_machine->commit(i, changelog.entry_at(i)->get_buf()); + auto entry_buf = changelog.entry_at(i)->get_buf_ptr(); + state_machine->pre_commit(i, *entry_buf); + state_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry_buf}); bool snapshot_created = false; if (i % settings->snapshot_distance == 0) { @@ -1375,8 +1376,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint for (size_t i = restore_machine->last_commit_index() + 1; i < restore_changelog.next_slot(); ++i) { - restore_machine->pre_commit(i, changelog.entry_at(i)->get_buf()); - restore_machine->commit(i, changelog.entry_at(i)->get_buf()); + auto entry = changelog.entry_at(i)->get_buf_ptr(); + restore_machine->pre_commit(i, *entry); + restore_machine->commit_ext(nuraft::state_machine::ext_op_params{i, entry}); } auto & source_storage = state_machine->getStorage(); @@ -1477,18 +1479,18 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) std::shared_ptr request_c = std::make_shared(); request_c->path = "/hello"; request_c->is_ephemeral = true; - auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c); - state_machine->pre_commit(1, entry_c->get_buf()); - state_machine->commit(1, entry_c->get_buf()); + auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c)->get_buf_ptr(); + state_machine->pre_commit(1, *entry_c); + state_machine->commit_ext(nuraft::state_machine::ext_op_params{1, entry_c}); const auto & storage = state_machine->getStorage(); EXPECT_EQ(storage.ephemerals.size(), 1); std::shared_ptr request_d = std::make_shared(); request_d->path = "/hello"; /// Delete from other session - auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d); - state_machine->pre_commit(2, entry_d->get_buf()); - state_machine->commit(2, entry_d->get_buf()); + auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d)->get_buf_ptr(); + state_machine->pre_commit(2, *entry_d); + state_machine->commit_ext(nuraft::state_machine::ext_op_params{2, entry_d}); EXPECT_EQ(storage.ephemerals.size(), 0); } diff --git a/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml b/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml index 2ab747fbd71..677de5f6769 100644 --- a/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml +++ b/tests/jepsen.clickhouse-keeper/resources/keeper_config.xml @@ -21,7 +21,8 @@ 1000 2000 4000 - {quorum_reads} + 0 + fastlinear {snapshot_distance} {stale_log_gap} {reserved_log_items} diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj index 60b29bd799a..e6e94371501 100644 --- 
a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/counter.clj @@ -27,7 +27,12 @@ (invoke! [this test op] (case (:f op) - :read (exec-with-retries 30 (fn [] + :read (try + (assoc op + :type :ok + :value (count (zk-list conn root-path))) + (catch Exception _ (assoc op :type :info, :error :connect-error))) + :final-read (exec-with-retries 30 (fn [] (assoc op :type :ok :value (count (zk-list conn root-path))))) @@ -49,7 +54,5 @@ :checker (checker/compose {:counter (checker/counter) :perf (checker/perf)}) - :generator (->> (range) - (map (fn [x] - (->> (gen/mix [r add]))))) - :final-generator (gen/once {:type :invoke, :f :read, :value nil})}) + :generator (gen/mix [r add]) + :final-generator (gen/once {:type :invoke, :f :final-read, :value nil})}) diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj index c354e36e430..9e85b37dd75 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/db.clj @@ -98,7 +98,6 @@ #"\{srv2\}" (get nodes 1) #"\{srv3\}" (get nodes 2) #"\{id\}" (str (inc (.indexOf nodes node))) - #"\{quorum_reads\}" (str (boolean (:quorum test))) #"\{snapshot_distance\}" (str (:snapshot-distance test)) #"\{stale_log_gap\}" (str (:stale-log-gap test)) #"\{reserved_log_items\}" (str (:reserved-log-items test))}] diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj index cd1aa540e45..1919c8ce3ec 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/main.clj @@ -103,7 +103,7 @@ current-nemesis (get custom-nemesis/custom-nemesises (:nemesis opts))] (merge tests/noop-test opts - {:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts))) + {:name (str "clickhouse-keeper-" (name (:workload opts)) "-" (name (:nemesis opts))) :os ubuntu/os :db (get-db opts) :pure-generators true diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj index a1605192b51..228cb3f46ef 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/register.clj @@ -20,7 +20,8 @@ (assoc this :conn (zk-connect node 9181 30000))) (setup! [this test] - (zk-create-range conn 300)) ; 300 nodes to be sure + (exec-with-retries 30 (fn [] + (zk-create-range conn 300)))) (invoke! [_ test op] (let [[k v] (:value op) diff --git a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj index 3625b24b4f9..cdb25ba0a2d 100644 --- a/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse_keeper/utils.clj @@ -45,7 +45,7 @@ (defn zk-connect [host port timeout] - (exec-with-retries 30 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout)))) + (zk/connect (str host ":" port) :timeout-msec timeout)) (defn zk-create-range [conn n]
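The std::hash specialization for KeeperServer::NodeInfo added in KeeperServer.h exists so that KeeperDispatcher can key leader_waiters by (term, last_committed_index). A minimal self-contained sketch of the same pattern, assuming C++20 and a plain hash combination where the real code uses SipHash:

#include <cstdint>
#include <cstdio>
#include <functional>
#include <list>
#include <unordered_map>

/// Simplified stand-in for KeeperServer::NodeInfo.
struct NodeInfo
{
    uint64_t term;
    uint64_t last_committed_index;
    bool operator==(const NodeInfo &) const = default;
};

namespace std
{
    template <>
    struct hash<NodeInfo>
    {
        size_t operator()(const NodeInfo & info) const
        {
            return hash<uint64_t>{}(info.term) ^ (hash<uint64_t>{}(info.last_committed_index) << 1);
        }
    };
}

int main()
{
    /// read requests parked until the log at a given (term, index) is committed locally
    std::unordered_map<NodeInfo, std::list<int>> leader_waiters;
    leader_waiters[{.term = 3, .last_committed_index = 42}].push_back(1);

    /// onRequestCommit(): committing term 3 / index 42 wakes the parked read(s) up
    if (auto it = leader_waiters.find({.term = 3, .last_committed_index = 42}); it != leader_waiters.end())
    {
        std::printf("woke up %zu parked read(s)\n", it->second.size());
        leader_waiters.erase(it);
    }
}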