Define small test for digest check

This commit is contained in:
Antonio Andelic 2022-05-17 13:53:12 +00:00
parent c5e4598447
commit 6fba1c96ed
4 changed files with 110 additions and 33 deletions

View File

@ -624,6 +624,7 @@
M(653, CANNOT_PARSE_BACKUP_SETTINGS) \
M(654, WRONG_BACKUP_SETTINGS) \
M(655, FAILED_TO_RESTORE_METADATA_ON_OTHER_NODE) \
M(656, INVALID_STATE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -4,7 +4,9 @@
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include "Coordination/KeeperStorage.h"
#include <Coordination/KeeperSnapshotManager.h>
#include <future>
@ -13,7 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_STATE;
extern const int SYSTEM_ERROR;
}
@ -137,12 +139,39 @@ KeeperStorage::RequestForSession KeeperStateMachine::parseRequest(nuraft::buffer
return request_for_session;
}
namespace
{
void assertDigest(const KeeperStorage::Digest & first, const KeeperStorage::Digest & second, const Coordination::ZooKeeperRequest & request, bool committing)
{
if (!KeeperStorage::checkDigest(first, second))
{
throw DB::Exception(
DB::ErrorCodes::INVALID_STATE,
"Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). Keeper will "
"terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
committing ? "committing" : "preprocessing",
request.getOpNum(),
first.value,
second.value,
first.version,
request.toString());
}
}
}
void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, request_for_session.zxid, true /* check_acl */, request_for_session.digest);
{
std::lock_guard lock(storage_and_responses_lock);
storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, request_for_session.zxid, true /* check_acl */, request_for_session.digest);
}
if (digest_enabled && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
@ -182,21 +211,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
assert(request_for_session.digest);
auto local_nodes_digest = storage->getNodesDigest(true);
if (digest_enabled && !KeeperStorage::checkDigest(*request_for_session.digest, local_nodes_digest))
{
LOG_ERROR(
log,
"Digest for nodes is not matching after applying request of type '{}'.\nExpected digest - {}, actual digest {} (digest version {}). Keeper will "
"terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
request_for_session.request->getOpNum(),
request_for_session.digest->value,
local_nodes_digest.value,
request_for_session.digest->version,
request_for_session.request->toString());
std::terminate();
}
assertDigest(*request_for_session.digest, storage->getNodesDigest(true), *request_for_session.request, true);
last_committed_idx = log_idx;
return nullptr;
}
@ -208,7 +223,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// save snapshot into memory
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}",
throw Exception(ErrorCodes::INVALID_STATE, "Required to apply snapshot with last log index {}, but our last log index is {}",
s.get_last_log_idx(), latest_snapshot_meta->get_last_log_idx());
latest_snapshot_ptr = latest_snapshot_buf;
}

View File

