diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp
index 56f9de31ec8..df1841d97e3 100644
--- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp
+++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp
@@ -455,6 +455,35 @@ ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::make_shared<ZooKeeperCheckResponse>(); }
 ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
 ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
 
+void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
+{
+    Coordination::write(internal_id, out);
+    Coordination::write(session_timeout_ms, out);
+}
+
+void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in)
+{
+    Coordination::read(internal_id, in);
+    Coordination::read(session_timeout_ms, in);
+}
+
+Coordination::ZooKeeperResponsePtr ZooKeeperSessionIDRequest::makeResponse() const
+{
+    return std::make_shared<ZooKeeperSessionIDResponse>();
+}
+
+void ZooKeeperSessionIDResponse::readImpl(ReadBuffer & in)
+{
+    Coordination::read(internal_id, in);
+    Coordination::read(session_id, in);
+}
+
+void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
+{
+    Coordination::write(internal_id, out);
+    Coordination::write(session_id, out);
+}
+
 void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
 {
     if (!op_num_to_request.try_emplace(op_num, creator).second)
@@ -511,6 +540,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
     registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
     registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
     registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
+    registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
 }
 
 }
diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h
index ed9fcc4979c..af66c9de7fc 100644
--- a/src/Common/ZooKeeper/ZooKeeperCommon.h
+++ b/src/Common/ZooKeeper/ZooKeeperCommon.h
@@ -390,6 +390,36 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
     size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
 };
 
+/// Fake internal coordination (keeper) request. Never received from a client
+/// and never sent to a client.
+struct ZooKeeperSessionIDRequest final : ZooKeeperRequest
+{
+    int64_t internal_id;
+    int64_t session_timeout_ms;
+
+    Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
+    String getPath() const override { return {}; }
+    void writeImpl(WriteBuffer & out) const override;
+    void readImpl(ReadBuffer & in) override;
+
+    Coordination::ZooKeeperResponsePtr makeResponse() const override;
+    bool isReadRequest() const override { return false; }
+};
+
+/// Fake internal coordination (keeper) response. Never received from a client
+/// and never sent to a client.
+struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
+{
+    int64_t internal_id;
+    int64_t session_id;
+
+    void readImpl(ReadBuffer & in) override;
+
+    void writeImpl(WriteBuffer & out) const override;
+
+    Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
+};
+
 class ZooKeeperRequestFactory final : private boost::noncopyable
 {
diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.cpp b/src/Common/ZooKeeper/ZooKeeperConstants.cpp
index 295094b336b..d2dde4c4cdd 100644
--- a/src/Common/ZooKeeper/ZooKeeperConstants.cpp
+++ b/src/Common/ZooKeeper/ZooKeeperConstants.cpp
@@ -21,6 +21,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
     static_cast<int32_t>(OpNum::Check),
     static_cast<int32_t>(OpNum::Multi),
    static_cast<int32_t>(OpNum::Auth),
+    static_cast<int32_t>(OpNum::SessionID),
 };
 
 std::string toString(OpNum op_num)
@@ -55,6 +56,8 @@ std::string toString(OpNum op_num)
             return "Heartbeat";
         case OpNum::Auth:
             return "Auth";
+        case OpNum::SessionID:
+            return "SessionID";
     }
     int32_t raw_op = static_cast<int32_t>(op_num);
     throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);
diff --git a/src/Common/ZooKeeper/ZooKeeperConstants.h b/src/Common/ZooKeeper/ZooKeeperConstants.h
index 81ca6c6a460..f91204693a0 100644
--- a/src/Common/ZooKeeper/ZooKeeperConstants.h
+++ b/src/Common/ZooKeeper/ZooKeeperConstants.h
@@ -30,6 +30,7 @@ enum class OpNum : int32_t
     Check = 13,
     Multi = 14,
     Auth = 100,
+    SessionID = 997, /// Special internal request
 };
 
 std::string toString(OpNum op_num);
diff --git a/src/Coordination/Changelog.cpp b/src/Coordination/Changelog.cpp
index 7fc24de1cb9..ba1664b23da 100644
--- a/src/Coordination/Changelog.cpp
+++ b/src/Coordination/Changelog.cpp
@@ -80,7 +80,7 @@ public:
     {}
 
-    off_t appendRecord(ChangelogRecord && record, bool sync)
+    off_t appendRecord(ChangelogRecord && record)
     {
         off_t result = plain_buf.count();
         writeIntBinary(computeRecordChecksum(record), plain_buf);
@@ -96,23 +96,21 @@ public:
 
         entries_written++;
 
-        if (sync)
-            plain_buf.sync();
-        else
-            plain_buf.next();
         return result;
     }
 
     void truncateToLength(off_t new_length)
     {
-        flush();
+        plain_buf.next();
         plain_buf.truncate(new_length);
         plain_buf.seek(new_length, SEEK_SET);
     }
 
-    void flush()
+    void flush(bool force_fsync)
     {
-        plain_buf.sync();
+        plain_buf.next();
+        if (force_fsync)
+            plain_buf.sync();
     }
 
     uint64_t getEntriesWritten() const
@@ -247,9 +245,14 @@ private:
     ReadBufferFromFile read_buf;
 };
 
-Changelog::Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_)
+Changelog::Changelog(
+    const std::string & changelogs_dir_,
+    uint64_t rotate_interval_,
+    bool force_sync_,
+    Poco::Logger * log_)
     : changelogs_dir(changelogs_dir_)
     , rotate_interval(rotate_interval_)
+    , force_sync(force_sync_)
     , log(log_)
 {
     namespace fs = std::filesystem;
@@ -357,6 +360,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
 
 void Changelog::rotate(uint64_t new_start_log_index)
 {
+    /// Flush previous log
+    flush();
+
     ChangelogFileDescription new_description;
     new_description.prefix = DEFAULT_PREFIX;
     new_description.from_log_index = new_start_log_index;
@@ -387,7 +393,7 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_entry)
     return record;
 }
 
-void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
+void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
 {
     if (!current_writer)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
@@ -398,14 +404,14 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
     if (current_writer->getEntriesWritten() == rotate_interval)
         rotate(index);
 
-    auto offset = current_writer->appendRecord(buildRecord(index, log_entry), force_sync);
+    auto offset = current_writer->appendRecord(buildRecord(index, log_entry));
     if (!index_to_start_pos.try_emplace(index, offset).second)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
 
     logs[index] = makeClone(log_entry);
 }
 
-void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
+void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
 {
     if (index_to_start_pos.count(index) == 0)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
@@ -451,7 +457,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
 
     current_writer->setEntriesWritten(entries_written);
 
-    appendEntry(index, log_entry, force_sync);
+    appendEntry(index, log_entry);
 }
 
 void Changelog::compact(uint64_t up_to_log_index)
@@ -540,7 +546,7 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
     return buf_out;
 }
 
-void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
+void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
 {
     buffer.pos(0);
     int num_logs = buffer.get_int();
@@ -555,23 +561,23 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
         LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
 
         if (i == 0 && logs.count(cur_index))
-            writeAt(cur_index, log_entry, force_sync);
+            writeAt(cur_index, log_entry);
         else
-            appendEntry(cur_index, log_entry, force_sync);
+            appendEntry(cur_index, log_entry);
     }
 }
 
 void Changelog::flush()
 {
-    current_writer->flush();
+    if (current_writer)
+        current_writer->flush(force_sync);
 }
 
 Changelog::~Changelog()
 {
     try
     {
-        if (current_writer)
-            current_writer->flush();
+        flush();
     }
     catch (...)
     {
diff --git a/src/Coordination/Changelog.h b/src/Coordination/Changelog.h
index 21d96cd1438..d669f56aded 100644
--- a/src/Coordination/Changelog.h
+++ b/src/Coordination/Changelog.h
@@ -63,17 +63,17 @@ class Changelog
 {
 
 public:
-    Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_);
+    Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, bool force_sync_, Poco::Logger * log_);
 
     /// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
     /// Truncate broken entries, remove files after broken entries.
     void readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep);
 
-    /// Add entry to log with index. Call fsync if force_sync true.
-    void appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
+    /// Add entry to log with index.
+    void appendEntry(uint64_t index, const LogEntryPtr & log_entry);
 
     /// Write entry at index and truncate all subsequent entries.
-    void writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
+    void writeAt(uint64_t index, const LogEntryPtr & log_entry);
 
     /// Remove log files with to_log_index <= up_to_log_index.
     void compact(uint64_t up_to_log_index);
 
@@ -101,9 +101,9 @@ public:
     BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
 
     /// Apply entries from buffer overriding existing entries
-    void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync);
+    void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer);
 
-    /// Fsync log to disk
+    /// Fsync latest log to disk and flush buffer
     void flush();
 
     uint64_t size() const
@@ -124,6 +124,7 @@ private:
 
 private:
     const std::string changelogs_dir;
     const uint64_t rotate_interval;
+    const bool force_sync;
     Poco::Logger * log;
 
     std::map<uint64_t, ChangelogFileDescription> existing_changelogs;
diff --git a/src/Coordination/CoordinationSettings.h b/src/Coordination/CoordinationSettings.h
index 2b0c6c1042a..7a98e3f200d 100644
--- a/src/Coordination/CoordinationSettings.h
+++ b/src/Coordination/CoordinationSettings.h
@@ -32,8 +32,9 @@ struct Settings;
     M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
     M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
     M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
+    M(UInt64, max_requests_batch_size, 100, "Max number of requests in a batch before it is sent to RAFT", 0) \
     M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consensus with similar speed", 0) \
-    M(Bool, force_sync, true, " Call fsync on each change in RAFT changelog", 0)
+    M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0)
 
 DECLARE_SETTINGS_TRAITS(CoordinationSettingsTraits, LIST_OF_COORDINATION_SETTINGS)
diff --git a/src/Coordination/KeeperLogStore.cpp b/src/Coordination/KeeperLogStore.cpp
index f78d57ee1ce..3896cf9b6fd 100644
--- a/src/Coordination/KeeperLogStore.cpp
+++ b/src/Coordination/KeeperLogStore.cpp
@@ -5,9 +5,12 @@ namespace DB
 
 KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_)
     : log(&Poco::Logger::get("KeeperLogStore"))
-    , changelog(changelogs_path, rotate_interval_, log)
-    , force_sync(force_sync_)
+    , changelog(changelogs_path, rotate_interval_, force_sync_, log)
 {
+    if (force_sync_)
+        LOG_INFO(log, "force_sync enabled");
+    else
+        LOG_INFO(log, "force_sync disabled");
 }
 
 uint64_t KeeperLogStore::start_index() const
@@ -38,7 +41,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
 {
     std::lock_guard lock(changelog_lock);
     uint64_t idx = changelog.getNextEntryIndex();
-    changelog.appendEntry(idx, entry, force_sync);
+    changelog.appendEntry(idx, entry);
     return idx;
 }
 
@@ -46,7 +49,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
 void KeeperLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
 {
     std::lock_guard lock(changelog_lock);
-    changelog.writeAt(index, entry, force_sync);
+    changelog.writeAt(index, entry);
 }
 
 nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(uint64_t start, uint64_t end)
@@ -93,7 +96,7 @@ bool KeeperLogStore::flush()
 void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
 {
     std::lock_guard lock(changelog_lock);
-    changelog.applyEntriesFromBuffer(index, pack, force_sync);
+    changelog.applyEntriesFromBuffer(index, pack);
 }
 
 uint64_t KeeperLogStore::size() const
@@ -102,4 +105,10 @@ uint64_t KeeperLogStore::size() const
     return changelog.size();
 }
 
+void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*count*/)
+{
+    std::lock_guard lock(changelog_lock);
+    changelog.flush();
+}
+
 }
diff --git a/src/Coordination/KeeperLogStore.h b/src/Coordination/KeeperLogStore.h
index bc2ae719f0e..01315e6e879 100644
--- a/src/Coordination/KeeperLogStore.h
+++ b/src/Coordination/KeeperLogStore.h
@@ -42,11 +42,12 @@ public:
 
     uint64_t size() const;
 
+    void end_of_append_batch(uint64_t start_index, uint64_t count) override;
+
 private:
     mutable std::mutex changelog_lock;
     Poco::Logger * log;
     Changelog changelog;
-    bool force_sync;
 };
 
 }
diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp
index 7827a25afdd..c18bfc1e756 100644
--- a/src/Coordination/KeeperServer.cpp
+++ b/src/Coordination/KeeperServer.cpp
@@ -73,7 +73,6 @@ KeeperServer::KeeperServer(
         config.getString("keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"),
         coordination_settings))
     , state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings))
-    , responses_queue(responses_queue_)
     , log(&Poco::Logger::get("KeeperServer"))
 {
     if (coordination_settings->quorum_reads)
@@ -111,7 +110,7 @@ void KeeperServer::startup()
     params.auto_forwarding_ = coordination_settings->auto_forwarding;
     params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
 
-    params.return_method_ = nuraft::raft_params::blocking;
+    params.return_method_ = nuraft::raft_params::async_handler;
 
     nuraft::asio_service::options asio_opts{};
     if (state_manager->isSecure())
@@ -222,75 +221,23 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
 }
 
-void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_for_session)
+
+void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
 {
-    auto [session_id, request] = request_for_session;
-    if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
-    {
-        state_machine->processReadRequest(request_for_session);
-    }
-    else
-    {
-        std::vector<nuraft::ptr<nuraft::buffer>> entries;
-        entries.push_back(getZooKeeperLogEntry(session_id, request));
-
-        std::lock_guard lock(append_entries_mutex);
-
-        auto result = raft_instance->append_entries(entries);
-        if (!result->get_accepted())
-        {
-            KeeperStorage::ResponsesForSessions responses;
-            auto response = request->makeResponse();
-            response->xid = request->xid;
-            response->zxid = 0;
-            response->error = Coordination::Error::ZOPERATIONTIMEOUT;
-            responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
-        }
-
-        if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
-        {
-            KeeperStorage::ResponsesForSessions responses;
-            auto response = request->makeResponse();
-            response->xid = request->xid;
-            response->zxid = 0;
-            response->error = Coordination::Error::ZOPERATIONTIMEOUT;
-            responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
-        }
-        else if (result->get_result_code() != nuraft::cmd_result_code::OK)
-            throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
-    }
+    state_machine->processReadRequest(request_for_session);
 }
 
-int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
+RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
 {
-    /// Just some sanity check. We don't want to make a lot of clients wait with lock.
-    if (active_session_id_requests > 10)
-        throw Exception(ErrorCodes::RAFT_ERROR, "Too many concurrent SessionID requests already in flight");
-
-    ++active_session_id_requests;
-    SCOPE_EXIT({ --active_session_id_requests; });
+    std::vector<nuraft::ptr<nuraft::buffer>> entries;
+    for (const auto & [session_id, request] : requests_for_sessions)
+        entries.push_back(getZooKeeperLogEntry(session_id, request));
 
-    auto entry = nuraft::buffer::alloc(sizeof(int64_t));
-    /// Just special session request
-    nuraft::buffer_serializer bs(entry);
-    bs.put_i64(session_timeout_ms);
-
-    std::lock_guard lock(append_entries_mutex);
-
-    auto result = raft_instance->append_entries({entry});
-
-    if (!result->get_accepted())
-        throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT");
-
-    if (result->get_result_code() != nuraft::cmd_result_code::OK)
-        throw Exception(ErrorCodes::RAFT_ERROR, "session_id request failed to RAFT");
-
-    auto resp = result->get();
-    if (resp == nullptr)
-        throw Exception(ErrorCodes::RAFT_ERROR, "Received nullptr as session_id");
-
-    nuraft::buffer_serializer bs_resp(resp);
-    return bs_resp.get_i64();
+    {
+        std::lock_guard lock(append_entries_mutex);
+        return raft_instance->append_entries(entries);
+    }
 }
 
 bool KeeperServer::isLeader() const
diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h
index 5af948305ef..1e38ef8f2c0 100644
--- a/src/Coordination/KeeperServer.h
+++ b/src/Coordination/KeeperServer.h
@@ -12,6 +12,8 @@ namespace DB
 {
 
+using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
+
 class KeeperServer
 {
 private:
@@ -29,13 +31,10 @@ private:
 
     std::mutex append_entries_mutex;
 
-    ResponsesQueue & responses_queue;
-
     std::mutex initialized_mutex;
     std::atomic<bool> initialized_flag = false;
     std::condition_variable initialized_cv;
     std::atomic<bool> initial_batch_committed = false;
-    std::atomic<size_t> active_session_id_requests = 0;
 
     Poco::Logger * log;
 
@@ -60,9 +59,9 @@ public:
 
     void startup();
 
-    void putRequest(const KeeperStorage::RequestForSession & request);
+    void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
 
-    int64_t getSessionID(int64_t session_timeout_ms);
+    RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
 
     std::unordered_set<int64_t> getDeadSessions();
 
diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp
index c909cfc68e2..481438055f6 100644
--- a/src/Coordination/KeeperStateMachine.cpp
+++ b/src/Coordination/KeeperStateMachine.cpp
@@ -90,25 +90,28 @@ void KeeperStateMachine::init()
 
 nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
 {
-    if (data.size() == sizeof(int64_t))
+    auto request_for_session = parseRequest(data);
+    if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
     {
-        nuraft::buffer_serializer timeout_data(data);
-        int64_t session_timeout_ms = timeout_data.get_i64();
-        auto response = nuraft::buffer::alloc(sizeof(int64_t));
+        const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
         int64_t session_id;
-        nuraft::buffer_serializer bs(response);
         {
             std::lock_guard lock(storage_lock);
-            session_id = storage->getSessionID(session_timeout_ms);
-            bs.put_i64(session_id);
+            session_id = storage->getSessionID(session_id_request.session_timeout_ms);
         }
-        LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
-        last_committed_idx = log_idx;
-        return response;
+        LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
+
+        std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
+        response->internal_id = session_id_request.internal_id;
+        response->session_id = session_id;
+
+        KeeperStorage::ResponseForSession response_for_session;
+        response_for_session.session_id = -1;
+        response_for_session.response = response;
+        responses_queue.push(response_for_session);
     }
     else
     {
-        auto request_for_session = parseRequest(data);
         KeeperStorage::ResponsesForSessions responses_for_sessions;
         {
             std::lock_guard lock(storage_lock);
@@ -116,10 +119,11 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
         for (auto & response_for_session : responses_for_sessions)
             responses_queue.push(response_for_session);
         }
-
-        last_committed_idx = log_idx;
-        return nullptr;
     }
+
+
+    last_committed_idx = log_idx;
+    return nullptr;
 }
 
 bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h
index 0a368268ad5..585426a7441 100644
--- a/src/Coordination/KeeperStorage.h
+++ b/src/Coordination/KeeperStorage.h
@@ -8,7 +8,6 @@
 #include
 #include
 #include
 #include
-#include
 #include
 
 namespace DB
@@ -18,7 +17,7 @@ using namespace DB;
 struct KeeperStorageRequest;
 using KeeperStorageRequestPtr = std::shared_ptr<KeeperStorageRequest>;
 using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
-using ChildrenSet = std::set<std::string>;
+using ChildrenSet = std::unordered_set<std::string>;
 using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
 
 struct KeeperStorageSnapshot;
diff --git a/src/Coordination/KeeperStorageDispatcher.cpp b/src/Coordination/KeeperStorageDispatcher.cpp
index 7f9f9170dc2..41c83a79385 100644
--- a/src/Coordination/KeeperStorageDispatcher.cpp
+++ b/src/Coordination/KeeperStorageDispatcher.cpp
@@ -1,5 +1,9 @@
 #include
 #include
+#include
+#include
+#include
+#include
 
 namespace DB
 {
@@ -17,29 +21,106 @@ KeeperStorageDispatcher::KeeperStorageDispatcher()
 {
 }
 
+
 void KeeperStorageDispatcher::requestThread()
 {
     setThreadName("KeeperReqT");
+
+    /// Result of requests batch from previous iteration
+    RaftAppendResult prev_result = nullptr;
+    /// Requests from previous iteration. We store them to be able
+    /// to send errors to the client.
+    KeeperStorage::RequestsForSessions prev_batch;
+
     while (!shutdown_called)
     {
         KeeperStorage::RequestForSession request;
 
         UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
+        uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
 
-        if (requests_queue.tryPop(request, max_wait))
+        /// The code below does a very simple thing: it batches all write (quorum) requests into a vector until
+        /// the previous write batch is finished or the maximum batch size is reached. The main complexity comes from
+        /// the ability to process read requests without quorum (from the local state). So while we are collecting
+        /// requests into a batch we must check that each new request is not a read request. Otherwise we have to
+        /// process all already accumulated write requests, wait for them synchronously, and only after that process
+        /// the read request. So reads act as a kind of "separator" for writes.
+        try
         {
-            if (shutdown_called)
-                break;
+            if (requests_queue->tryPop(request, max_wait))
+            {
+                if (shutdown_called)
+                    break;
 
-            try
-            {
-                server->putRequest(request);
-            }
-            catch (...)
-            {
-                tryLogCurrentException(__PRETTY_FUNCTION__);
+                KeeperStorage::RequestsForSessions current_batch;
+
+                bool has_read_request = false;
+
+                /// If the new request is not a read request, or we must process it through quorum,
+                /// add it to the batch. Otherwise we will process it locally.
+                if (coordination_settings->quorum_reads || !request.request->isReadRequest() || !server->isLeaderAlive())
+                {
+                    current_batch.emplace_back(request);
+
+                    /// Wait until the previous append succeeds, or until the batch is big enough
+                    while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
+                    {
+                        /// Try to collect batch requests as fast as possible
+                        if (requests_queue->tryPop(request, 1))
+                        {
+                            /// Don't append read requests to the batch, we have to process them separately
+                            if (!coordination_settings->quorum_reads && request.request->isReadRequest() && server->isLeaderAlive())
+                            {
+                                has_read_request = true;
+                                break;
+                            }
+
+                            current_batch.emplace_back(request);
+                        }
+
+                        if (shutdown_called)
+                            break;
+                    }
+                }
+                else
+                    has_read_request = true;
+
+                if (shutdown_called)
+                    break;
+
+                /// Forcefully process all previous pending requests
+                if (prev_result)
+                    forceWaitAndProcessResult(std::move(prev_result), std::move(prev_batch));
+
+                /// Process collected write requests batch
+                if (!current_batch.empty())
+                {
+                    prev_result = server->putRequestBatch(current_batch);
+
+                    if (prev_result)
+                    {
+                        if (has_read_request) /// If we execute a read request next, then we have to process the result now
+                            forceWaitAndProcessResult(std::move(prev_result), std::move(current_batch));
+                    }
+                    else
+                    {
+                        addErrorResponses(std::move(current_batch), Coordination::Error::ZRUNTIMEINCONSISTENCY);
+                    }
+
+                    prev_batch = current_batch;
+                }
+
+                /// A read request always goes after the write batch (it is the last request).
+                /// We don't check leader aliveness, because the leader was alive when this request arrived,
+                /// so our state is consistent.
+                if (has_read_request)
+                    server->putLocalReadRequest(request);
             }
         }
+        catch (...)
+        {
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
     }
 }
 
@@ -94,14 +175,30 @@ void KeeperStorageDispatcher::snapshotThread()
 void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
 {
     std::lock_guard lock(session_to_response_callback_mutex);
-    auto session_writer = session_to_response_callback.find(session_id);
-    if (session_writer == session_to_response_callback.end())
-        return;
+    if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
+    {
+        const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
+        /// Nobody waits for this session id
+        if (!new_session_id_response_callback.count(session_id_resp.internal_id))
+            return;
 
-    session_writer->second(response);
-    /// Session closed, no more writes
-    if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
-        session_to_response_callback.erase(session_writer);
+        auto callback = new_session_id_response_callback[session_id_resp.internal_id];
+        callback(response);
+        new_session_id_response_callback.erase(session_id_resp.internal_id);
+    }
+    else
+    {
+        auto session_writer = session_to_response_callback.find(session_id);
+        if (session_writer == session_to_response_callback.end())
+            return;
+
+        session_writer->second(response);
+        /// Session closed, no more writes
+        if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
+        {
+            session_to_response_callback.erase(session_writer);
+        }
+    }
 }
 
 bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
@@ -119,8 +216,8 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
     std::lock_guard lock(push_request_mutex);
     /// Put close requests without timeouts
     if (request->getOpNum() == Coordination::OpNum::Close)
-        requests_queue.push(std::move(request_info));
-    else if (!requests_queue.tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
+        requests_queue->push(std::move(request_info));
+    else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
         throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
     return true;
 }
@@ -131,6 +228,7 @@ void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
     int myid = config.getInt("keeper_server.server_id");
 
     coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
+    requests_queue = std::make_unique<RequestsQueue>(coordination_settings->max_requests_batch_size);
 
     request_thread = ThreadFromGlobalPool([this] { requestThread(); });
     responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
@@ -175,7 +273,7 @@ void KeeperStorageDispatcher::shutdown()
             session_cleaner_thread.join();
 
         /// FIXME not the best way to notify
-        requests_queue.push({});
+        requests_queue->push({});
         if (request_thread.joinable())
             request_thread.join();
 
@@ -192,7 +290,7 @@ void KeeperStorageDispatcher::shutdown()
             server->shutdown();
 
         KeeperStorage::RequestForSession request_for_session;
-        while (requests_queue.tryPop(request_for_session))
+        while (requests_queue->tryPop(request_for_session))
         {
             if (request_for_session.request)
             {
@@ -249,7 +347,7 @@ void KeeperStorageDispatcher::sessionCleanerTask()
                 request_info.session_id = dead_session;
                 {
                     std::lock_guard lock(push_request_mutex);
-                    requests_queue.push(std::move(request_info));
+                    requests_queue->push(std::move(request_info));
                 }
                 finishSession(dead_session);
                 LOG_INFO(log, "Dead session close request pushed");
@@ -273,4 +371,70 @@ void KeeperStorageDispatcher::finishSession(int64_t session_id)
         session_to_response_callback.erase(session_it);
 }
 
+void KeeperStorageDispatcher::addErrorResponses(KeeperStorage::RequestsForSessions && requests_for_sessions, Coordination::Error error)
+{
+    for (const auto & [session_id, request] : requests_for_sessions)
+    {
+        KeeperStorage::ResponsesForSessions responses;
+        auto response = request->makeResponse();
+        response->xid = request->xid;
+        response->zxid = 0;
+        response->error = error;
+        responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
+    }
+    requests_for_sessions.clear();
+}
+
+void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult && result, KeeperStorage::RequestsForSessions && requests_for_sessions)
+{
+    if (!result->has_result())
+        result->get();
+
+    /// If we got errors, then send them to the clients
+    if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
+        addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZOPERATIONTIMEOUT);
+    else if (result->get_result_code() != nuraft::cmd_result_code::OK)
+        addErrorResponses(std::move(requests_for_sessions), Coordination::Error::ZRUNTIMEINCONSISTENCY);
+
+    result = nullptr;
+}
+
+int64_t KeeperStorageDispatcher::getSessionID(long session_timeout_ms)
+{
+    KeeperStorage::RequestForSession request_info;
+    std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
+    request->internal_id = internal_session_id_counter.fetch_add(1);
+    request->session_timeout_ms = session_timeout_ms;
+    request_info.request = request;
+    request_info.session_id = -1;
+
+    auto promise = std::make_shared<std::promise<int64_t>>();
+    auto future = promise->get_future();
+    {
+        std::lock_guard lock(session_to_response_callback_mutex);
+        new_session_id_response_callback[request->internal_id] = [promise] (const Coordination::ZooKeeperResponsePtr & response)
+        {
+            if (response->getOpNum() != Coordination::OpNum::SessionID)
+                promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
+                    "Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
+
+            if (response->error != Coordination::Error::ZOK)
+                promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
+
+            promise->set_value(dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response).session_id);
+        };
+    }
+
+    {
+        std::lock_guard lock(push_request_mutex);
+        if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
+            throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
+    }
+
+    if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
+        throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
+
+    return future.get();
+}
+
 }
diff --git a/src/Coordination/KeeperStorageDispatcher.h b/src/Coordination/KeeperStorageDispatcher.h
index 622b63be800..a3526186e0e 100644
--- a/src/Coordination/KeeperStorageDispatcher.h
+++ b/src/Coordination/KeeperStorageDispatcher.h
@@ -32,24 +32,42 @@ private:
     using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
     using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
 
-    RequestsQueue requests_queue{1};
+    /// Size depends on coordination settings
+    std::unique_ptr<RequestsQueue> requests_queue;
+
     ResponsesQueue responses_queue;
     SnapshotsQueue snapshots_queue{1};
 
     std::atomic<bool> shutdown_called{false};
 
     std::mutex session_to_response_callback_mutex;
+    /// These two maps look similar, but serve different purposes.
+    /// The first map is the subscription map for normal responses
+    /// (get, set, list, etc.). The dispatcher determines the callback for each response
+    /// using the session id from this map.
     SessionToResponseCallback session_to_response_callback;
+    /// But when a client connects to the server for the first time it doesn't
+    /// have a session_id yet. It requests one from the server. We assign a temporary
+    /// internal id to such requests just to match the client with its response.
+    SessionToResponseCallback new_session_id_response_callback;
 
+    /// Reading and batching new requests from client handlers
     ThreadFromGlobalPool request_thread;
+    /// Pushing responses to client handlers,
+    /// using session_id.
     ThreadFromGlobalPool responses_thread;
+    /// Cleaning old dead sessions
     ThreadFromGlobalPool session_cleaner_thread;
+    /// Dumping new snapshots to disk
     ThreadFromGlobalPool snapshot_thread;
 
+    /// RAFT wrapper. Most important class.
     std::unique_ptr<KeeperServer> server;
 
     Poco::Logger * log;
 
+    /// Counter for new session_id requests.
+    std::atomic<int64_t> internal_session_id_counter{0};
+
 private:
     void requestThread();
     void responseThread();
@@ -57,6 +75,14 @@ private:
     void snapshotThread();
     void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
 
+    /// Add error responses for requests to the responses queue.
+    /// Clears requests.
+    void addErrorResponses(KeeperStorage::RequestsForSessions && requests_for_sessions, Coordination::Error error);
+
+    /// Forcefully wait for the result and set errors if something went wrong.
+    /// Clears both arguments.
+    void forceWaitAndProcessResult(RaftAppendResult && result, KeeperStorage::RequestsForSessions && requests_for_sessions);
+
 public:
     KeeperStorageDispatcher();
 
@@ -78,10 +104,7 @@ public:
         return server->isLeaderAlive();
     }
 
-    int64_t getSessionID(long session_timeout_ms)
-    {
-        return server->getSessionID(session_timeout_ms);
-    }
+    int64_t getSessionID(long session_timeout_ms);
 
     void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
     /// Call if we don't need any more responses for this session (session was expired)
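
A note on the batching logic for reviewers: requestThread() above is the core of this patch, but it is hard to read inside a diff. The following standalone sketch (not part of the patch) shows just the batching idea, with hypothetical stand-ins, Request, appendToRaft, processLocally, and a plain std::deque, in place of the real RequestForSession, putRequestBatch, putLocalReadRequest, and ConcurrentBoundedQueue: writes accumulate into a bounded batch, and a read request cuts the batch off and is served locally afterwards.

    // Illustrative sketch only, C++17; names are hypothetical stand-ins.
    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <string>
    #include <vector>

    struct Request                 // stand-in for KeeperStorage::RequestForSession
    {
        std::string name;
        bool is_read = false;
    };

    static void appendToRaft(const std::vector<Request> & batch)  // stand-in for putRequestBatch()
    {
        std::cout << "append batch of " << batch.size() << " write(s)\n";
    }

    static void processLocally(const Request & r)                 // stand-in for putLocalReadRequest()
    {
        std::cout << "local read: " << r.name << "\n";
    }

    int main()
    {
        const std::size_t max_batch_size = 3;
        std::deque<Request> queue = {{"w1"}, {"w2"}, {"r1", true}, {"w3"}, {"w4"}, {"w5"}, {"w6"}};

        while (!queue.empty())
        {
            std::vector<Request> batch;
            Request read_request;
            bool has_read = false;

            // Collect writes until a read arrives or the batch is full
            while (!queue.empty() && batch.size() < max_batch_size)
            {
                Request r = queue.front();
                queue.pop_front();
                if (r.is_read)
                {
                    read_request = r;
                    has_read = true;
                    break;
                }
                batch.push_back(r);
            }

            if (!batch.empty())
                appendToRaft(batch);          // the real loop also waits here for the previous batch result

            if (has_read)
                processLocally(read_request); // the read runs strictly after the writes collected before it
        }
    }

Running it prints "append batch of 2", "local read: r1", "append batch of 3", "append batch of 1", which is exactly the write/read ordering the dispatcher guarantees.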
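
Likewise, the getSessionID() / setResponse() pair implements a promise/future handshake keyed by a temporary internal_id, since a brand-new client has no session_id yet. Here is a minimal sketch of that mechanism under the same idea; the names (SessionIDResponse, new_session_callbacks) are illustrative rather than the real classes, and a plain thread stands in for the RAFT commit path:

    // Illustrative sketch only; error checking from the real code is reduced to a comment.
    #include <atomic>
    #include <cstdint>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <thread>
    #include <unordered_map>

    struct SessionIDResponse { int64_t internal_id; int64_t session_id; };
    using Callback = std::function<void(const SessionIDResponse &)>;

    std::mutex callbacks_mutex;
    std::unordered_map<int64_t, Callback> new_session_callbacks; // keyed by internal_id, not session_id
    std::atomic<int64_t> internal_id_counter{0};

    int64_t getSessionID()
    {
        auto promise = std::make_shared<std::promise<int64_t>>();
        auto future = promise->get_future();
        int64_t internal_id = internal_id_counter.fetch_add(1);
        {
            std::lock_guard<std::mutex> lock(callbacks_mutex);
            new_session_callbacks[internal_id] = [promise](const SessionIDResponse & resp)
            {
                promise->set_value(resp.session_id); // the real code also checks op num and error first
            };
        }

        // Simulate the state machine committing the request and dispatching the response
        std::thread commit_thread([internal_id]
        {
            SessionIDResponse resp{internal_id, /*session_id=*/ 42};
            std::lock_guard<std::mutex> lock(callbacks_mutex);
            auto it = new_session_callbacks.find(resp.internal_id);
            if (it != new_session_callbacks.end())
            {
                it->second(resp);
                new_session_callbacks.erase(it); // one-shot: nobody waits for this id any more
            }
        });
        commit_thread.join();

        return future.get(); // the real code uses wait_for() bounded by the session timeout
    }

    int main()
    {
        std::cout << "session id: " << getSessionID() << "\n";
    }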
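
Finally, the Changelog change replaces one fsync per appended record with a single flush per RAFT append batch (end_of_append_batch), gated by the force_sync setting. A rough sketch of the resulting write pattern using plain POSIX I/O; the Writer class and the file path here are hypothetical, not the real ChangelogWriter:

    // Illustrative sketch only: buffered appends, one fsync per batch.
    #include <cstddef>
    #include <fcntl.h>
    #include <string>
    #include <unistd.h>

    class Writer
    {
    public:
        Writer(const char * path, bool force_fsync_)
            : fd(::open(path, O_WRONLY | O_CREAT | O_APPEND, 0644)), force_fsync(force_fsync_) {}
        ~Writer() { if (fd >= 0) ::close(fd); }

        void appendRecord(const std::string & record)
        {
            // No sync here any more: the record only reaches the OS buffer
            (void)::write(fd, record.data(), record.size());
        }

        void flush()
        {
            // One fsync per batch, and only when force_sync is enabled
            if (force_fsync)
                ::fsync(fd);
        }

    private:
        int fd;
        bool force_fsync;
    };

    int main()
    {
        Writer writer("/tmp/changelog_sketch.bin", /*force_fsync_=*/ true);
        for (std::size_t i = 0; i < 100; ++i)
            writer.appendRecord("record\n"); // buffered appends
        writer.flush();                      // single fsync for the whole batch
    }

With durability unchanged at batch granularity, this turns 100 fsyncs into one, which is the main throughput win of the patch when force_sync is enabled.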