Create snapshot

This commit is contained in:
Antonio Andelic 2024-05-21 16:19:14 +02:00
parent 6854f0e7ae
commit d4430b583c

View File

@ -628,7 +628,11 @@ struct ZooKeeperRequestFromLogReader
set_request->path = current_block->getPath(idx_in_block);
set_request->data = current_block->getData(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
set_request->version = *version;
{
/// we just need to make sure that the request with version that need to fail, fail when replaying
if (request_from_log.expected_result == Coordination::Error::ZBADVERSION)
set_request->version = std::numeric_limits<int32_t>::max();
}
request_from_log.request = set_request;
break;
}
@ -637,7 +641,11 @@ struct ZooKeeperRequestFromLogReader
auto remove_request = std::make_shared<Coordination::ZooKeeperRemoveRequest>();
remove_request->path = current_block->getPath(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
remove_request->version = *version;
{
/// we just need to make sure that the request with version that need to fail, fail when replaying
if (request_from_log.expected_result == Coordination::Error::ZBADVERSION)
remove_request->version = std::numeric_limits<int32_t>::max();
}
request_from_log.request = remove_request;
break;
}
@ -647,7 +655,11 @@ struct ZooKeeperRequestFromLogReader
auto check_request = std::make_shared<Coordination::ZooKeeperCheckRequest>();
check_request->path = current_block->getPath(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
check_request->version = *version;
{
/// we just need to make sure that the request with version that need to fail, fail when replaying
if (request_from_log.expected_result == Coordination::Error::ZBADVERSION)
check_request->version = std::numeric_limits<int32_t>::max();
}
if (op_num == Coordination::OpNum::CheckNotExists)
check_request->not_exists = true;
request_from_log.request = check_request;
@ -791,10 +803,12 @@ struct SetupNodeCollector
if (!request_from_log.expected_result.has_value())
return;
auto process_request = [&](const Coordination::ZooKeeperRequest & request, const auto expected_result)
{
const auto & path = request.getPath();
if (processed_paths.contains(path))
if (nodes_created_during_replay.contains(path))
return;
auto op_num = request.getOpNum();
@ -804,64 +818,43 @@ struct SetupNodeCollector
if (expected_result == Coordination::Error::ZNODEEXISTS)
{
addExpectedNode(path);
processed_paths.insert(path);
}
else if (expected_result == Coordination::Error::ZOK)
{
nodes_created_during_replay.insert(path);
/// we need to make sure ancestors exist
auto position = path.find_last_of('/');
if (position != 0)
{
auto parent_path = path.substr(0, position);
if (!processed_paths.contains(parent_path))
{
addExpectedNode(parent_path);
processed_paths.insert(parent_path);
}
addExpectedNode(parent_path);
}
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Remove)
{
if (expected_result == Coordination::Error::ZOK)
{
if (expected_result == Coordination::Error::ZOK || expected_result == Coordination::Error::ZBADVERSION)
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Set)
{
if (expected_result == Coordination::Error::ZOK)
{
if (expected_result == Coordination::Error::ZOK || expected_result == Coordination::Error::ZBADVERSION)
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Check)
{
if (expected_result == Coordination::Error::ZOK)
{
if (expected_result == Coordination::Error::ZOK || expected_result == Coordination::Error::ZBADVERSION)
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::CheckNotExists)
{
if (expected_result == Coordination::Error::ZNODEEXISTS)
{
if (expected_result == Coordination::Error::ZNODEEXISTS || expected_result == Coordination::Error::ZBADVERSION)
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (request.isReadRequest())
{
if (expected_result == Coordination::Error::ZOK)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
};
@ -940,7 +933,7 @@ struct SetupNodeCollector
std::mutex nodes_mutex;
DB::KeeperContextPtr keeper_context;
Coordination::KeeperStoragePtr initial_storage;
std::unordered_set<std::string> processed_paths;
std::unordered_set<std::string> nodes_created_during_replay;
std::optional<Coordination::KeeperSnapshotManager> snapshot_manager;
};
@ -979,23 +972,23 @@ void requestFromLogExecutor(std::shared_ptr<ConcurrentBoundedQueue<RequestFromLo
if (*expected_result != response.error)
stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
if (*expected_result != response.error)
{
std::cerr << fmt::format(
"Unexpected result for {}\ngot {}, expected {}\n", request->toString(), response.error, *expected_result)
<< std::endl;
//if (*expected_result != response.error)
//{
// std::cerr << fmt::format(
// "Unexpected result for {}\ngot {}, expected {}\n", request->toString(), response.error, *expected_result)
// << std::endl;
if (const auto * multi_response = dynamic_cast<const Coordination::ZooKeeperMultiResponse *>(&response))
{
std::string subresponses;
for (size_t i = 0; i < multi_response->responses.size(); ++i)
{
subresponses += fmt::format("{} = {}\n", i, multi_response->responses[i]->error);
}
// if (const auto * multi_response = dynamic_cast<const Coordination::ZooKeeperMultiResponse *>(&response))
// {
// std::string subresponses;
// for (size_t i = 0; i < multi_response->responses.size(); ++i)
// {
// subresponses += fmt::format("{} = {}\n", i, multi_response->responses[i]->error);
// }
std::cerr << "Subresponses\n" << subresponses << std::endl;
}
}
// std::cerr << "Subresponses\n" << subresponses << std::endl;
// }
//}
}
request_promise->set_value();
@ -1049,7 +1042,7 @@ void Runner::runBenchmarkFromLog()
std::unordered_map<uint64_t, std::shared_ptr<ConcurrentBoundedQueue<RequestFromLog>>> executor_id_to_queue;
SCOPE_EXIT({
SCOPE_EXIT_SAFE({
for (const auto & [executor_id, executor_queue] : executor_id_to_queue)
executor_queue->finish();
@ -1262,8 +1255,15 @@ Runner::~Runner()
if (pool)
pool->wait();
auto connection = getConnection(connection_infos[0], 0);
benchmark_context.cleanup(*connection);
try
{
auto connection = getConnection(connection_infos[0], 0);
benchmark_context.cleanup(*connection);
}
catch (...)
{
DB::tryLogCurrentException("While trying to clean nodes");
}
}
namespace