Apply uncommitted state after snapshot deser

This commit is contained in:
Antonio Andelic 2023-06-02 13:31:14 +00:00
parent eb5985e5fc
commit 336c9d7136
4 changed files with 124 additions and 14 deletions

View File

@ -363,6 +363,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
else if (s.get_last_log_idx() < latest_snapshot_meta->get_last_log_idx())
{
LOG_INFO(log, "A snapshot with a larger last log index ({}) was created, skipping applying this snapshot", latest_snapshot_meta->get_last_log_idx());
return true;
}
latest_snapshot_ptr = latest_snapshot_buf;
@ -372,6 +373,10 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
std::lock_guard lock(storage_and_responses_lock);
auto snapshot_deserialization_result
= snapshot_manager.deserializeSnapshotFromBuffer(snapshot_manager.deserializeSnapshotBufferFromDisk(s.get_last_log_idx()));
/// maybe some logs were preprocessed with log idx larger than the snapshot idx
/// we have to apply them to the new storage
storage->applyUncommittedState(*snapshot_deserialization_result.storage, s.get_last_log_idx());
storage = std::move(snapshot_deserialization_result.storage);
latest_snapshot_meta = snapshot_deserialization_result.snapshot_meta;
cluster_config = snapshot_deserialization_result.cluster_config;

View File

@ -375,23 +375,26 @@ void KeeperStorage::UncommittedState::applyDelta(const Delta & delta)
delta.operation);
}
void KeeperStorage::UncommittedState::addDelta(Delta new_delta)
{
const auto & added_delta = deltas.emplace_back(std::move(new_delta));
if (!added_delta.path.empty())
{
deltas_for_path[added_delta.path].push_back(&added_delta);
applyDelta(added_delta);
}
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
{
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(&auth_delta->auth_id);
}
}
void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
{
for (auto & delta : new_deltas)
{
const auto & added_delta = deltas.emplace_back(std::move(delta));
if (!added_delta.path.empty())
{
deltas_for_path[added_delta.path].push_back(&added_delta);
applyDelta(added_delta);
}
else if (const auto * auth_delta = std::get_if<AddAuthDelta>(&added_delta.operation))
{
auto & uncommitted_auth = session_and_auth[auth_delta->session_id];
uncommitted_auth.emplace_back(&auth_delta->auth_id);
}
}
addDelta(std::move(delta));
}
void KeeperStorage::UncommittedState::commit(int64_t commit_zxid)
@ -602,6 +605,26 @@ namespace
}
void KeeperStorage::applyUncommittedState(KeeperStorage & other, int64_t last_zxid)
{
for (const auto & transaction : uncommitted_transactions)
{
if (transaction.zxid <= last_zxid)
continue;
other.uncommitted_transactions.push_back(transaction);
}
auto it = uncommitted_state.deltas.begin();
for (; it != uncommitted_state.deltas.end(); ++it)
{
if (it->zxid <= last_zxid)
continue;
other.uncommitted_state.addDelta(*it);
}
}
Coordination::Error KeeperStorage::commit(int64_t commit_zxid)
{
// Deltas are added with increasing ZXIDs

View File

@ -222,6 +222,7 @@ public:
{
explicit UncommittedState(KeeperStorage & storage_) : storage(storage_) { }
void addDelta(Delta new_delta);
void addDeltas(std::vector<Delta> new_deltas);
void commit(int64_t commit_zxid);
void rollback(int64_t rollback_zxid);
@ -310,6 +311,10 @@ public:
UncommittedState uncommitted_state{*this};
// Apply uncommitted state to another storage using only transactions
// with zxid > last_zxid
void applyUncommittedState(KeeperStorage & other, int64_t last_zxid);
Coordination::Error commit(int64_t zxid);
// Create node in the storage

View File

@ -2524,6 +2524,83 @@ TEST_P(CoordinationTest, TestCheckNotExistsRequest)
}
}
TEST_P(CoordinationTest, TestReapplyingDeltas)
{
using namespace DB;
using namespace Coordination;
static constexpr int64_t initial_zxid = 100;
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
create_request->path = "/test/data";
create_request->is_sequential = true;
const auto process_create = [](KeeperStorage & storage, const auto & request, int64_t zxid)
{
storage.preprocessRequest(request, 1, 0, zxid);
auto responses = storage.processRequest(request, 1, zxid);
EXPECT_GE(responses.size(), 1);
EXPECT_EQ(responses[0].response->error, Error::ZOK);
};
const auto commit_initial_data = [&](auto & storage)
{
int64_t zxid = 1;
const auto root_create = std::make_shared<ZooKeeperCreateRequest>();
root_create->path = "/test";
process_create(storage, root_create, zxid);
++zxid;
for (; zxid <= initial_zxid; ++zxid)
process_create(storage, create_request, zxid);
};
KeeperStorage storage1{500, "", keeper_context};
commit_initial_data(storage1);
for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
storage1.preprocessRequest(create_request, 1, 0, zxid);
/// create identical new storage
KeeperStorage storage2{500, "", keeper_context};
commit_initial_data(storage2);
storage1.applyUncommittedState(storage2, initial_zxid);
const auto commit_unprocessed = [&](KeeperStorage & storage)
{
for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
{
auto responses = storage.processRequest(create_request, 1, zxid);
EXPECT_GE(responses.size(), 1);
EXPECT_EQ(responses[0].response->error, Error::ZOK);
}
};
commit_unprocessed(storage1);
commit_unprocessed(storage2);
const auto get_children = [&](KeeperStorage & storage)
{
const auto list_request = std::make_shared<ZooKeeperListRequest>();
list_request->path = "/test";
auto responses = storage.processRequest(list_request, 1, std::nullopt, /*check_acl=*/true, /*is_local=*/true);
EXPECT_EQ(responses.size(), 1);
const auto * list_response = dynamic_cast<const ListResponse *>(responses[0].response.get());
EXPECT_TRUE(list_response);
return list_response->names;
};
auto children1 = get_children(storage1);
std::unordered_set<std::string> children1_set(children1.begin(), children1.end());
auto children2 = get_children(storage2);
std::unordered_set<std::string> children2_set(children2.begin(), children2.end());
ASSERT_TRUE(children1_set == children2_set);
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,
::testing::ValuesIn(std::initializer_list<CompressionParam>{