@ -1701,6 +1701,9 @@ void KeeperStorage::preprocessRequest(
TransactionInfo transaction{.zxid = new_last_zxid};
SCOPE_EXIT({
if (digest_enabled)
// if the version of digest we got from the leader is the same as the one this instances has, we can simply copy the value
// and just check the digest on the commit
// a mistake can happen while applying the changes to the uncommitted_state so for now let's just recalculate the digest here also
transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)};
else
transaction.nodes_digest = Digest{DigestVersion::NO_DIGEST};

View File

@ -1224,7 +1224,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotBroken)
EXPECT_THROW(manager.restoreFromLatestSnapshot(), DB::Exception);
}
nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request, const std::optional<uint64_t> digest = std::nullopt)
{
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
@ -1232,12 +1232,18 @@ nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, const Coo
using namespace std::chrono;
auto time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
DB::writeIntBinary(time, buf);
DB::writeIntBinary(zxid, buf);
if (digest)
{
DB::writeIntBinary(DB::KeeperStorage::CURRENT_DIGEST_VERSION, buf);
DB::writeIntBinary(*digest, buf);
}
return buf.getBuffer();
}
nuraft::ptr<nuraft::log_entry> getLogEntryFromZKRequest(size_t term, int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
nuraft::ptr<nuraft::log_entry> getLogEntryFromZKRequest(size_t term, int64_t session_id, int64_t zxid, const Coordination::ZooKeeperRequestPtr & request, const std::optional<uint64_t> digest = std::nullopt)
{
auto buffer = getBufferFromZKRequest(session_id, request);
auto buffer = getBufferFromZKRequest(session_id, zxid, request, digest);
return nuraft::cs_new<nuraft::log_entry>(term, buffer);
}
@ -1259,7 +1265,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, 1, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
@ -1410,7 +1416,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
request_c->path = "/hello";
request_c->is_ephemeral = true;
auto entry_c = getLogEntryFromZKRequest(0, 1, request_c);
auto entry_c = getLogEntryFromZKRequest(0, 1, state_machine->getNextZxid(), request_c);
state_machine->pre_commit(1, entry_c->get_buf());
state_machine->commit(1, entry_c->get_buf());
const auto & storage = state_machine->getStorage();
@ -1419,7 +1425,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
std::shared_ptr<ZooKeeperRemoveRequest> request_d = std::make_shared<ZooKeeperRemoveRequest>();
request_d->path = "/hello";
/// Delete from other session
auto entry_d = getLogEntryFromZKRequest(0, 2, request_d);
auto entry_d = getLogEntryFromZKRequest(0, 2, state_machine->getNextZxid(), request_d);
state_machine->pre_commit(2, entry_d->get_buf());
state_machine->commit(2, entry_d->get_buf());
@ -1440,7 +1446,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
}
@ -1455,7 +1461,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(100 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog_1.append(entry);
changelog_1.end_of_append_batch(0, 0);
}
@ -1470,7 +1476,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(200 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog_2.append(entry);
changelog_2.end_of_append_batch(0, 0);
}
@ -1490,7 +1496,7 @@ TEST_P(CoordinationTest, TestRotateIntervalChanges)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(300 + i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog_3.append(entry);
changelog_3.end_of_append_batch(0, 0);
}
@ -1537,7 +1543,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
}
@ -1549,7 +1555,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog1.append(entry);
changelog1.end_of_append_batch(0, 0);
}
@ -1560,7 +1566,7 @@ TEST_P(CoordinationTest, TestCompressedLogsMultipleRewrite)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog2.append(entry);
changelog2.end_of_append_batch(0, 0);
}
@ -1769,7 +1775,7 @@ TEST_P(CoordinationTest, TestLogGap)
{
std::shared_ptr<ZooKeeperCreateRequest> request = std::make_shared<ZooKeeperCreateRequest>();
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
auto entry = getLogEntryFromZKRequest(0, 1, i, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
}
@ -1907,6 +1913,58 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud)
ASSERT_FALSE(get_committed_data());
}
TEST_P(CoordinationTest, TestDigest)
{
using namespace Coordination;
using namespace DB;
ChangelogDirTest snapshots1("./snapshots1");
ChangelogDirTest snapshots2("./snapshots2");
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};
const auto test_digest = [&](const auto modify_digest)
{
auto state_machine1 = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots1", settings);
auto state_machine2 = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots2", settings);
state_machine1->init();
state_machine2->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
request_c->path = "/hello";
auto zxid = state_machine1->getNextZxid();
auto entry_c = getLogEntryFromZKRequest(0, 1, zxid, request_c);
state_machine1->pre_commit(1, entry_c->get_buf());
auto correct_digest = state_machine1->getNodesDigest();
ASSERT_EQ(correct_digest.version, DB::KeeperStorage::CURRENT_DIGEST_VERSION);
entry_c = getLogEntryFromZKRequest(0, 1, zxid, request_c, correct_digest.value);
if (modify_digest)
{
std::shared_ptr<ZooKeeperCreateRequest> modified_c = std::make_shared<ZooKeeperCreateRequest>();
modified_c->path = "modified";
auto modified_entry = getLogEntryFromZKRequest(0, 1, zxid, modified_c, correct_digest.value);
ASSERT_THROW(state_machine2->pre_commit(1, modified_entry->get_buf()), DB::Exception);
}
else
ASSERT_NO_THROW(state_machine2->pre_commit(1, entry_c->get_buf()));
if (modify_digest)
{
auto new_digest = modify_digest ? correct_digest.value + 1 : correct_digest.value;
auto modified_entry = getLogEntryFromZKRequest(0, 1, zxid, request_c, new_digest);
ASSERT_THROW(state_machine1->commit(1, modified_entry->get_buf()), DB::Exception);
}
else
ASSERT_NO_THROW(state_machine1->commit(1, entry_c->get_buf()));
};
test_digest(true);
test_digest(true);
test_digest(false);
test_digest(false);
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,