From 6fba1c96edea332bba6bc3c1b790476ab017c2b8 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Tue, 17 May 2022 13:53:12 +0000 Subject: [PATCH] Define small test for digest check --- src/Common/ErrorCodes.cpp | 1 + src/Coordination/KeeperStateMachine.cpp | 53 ++++++++---- src/Coordination/KeeperStorage.cpp | 3 + src/Coordination/tests/gtest_coordination.cpp | 86 ++++++++++++++++--- 4 files changed, 110 insertions(+), 33 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index eb84e24b713..5a40362489f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -624,6 +624,7 @@ M(653, CANNOT_PARSE_BACKUP_SETTINGS) \ M(654, WRONG_BACKUP_SETTINGS) \ M(655, FAILED_TO_RESTORE_METADATA_ON_OTHER_NODE) \ + M(656, INVALID_STATE) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index c4a299b2532..25e497e32a3 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -4,7 +4,9 @@ #include #include #include +#include "Common/ZooKeeper/ZooKeeperCommon.h" #include +#include "Coordination/KeeperStorage.h" #include #include @@ -13,7 +15,7 @@ namespace DB namespace ErrorCodes { - extern const int LOGICAL_ERROR; + extern const int INVALID_STATE; extern const int SYSTEM_ERROR; } @@ -137,12 +139,39 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer return request_for_session; } +namespace +{ + +void assertDigest(const KeeperStorage::Digest & first, const KeeperStorage::Digest & second, const Coordination::ZooKeeperRequest & request, bool committing) +{ + if (!KeeperStorage::checkDigest(first, second)) + { + throw DB::Exception( + DB::ErrorCodes::INVALID_STATE, + "Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). 
Keeper will " + "terminate to avoid inconsistencies.\nExtra information about the request:\n{}", + committing ? "committing" : "preprocessing", + request.getOpNum(), + first.value, + second.value, + first.version, + request.toString()); + } +} + +} + void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session) { if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) return; - std::lock_guard lock(storage_and_responses_lock); - storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, request_for_session.zxid, true /* check_acl */, request_for_session.digest); + { + std::lock_guard lock(storage_and_responses_lock); + storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, request_for_session.zxid, true /* check_acl */, request_for_session.digest); + } + + if (digest_enabled && request_for_session.digest) + assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false); } nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) @@ -182,21 +211,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n assert(request_for_session.digest); - auto local_nodes_digest = storage->getNodesDigest(true); - if (digest_enabled && !KeeperStorage::checkDigest(*request_for_session.digest, local_nodes_digest)) - { - LOG_ERROR( - log, - "Digest for nodes is not matching after applying request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). 
Keeper will " - "terminate to avoid inconsistencies.\nExtra information about the request:\n{}", - request_for_session.request->getOpNum(), - request_for_session.digest->value, - local_nodes_digest.value, - request_for_session.digest->version, - request_for_session.request->toString()); - std::terminate(); - } - + assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true); last_committed_idx = log_idx; return nullptr; } @@ -208,7 +223,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) { /// save snapshot into memory std::lock_guard lock(snapshots_lock); if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}", + throw Exception(ErrorCodes::INVALID_STATE, "Required to apply snapshot with last log index {}, but our last log index is {}", s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx()); latest_snapshot_ptr = latest_snapshot_buf; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 2d0b10f0d9a..3f07ef067d5 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1701,6 +1701,9 @@ void KeeperStorage::preprocessRequest( TransactionInfo transaction{.zxid = new_last_zxid}; SCOPE_EXIT({ if (digest_enabled) + // if the version of digest we got from the leader is the same as the one this instance has, we can simply copy the value + // and just check the digest on the commit + // a mistake can happen while applying the changes to the uncommitted_state so for now let's just recalculate the digest here also transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)}; else transaction.nodes_digest = Digest{DigestVersion::NO_DIGEST}; diff --git a/src/Coordination/tests/gtest_coordination.cpp 
b/src/Coordination/tests/gtest_coordination.cpp index e518c07a8ff..24fb4a3d96e 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1224,7 +1224,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotBroken) EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception); } -nuraft::ptr getBufferFromZKRequest(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +nuraft::ptr getBufferFromZKRequest(int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request, const std::optional digest = std::nullopt) { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); @@ -1232,12 +1232,18 @@ nuraft::ptr getBufferFromZKRequest(int64_t session_id, const Coo using namespace std::chrono; auto time = duration_cast(system_clock::now().time_since_epoch()).count(); DB::writeIntBinary(time, buf); + DB::writeIntBinary(zxid, buf); + if (digest) + { + DB::writeIntBinary(DB::KeeperStorage::CURRENT_DIGEST_VERSION, buf); + DB::writeIntBinary(*digest, buf); + } return buf.getBuffer(); } -nuraft::ptr getLogEntryFromZKRequest(size_t term, int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +nuraft::ptr getLogEntryFromZKRequest(size_t term, int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request, const std::optional digest = std::nullopt) { - auto buffer = getBufferFromZKRequest(session_id, request); + auto buffer = getBufferFromZKRequest(session_id, zxid, request, digest); return nuraft::cs_new(term, buffer); } @@ -1259,7 +1265,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, 1, request); changelog.append(entry); changelog.end_of_append_batch(0, 0); @@ -1410,7 +1416,7 @@ TEST_P(CoordinationTest, 
TestEphemeralNodeRemove) std::shared_ptr request_c = std::make_shared(); request_c->path = "/hello"; request_c->is_ephemeral = true; - auto entry_c = getLogEntryFromZKRequest(0, 1, request_c); + auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c); state_machine->pre_commit(1, entry_c->get_buf()); state_machine->commit(1, entry_c->get_buf()); const auto & storage = state_machine->getStorage(); @@ -1419,7 +1425,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove) std::shared_ptr request_d = std::make_shared(); request_d->path = "/hello"; /// Delete from other session - auto entry_d = getLogEntryFromZKRequest(0, 2, request_d); + auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d); state_machine->pre_commit(2, entry_d->get_buf()); state_machine->commit(2, entry_d->get_buf()); @@ -1440,7 +1446,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog.append(entry); changelog.end_of_append_batch(0, 0); } @@ -1455,7 +1461,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(100 + i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog_1.append(entry); changelog_1.end_of_append_batch(0, 0); } @@ -1470,7 +1476,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(200 + i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog_2.append(entry); changelog_2.end_of_append_batch(0, 0); } @@ -1490,7 +1496,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges) { 
std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(300 + i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog_3.append(entry); changelog_3.end_of_append_batch(0, 0); } @@ -1537,7 +1543,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog.append(entry); changelog.end_of_append_batch(0, 0); } @@ -1549,7 +1555,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog1.append(entry); changelog1.end_of_append_batch(0, 0); } @@ -1560,7 +1566,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog2.append(entry); changelog2.end_of_append_batch(0, 0); } @@ -1769,7 +1775,7 @@ TEST_P(CoordinationTest, TestLogGap) { std::shared_ptr request = std::make_shared(); request->path = "/hello_" + std::to_string(i); - auto entry = getLogEntryFromZKRequest(0, 1, request); + auto entry = getLogEntryFromZKRequest(0, 1, i, request); changelog.append(entry); changelog.end_of_append_batch(0, 0); } @@ -1907,6 +1913,58 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud) ASSERT_FALSE(get_committed_data()); } +TEST_P(CoordinationTest, TestDigest) +{ + using namespace Coordination; + using namespace DB; + + ChangelogDirTest snapshots1("./snapshots1"); + ChangelogDirTest snapshots2("./snapshots2"); + 
CoordinationSettingsPtr settings = std::make_shared(); + + ResponsesQueue queue(std::numeric_limits::max()); + SnapshotsQueue snapshots_queue{1}; + const auto test_digest = [&](const auto modify_digest) + { + auto state_machine1 = std::make_shared(queue, snapshots_queue, "./snapshots1", settings); + auto state_machine2 = std::make_shared(queue, snapshots_queue, "./snapshots2", settings); + state_machine1->init(); + state_machine2->init(); + + std::shared_ptr request_c = std::make_shared(); + request_c->path = "/hello"; + auto zxid = state_machine1->getNextZxid(); + auto entry_c = getLogEntryFromZKRequest(0, 1, zxid, request_c); + state_machine1->pre_commit(1, entry_c->get_buf()); + auto correct_digest = state_machine1->getNodesDigest(); + ASSERT_EQ(correct_digest.version, DB::KeeperStorage::CURRENT_DIGEST_VERSION); + entry_c = getLogEntryFromZKRequest(0, 1, zxid, request_c, correct_digest.value); + + if (modify_digest) + { + std::shared_ptr modified_c = std::make_shared(); + modified_c->path = "modified"; + auto modified_entry = getLogEntryFromZKRequest(0, 1, zxid, modified_c, correct_digest.value); + ASSERT_THROW(state_machine2->pre_commit(1, modified_entry->get_buf()), DB::Exception); + } + else + ASSERT_NO_THROW(state_machine2->pre_commit(1, entry_c->get_buf())); + + if (modify_digest) + { + auto new_digest = modify_digest ? correct_digest.value + 1 : correct_digest.value; + auto modified_entry = getLogEntryFromZKRequest(0, 1, zxid, request_c, new_digest); + ASSERT_THROW(state_machine1->commit(1, modified_entry->get_buf()), DB::Exception); + } + else + ASSERT_NO_THROW(state_machine1->commit(1, entry_c->get_buf())); + }; + + test_digest(true); + test_digest(true); + test_digest(false); + test_digest(false); +} INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite, CoordinationTest,