diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index e958774c4a0..2c7bf8cc1f0 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -27,6 +28,17 @@ void ZooKeeperResponse::write(WriteBuffer & out) const out.next(); } +std::string ZooKeeperRequest::toString() const +{ + return fmt::format( + "XID = {}\n" + "OpNum = {}\n" + "Additional info:\n{}", + xid, + getOpNum(), + toStringImpl()); +} + void ZooKeeperRequest::write(WriteBuffer & out) const { /// Excessive copy to calculate length. @@ -48,6 +60,11 @@ void ZooKeeperSyncRequest::readImpl(ReadBuffer & in) Coordination::read(path, in); } +std::string ZooKeeperSyncRequest::toStringImpl() const +{ + return fmt::format("path = {}", path); +} + void ZooKeeperSyncResponse::readImpl(ReadBuffer & in) { Coordination::read(path, in); @@ -93,6 +110,17 @@ void ZooKeeperAuthRequest::readImpl(ReadBuffer & in) Coordination::read(data, in); } +std::string ZooKeeperAuthRequest::toStringImpl() const +{ + return fmt::format( + "type = {}\n" + "scheme = {}\n" + "data = {}", + type, + scheme, + data); +} + void ZooKeeperCreateRequest::writeImpl(WriteBuffer & out) const { Coordination::write(path, out); @@ -124,6 +152,19 @@ void ZooKeeperCreateRequest::readImpl(ReadBuffer & in) is_sequential = true; } +std::string ZooKeeperCreateRequest::toStringImpl() const +{ + return fmt::format( + "path = {}\n" + "data = {}\n" + "is_ephemeral = {}\n" + "is_sequential = {}", + path, + data, + is_ephemeral, + is_sequential); +} + void ZooKeeperCreateResponse::readImpl(ReadBuffer & in) { Coordination::read(path_created, in); @@ -140,6 +181,15 @@ void ZooKeeperRemoveRequest::writeImpl(WriteBuffer & out) const Coordination::write(version, out); } +std::string ZooKeeperRemoveRequest::toStringImpl() const +{ + return fmt::format( + "path = {}\n" + "version = {}", + path, + version); +} + void ZooKeeperRemoveRequest::readImpl(ReadBuffer & in) { Coordination::read(path, in); @@ -158,6 +208,11 @@ void ZooKeeperExistsRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } +std::string ZooKeeperExistsRequest::toStringImpl() const +{ + return fmt::format("path = {}", path); +} + void ZooKeeperExistsResponse::readImpl(ReadBuffer & in) { Coordination::read(stat, in); @@ -180,6 +235,11 @@ void ZooKeeperGetRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } +std::string ZooKeeperGetRequest::toStringImpl() const +{ + return fmt::format("path = {}", path); +} + void ZooKeeperGetResponse::readImpl(ReadBuffer & in) { Coordination::read(data, in); @@ -206,6 +266,17 @@ void ZooKeeperSetRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +std::string ZooKeeperSetRequest::toStringImpl() const +{ + return fmt::format( + "path = {}\n", + "data = {}\n" + "version = {}", + path, + data, + version); +} + void ZooKeeperSetResponse::readImpl(ReadBuffer & in) { Coordination::read(stat, in); @@ -228,6 +299,11 @@ void ZooKeeperListRequest::readImpl(ReadBuffer & in) Coordination::read(has_watch, in); } +std::string ZooKeeperListRequest::toStringImpl() const +{ + return fmt::format("path = {}", path); +} + void ZooKeeperListResponse::readImpl(ReadBuffer & in) { Coordination::read(names, in); @@ -255,6 +331,11 @@ void ZooKeeperSetACLRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +std::string ZooKeeperSetACLRequest::toStringImpl() const +{ + return fmt::format("path = {}\n", "version = {}", path, version); +} + void ZooKeeperSetACLResponse::writeImpl(WriteBuffer & out) const { Coordination::write(stat, out); @@ -275,6 +356,11 @@ void ZooKeeperGetACLRequest::writeImpl(WriteBuffer & out) const Coordination::write(path, out); } +std::string ZooKeeperGetACLRequest::toStringImpl() const +{ + return fmt::format("path = {}", path); +} + void ZooKeeperGetACLResponse::writeImpl(WriteBuffer & out) const { Coordination::write(acl, out); @@ -299,6 +385,11 @@ void ZooKeeperCheckRequest::readImpl(ReadBuffer & in) Coordination::read(version, in); } +std::string ZooKeeperCheckRequest::toStringImpl() const +{ + return fmt::format("path = {}\n", "version = {}", path, version); +} + void ZooKeeperErrorResponse::readImpl(ReadBuffer & in) { Coordination::Error read_error; @@ -401,6 +492,17 @@ void ZooKeeperMultiRequest::readImpl(ReadBuffer & in) } } +std::string ZooKeeperMultiRequest::toStringImpl() const +{ + auto out = fmt::memory_buffer(); + for (const auto & request : requests) + { + const auto & zk_request = dynamic_cast(*request); + format_to(std::back_inserter(out), "SubRequest\n{}\n", zk_request.toString()); + } + return {out.data(), out.size()}; +} + bool ZooKeeperMultiRequest::isReadRequest() const { /// Possibly we can do better diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 532488c08f8..9c54dfb9f0c 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -68,10 +68,13 @@ struct ZooKeeperRequest : virtual Request /// Writes length, xid, op_num, then the rest. void write(WriteBuffer & out) const; + std::string toString() const; virtual void writeImpl(WriteBuffer &) const = 0; virtual void readImpl(ReadBuffer &) = 0; + virtual std::string toStringImpl() const { return ""; } + static std::shared_ptr read(ReadBuffer & in); virtual ZooKeeperResponsePtr makeResponse() const = 0; @@ -100,6 +103,7 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Sync; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -150,6 +154,7 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Auth; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -202,6 +207,7 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Create; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -232,6 +238,7 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Remove; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -255,6 +262,7 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Exists; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -278,6 +286,7 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Get; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -304,6 +313,7 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Set; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -328,6 +338,7 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::List; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -363,6 +374,7 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::Check; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -397,6 +409,7 @@ struct ZooKeeperSetACLRequest final : SetACLRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::SetACL; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return false; } @@ -417,6 +430,7 @@ struct ZooKeeperGetACLRequest final : GetACLRequest, ZooKeeperRequest OpNum getOpNum() const override { return OpNum::GetACL; } void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override { return true; } @@ -441,6 +455,7 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest void writeImpl(WriteBuffer & out) const override; void readImpl(ReadBuffer & in) override; + std::string toStringImpl() const override; ZooKeeperResponsePtr makeResponse() const override; bool isReadRequest() const override; diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 24375269f7f..378cec4f2b0 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -405,9 +405,10 @@ nuraft::ptr getZooKeeperLogEntry(const KeeperStorage::RequestFor request_for_session.request->write(write_buf); DB::writeIntBinary(request_for_session.time, write_buf); DB::writeIntBinary(request_for_session.zxid, write_buf); - DB::writeIntBinary(request_for_session.digest.version, write_buf); - if (request_for_session.digest.version != KeeperStorage::DigestVersion::NO_DIGEST) - DB::writeIntBinary(request_for_session.digest.value, write_buf); + assert(request_for_session.digest); + DB::writeIntBinary(request_for_session.digest->version, write_buf); + if (request_for_session.digest->version != KeeperStorage::DigestVersion::NO_DIGEST) + DB::writeIntBinary(request_for_session.digest->value, write_buf); return write_buf.getBuffer(); } diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index d45f58a0bf7..47551a9cc3e 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -151,6 +151,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr if (snapshot.version >= SnapshotVersion::V5) { writeBinary(snapshot.zxid, out); + writeBinary(static_cast(KeeperStorage::CURRENT_DIGEST_VERSION), out); writeBinary(snapshot.nodes_digest, out); } @@ -243,10 +244,22 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in); KeeperStorage & storage = *deserialization_result.storage; + bool recalculate_digest = storage.digest_enabled; if (version >= SnapshotVersion::V5) { readBinary(storage.zxid, in); - readBinary(storage.nodes_digest, in); + uint8_t digest_version; + readBinary(digest_version, in); + if (digest_version != KeeperStorage::DigestVersion::NO_DIGEST) + { + uint64_t nodes_digest; + readBinary(nodes_digest, in); + if (digest_version == KeeperStorage::CURRENT_DIGEST_VERSION) + { + storage.nodes_digest = nodes_digest; + recalculate_digest = false; + } + } } else storage.zxid = deserialization_result.snapshot_meta->get_last_log_idx(); @@ -299,6 +312,9 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial storage.ephemerals[node.stat.ephemeralOwner].insert(path); current_size++; + + if (recalculate_digest) + storage.nodes_digest += node.getDigest(path); } for (const auto & itr : storage.container) diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index df417b72b2e..c4a299b2532 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -128,9 +128,10 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer if (!buffer.eof()) { - readIntBinary(request_for_session.digest.version, buffer); - if (request_for_session.digest.version != KeeperStorage::DigestVersion::NO_DIGEST) - readIntBinary(request_for_session.digest.value, buffer); + request_for_session.digest.emplace(); + readIntBinary(request_for_session.digest->version, buffer); + if (request_for_session.digest->version != KeeperStorage::DigestVersion::NO_DIGEST) + readIntBinary(request_for_session.digest->value, buffer); } return request_for_session; @@ -180,9 +181,19 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n } - if (digest_enabled && !KeeperStorage::checkDigest(request_for_session.digest, storage->getNodesDigest(true))) + assert(request_for_session.digest); + auto local_nodes_digest = storage->getNodesDigest(true); + if (digest_enabled && !KeeperStorage::checkDigest(*request_for_session.digest, local_nodes_digest)) { - LOG_ERROR(log, "Digest for nodes is not matching after applying request of type {}", request_for_session.request->getOpNum()); + LOG_ERROR( + log, + "Digest for nodes is not matching after applying request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). Keeper will " + "terminate to avoid inconsistencies.\nExtra information about the request:\n{}", + request_for_session.request->getOpNum(), + request_for_session.digest->value, + local_nodes_digest.value, + request_for_session.digest->version, + request_for_session.request->toString()); std::terminate(); } diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 3475e7ec0b7..66eff3b62f9 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -28,7 +28,8 @@ public: /// Read state from the latest snapshot void init(); - KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data); + static KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data); + void preprocess(const KeeperStorage::RequestForSession & request_for_session); nuraft::ptr pre_commit(uint64_t log_idx, nuraft::buffer & data) override; diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 71d48e53f09..e64962bd01a 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -236,7 +236,7 @@ struct Overloaded : Ts... template Overloaded(Ts...) -> Overloaded; -std::shared_ptr KeeperStorage::UncommittedState::getNode(StringRef path, std::optional last_zxid) const +std::shared_ptr KeeperStorage::UncommittedState::getNode(StringRef path, std::optional current_zxid) const { std::shared_ptr node{nullptr}; @@ -273,7 +273,7 @@ std::shared_ptr KeeperStorage::UncommittedState::getNode(St }, [&](auto && /*delta*/) {}, }, - last_zxid); + current_zxid); return node; } @@ -1657,7 +1657,7 @@ void KeeperStorage::preprocessRequest( int64_t time, int64_t new_last_zxid, bool check_acl, - Digest expected_digest) + std::optional digest) { int64_t last_zxid = getNextZXID() - 1; @@ -1669,7 +1669,9 @@ void KeeperStorage::preprocessRequest( } else { - // if we are leader node, the request potentially already got processed + // if we are Leader node, the request potentially already got preprocessed + // Leader can preprocess requests in a batch so the ZXID we are searching isn't + // guaranteed to be last auto txn_it = std::lower_bound( uncommitted_transactions.begin(), uncommitted_transactions.end(), @@ -1687,7 +1689,7 @@ void KeeperStorage::preprocessRequest( } else { - if (txn_it->zxid == new_last_zxid && checkDigest(txn_it->nodes_digest, expected_digest)) + if (txn_it->zxid == new_last_zxid) // we found the preprocessed request with the same ZXID, we can skip it return; @@ -1698,10 +1700,17 @@ void KeeperStorage::preprocessRequest( TransactionInfo transaction{.zxid = new_last_zxid}; SCOPE_EXIT({ - if (expected_digest.version == DigestVersion::NO_DIGEST && digest_enabled) - transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)}; + if (digest_enabled) + { + // if the leader has the same digest calculation version we + // can skip recalculating and use that value + if (digest && digest->version == CURRENT_DIGEST_VERSION) + transaction.nodes_digest = *digest; + else + transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)}; + } else - transaction.nodes_digest = expected_digest; + transaction.nodes_digest = Digest{DigestVersion::NO_DIGEST}; uncommitted_transactions.emplace_back(transaction); }); @@ -1825,7 +1834,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest( if (is_local) { - assert(!zk_request->isReadRequest()); + assert(zk_request->isReadRequest()); if (check_acl && !request_processor->checkAuth(*this, session_id, true)) { response = zk_request->makeResponse(); diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 4227f7b404f..d4c60735bd7 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -101,7 +101,7 @@ public: int64_t time; Coordination::ZooKeeperRequestPtr request; int64_t zxid{0}; - Digest digest; + std::optional digest; }; struct AuthID @@ -211,11 +211,11 @@ public: explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { } template - void applyDeltas(StringRef path, const Visitor & visitor, std::optional last_zxid = std::nullopt) const + void applyDeltas(StringRef path, const Visitor & visitor, std::optional current_zxid = std::nullopt) const { for (const auto & delta : deltas) { - if (last_zxid && delta.zxid >= last_zxid) + if (current_zxid && delta.zxid >= current_zxid) break; if (path.empty() || delta.path == path) @@ -223,7 +223,7 @@ public: } } - bool hasACL(int64_t session_id, bool is_local, std::function predicate) + bool hasACL(int64_t session_id, bool is_local, std::function predicate) { for (const auto & session_auth : storage.session_and_auth[session_id]) { @@ -234,7 +234,6 @@ public: if (is_local) return false; - for (const auto & delta : deltas) { if (const auto * auth_delta = std::get_if(&delta.operation); @@ -245,7 +244,7 @@ public: return false; } - std::shared_ptr getNode(StringRef path, std::optional last_zxid = std::nullopt) const; + std::shared_ptr getNode(StringRef path, std::optional current_zxid = std::nullopt) const; bool hasNode(StringRef path) const; Coordination::ACLs getACLs(StringRef path) const; @@ -361,7 +360,7 @@ public: int64_t time, int64_t new_last_zxid, bool check_acl = true, - Digest digest = {DigestVersion::NO_DIGEST, 0}); + std::optional digest = std::nullopt); void rollbackRequest(int64_t rollback_zxid); void finalize(); diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 7090234b593..4d1745edc6a 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -520,7 +520,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request)) return true; - storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false, {KeeperStorage::DigestVersion::NO_DIGEST}); + storage.preprocessRequest(request, session_id, time, zxid, /* check_acl = */ false); storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false); } }