From f1f668e7df24190eaf4f1d67360b9e53099289d2 Mon Sep 17 00:00:00 2001
From: Antonio Andelic
Date: Fri, 10 May 2024 14:15:01 +0200
Subject: [PATCH] Setup node generator initial

---
 utils/keeper-bench/Runner.cpp | 288 ++++++++++++++++++++++++++++++----
 utils/keeper-bench/Runner.h   |   3 +
 utils/keeper-bench/main.cpp   |   2 +
 3 files changed, 265 insertions(+), 28 deletions(-)

diff --git a/utils/keeper-bench/Runner.cpp b/utils/keeper-bench/Runner.cpp
index a893dac3851..0050230b6ec 100644
--- a/utils/keeper-bench/Runner.cpp
+++ b/utils/keeper-bench/Runner.cpp
@@ -1,17 +1,22 @@
 #include "Runner.h"
 #include 
-#include 
 #include 
+#include 
+#include 
+#include 
 #include "Common/ConcurrentBoundedQueue.h"
+#include "Common/Exception.h"
 #include "Common/ZooKeeper/IKeeper.h"
 #include "Common/ZooKeeper/ZooKeeperArgs.h"
 #include "Common/ZooKeeper/ZooKeeperCommon.h"
 #include "Common/ZooKeeper/ZooKeeperConstants.h"
 #include 
 #include 
+#include "Coordination/KeeperSnapshotManager.h"
 #include "Core/ColumnWithTypeAndName.h"
 #include "Core/ColumnsWithTypeAndName.h"
+#include 
 #include "IO/ReadBuffer.h"
 #include "IO/ReadBufferFromFile.h"
 #include "base/Decimal.h"
@@ -43,12 +48,14 @@ Runner::Runner(
     std::optional concurrency_,
     const std::string & config_path,
     const std::string & input_request_log_,
+    const std::string & setup_nodes_snapshot_path_,
     const Strings & hosts_strings_,
     std::optional max_time_,
     std::optional delay_,
     std::optional continue_on_error_,
     std::optional max_iterations_)
     : input_request_log(input_request_log_)
+    , setup_nodes_snapshot_path(setup_nodes_snapshot_path_)
     , info(std::make_shared())
 {
 
@@ -381,18 +388,18 @@ struct ZooKeeperRequestBlock
 {
     explicit ZooKeeperRequestBlock(DB::Block block_)
         : block(std::move(block_))
-        , hostname_idx(block.getPositionByName("hostname")) //
-        , request_event_time_idx(block.getPositionByName("request_event_time")) //
-        , thread_id_idx(block.getPositionByName("thread_id")) //
-        , session_id_idx(block.getPositionByName("session_id")) //
-        , xid_idx(block.getPositionByName("xid")) //
+        , hostname_idx(block.getPositionByName("hostname"))
+        , request_event_time_idx(block.getPositionByName("request_event_time"))
+        , thread_id_idx(block.getPositionByName("thread_id"))
+        , session_id_idx(block.getPositionByName("session_id"))
+        , xid_idx(block.getPositionByName("xid"))
         , has_watch_idx(block.getPositionByName("has_watch"))
         , op_num_idx(block.getPositionByName("op_num"))
         , path_idx(block.getPositionByName("path"))
         , data_idx(block.getPositionByName("data"))
         , is_ephemeral_idx(block.getPositionByName("is_ephemeral"))
         , is_sequential_idx(block.getPositionByName("is_sequential"))
-        , response_event_time_idx(block.getPositionByName("response_event_time")) //
+        , response_event_time_idx(block.getPositionByName("response_event_time"))
         , error_idx(block.getPositionByName("error"))
         , requests_size_idx(block.getPositionByName("requests_size"))
         , version_idx(block.getPositionByName("version"))
@@ -519,6 +526,7 @@ struct RequestFromLog
 {
     Coordination::ZooKeeperRequestPtr request;
     std::optional<Coordination::Error> expected_result;
+    std::vector<std::optional<Coordination::Error>> subrequest_expected_results;
     int64_t session_id = 0;
     size_t executor_id = 0;
     bool has_watch = false;
@@ -586,7 +594,6 @@ struct ZooKeeperRequestFromLogReader
             idx_in_block = 0;
         }
 
-
         request_from_log.expected_result = current_block->getError(idx_in_block);
         request_from_log.session_id = current_block->getSessionId(idx_in_block);
         request_from_log.has_watch = current_block->hasWatch(idx_in_block);
@@ -693,6 +700,12 @@ struct ZooKeeperRequestFromLogReader
                     if (!subrequest_from_log)
                         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to fetch subrequest for {}, subrequest index {}", op_num, i);
 
+                    if (!subrequest_from_log->expected_result && request_from_log.expected_result
+                        && request_from_log.expected_result == Coordination::Error::ZOK)
+                    {
+                        subrequest_from_log->expected_result = Coordination::Error::ZOK;
+                    }
+
                     requests.push_back(std::move(subrequest_from_log->request));
 
                     if (subrequest_from_log->session_id != request_from_log.session_id)
@@ -700,6 +713,8 @@ struct ZooKeeperRequestFromLogReader
                     if (subrequest_from_log->executor_id != request_from_log.executor_id)
                         throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Executor id mismatch for subrequest in {}, subrequest index {}", op_num, i);
+
+                    request_from_log.subrequest_expected_results.push_back(subrequest_from_log->expected_result);
                 }
 
                 request_from_log.request = std::make_shared<Coordination::ZooKeeperMultiRequest>(requests, default_acls);
 
@@ -731,7 +746,6 @@ private:
 
 namespace
 {
-
 struct RequestFromLogStats
 {
     struct Stats
@@ -744,6 +758,192 @@ struct RequestFromLogStats
     Stats read_requests;
 };
 
+struct SetupNodeCollector
+{
+    explicit SetupNodeCollector(const std::string & setup_nodes_snapshot_path)
+    {
+        if (setup_nodes_snapshot_path.empty())
+            return;
+
+        keeper_context = std::make_shared<DB::KeeperContext>(true, std::make_shared());
+        keeper_context->setDigestEnabled(true);
+        keeper_context->setSnapshotDisk(
+            std::make_shared("Keeper-snapshots", setup_nodes_snapshot_path));
+
+        snapshot_manager.emplace(1, keeper_context);
+        auto snapshot_result = snapshot_manager->restoreFromLatestSnapshot();
+        if (snapshot_result.storage == nullptr)
+        {
+            std::cerr << "No initial snapshot found" << std::endl;
+            initial_storage = std::make_unique<Coordination::KeeperStorage>(
+                /* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);
+            initial_storage->initializeSystemNodes();
+        }
+        else
+        {
+            std::cerr << "Loaded initial nodes from snapshot" << std::endl;
+            initial_storage = std::move(snapshot_result.storage);
+        }
+    }
+
+    void processRequest(const RequestFromLog & request_from_log)
+    {
+        if (!request_from_log.expected_result.has_value())
+            return;
+
+        auto process_request = [&](const Coordination::ZooKeeperRequest & request, const auto expected_result)
+        {
+            const auto & path = request.getPath();
+            if (processed_paths.contains(path))
+                return;
+
+            auto op_num = request.getOpNum();
+
+            if (op_num == Coordination::OpNum::Create)
+            {
+                if (expected_result == Coordination::Error::ZNODEEXISTS)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+                else if (expected_result == Coordination::Error::ZOK)
+                {
+                    /// we need to make sure ancestors exist
+                    auto position = path.find_last_of('/');
+                    if (position != 0)
+                    {
+                        auto parent_path = path.substr(0, position);
+                        if (!processed_paths.contains(parent_path))
+                        {
+                            addExpectedNode(parent_path);
+                            processed_paths.insert(parent_path);
+                        }
+                    }
+
+                    processed_paths.insert(path);
+                }
+            }
+            else if (op_num == Coordination::OpNum::Remove)
+            {
+                if (expected_result == Coordination::Error::ZOK)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+            }
+            else if (op_num == Coordination::OpNum::Set)
+            {
+                if (expected_result == Coordination::Error::ZOK)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+            }
+            else if (op_num == Coordination::OpNum::Check)
+            {
+                if (expected_result == Coordination::Error::ZOK)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+            }
+            else if (op_num == Coordination::OpNum::CheckNotExists)
+            {
+                if (expected_result == Coordination::Error::ZNODEEXISTS)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+            }
+            else if (request.isReadRequest())
+            {
+                if (expected_result == Coordination::Error::ZOK)
+                {
+                    addExpectedNode(path);
+                    processed_paths.insert(path);
+                }
+            }
+        };
+
+        const auto & request = request_from_log.request;
+        if (request->getOpNum() == Coordination::OpNum::Multi || request->getOpNum() == Coordination::OpNum::MultiRead)
+        {
+            const auto & multi_request = dynamic_cast<const Coordination::ZooKeeperMultiRequest &>(*request);
+            const auto & subrequests = multi_request.requests;
+
+            for (size_t i = 0; i < subrequests.size(); ++i)
+            {
+                const auto & zookeeper_request = dynamic_cast<const Coordination::ZooKeeperRequest &>(*subrequests[i]);
+                const auto subrequest_expected_result = request_from_log.subrequest_expected_results[i];
+                if (subrequest_expected_result.has_value())
+                    process_request(zookeeper_request, *subrequest_expected_result);
+
+            }
+        }
+        else
+            process_request(*request, *request_from_log.expected_result);
+    }
+
+    void addExpectedNode(const std::string & path)
+    {
+        std::lock_guard lock(nodes_mutex);
+
+        if (initial_storage->container.contains(path))
+            return;
+
+        std::cerr << "Adding expected node " << path << std::endl;
+
+        Coordination::Requests create_ops;
+
+        size_t pos = 1;
+        while (true)
+        {
+            pos = path.find('/', pos);
+            if (pos == std::string::npos)
+                break;
+
+            auto request = zkutil::makeCreateRequest(path.substr(0, pos), "", zkutil::CreateMode::Persistent, true);
+            create_ops.emplace_back(request);
+            ++pos;
+        }
+
+        auto request = zkutil::makeCreateRequest(path, "", zkutil::CreateMode::Persistent, true);
+        create_ops.emplace_back(request);
+
+        auto next_zxid = initial_storage->getNextZXID();
+
+        static Coordination::ACLs default_acls = []
+        {
+            Coordination::ACL acl;
+            acl.permissions = Coordination::ACL::All;
+            acl.scheme = "world";
+            acl.id = "anyone";
+            return Coordination::ACLs{std::move(acl)};
+        }();
+
+        auto multi_create_request = std::make_shared<Coordination::ZooKeeperMultiRequest>(create_ops, default_acls);
+        initial_storage->preprocessRequest(multi_create_request, 1, 0, next_zxid, /* check_acl = */ false);
+        auto responses = initial_storage->processRequest(multi_create_request, 1, next_zxid, /* check_acl = */ false);
+        if (responses.size() > 1 || responses[0].response->error != Coordination::Error::ZOK)
+            throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid response after trying to create a node {}", responses[0].response->error);
+    }
+
+    void generateSnapshot()
+    {
+        std::cerr << "Generating snapshot with starting data" << std::endl;
+        std::lock_guard lock(nodes_mutex);
+        DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(initial_storage->getZXID(), 1, std::make_shared());
+        DB::KeeperStorageSnapshot snapshot(initial_storage.get(), snapshot_meta);
+        snapshot_manager->serializeSnapshotToDisk(snapshot);
+    }
+
+    std::mutex nodes_mutex;
+    DB::KeeperContextPtr keeper_context;
+    Coordination::KeeperStoragePtr initial_storage;
+    std::unordered_set<std::string> processed_paths;
+    std::optional<DB::KeeperSnapshotManager> snapshot_manager;
+};
+
 void dumpStats(std::string_view type, const RequestFromLogStats::Stats & stats_for_type)
 {
     std::cerr << fmt::format(
@@ -751,7 +951,7 @@ void dumpStats(std::string_view type, const RequestFromLogStats::Stats & stats_f
         type,
         stats_for_type.total,
         stats_for_type.unexpected_results,
-        static_cast<double>(stats_for_type.unexpected_results) / stats_for_type.total * 100)
+        stats_for_type.total != 0 ? static_cast<double>(stats_for_type.unexpected_results) / stats_for_type.total * 100 : 0.0)
               << std::endl;
 };
 
@@ -763,24 +963,40 @@ void requestFromLogExecutor(std::shared_ptr
         auto request_promise = std::make_shared<std::promise<void>>();
         last_request = request_promise->get_future();
-        Coordination::ResponseCallback callback
-            = [&, request_promise, request = request_from_log.request, expected_result = request_from_log.expected_result](
-                  const Coordination::Response & response) mutable
+        Coordination::ResponseCallback callback = [&,
+                                                   request_promise,
+                                                   request = request_from_log.request,
+                                                   expected_result = request_from_log.expected_result,
+                                                   subrequest_expected_results = std::move(request_from_log.subrequest_expected_results)](
+                                                      const Coordination::Response & response) mutable
         {
             auto & stats = request->isReadRequest() ? request_stats.read_requests : request_stats.write_requests;
 
             stats.total.fetch_add(1, std::memory_order_relaxed);
 
-            if (*expected_result != response.error)
-                stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
+            if (expected_result)
+            {
+                if (*expected_result != response.error)
+                    stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
 
-            //if (!expected_result)
-            //    return;
+                if (*expected_result != response.error)
+                {
+                    std::cerr << fmt::format(
+                        "Unexpected result for {}\ngot {}, expected {}\n", request->toString(), response.error, *expected_result)
+                              << std::endl;
 
-            //if (*expected_result != response.error)
-            //    std::cerr << fmt::format(
-            //        "Unexpected result for {}, got {}, expected {}", request->getOpNum(), response.error, *expected_result)
-            //        << std::endl;
+                    if (const auto * multi_response = dynamic_cast<const Coordination::ZooKeeperMultiResponse *>(&response))
+                    {
+                        std::string subresponses;
+                        for (size_t i = 0; i < multi_response->responses.size(); ++i)
+                        {
+                            subresponses += fmt::format("{} = {}\n", i, multi_response->responses[i]->error);
+                        }
+
+                        std::cerr << "Subresponses\n" << subresponses << std::endl;
+                    }
+                }
+            }
 
             request_promise->set_value();
         };
@@ -827,6 +1043,9 @@ void Runner::runBenchmarkFromLog()
 
     RequestFromLogStats stats;
 
+    std::optional<SetupNodeCollector> setup_nodes_collector;
+    if (!setup_nodes_snapshot_path.empty())
+        setup_nodes_collector.emplace(setup_nodes_snapshot_path);
 
     std::unordered_map<size_t, std::shared_ptr<ConcurrentBoundedQueue<RequestFromLog>>> executor_id_to_queue;
 
@@ -850,7 +1069,7 @@ void Runner::runBenchmarkFromLog()
             return;
         }
 
-        auto executor_queue = std::make_shared<ConcurrentBoundedQueue<RequestFromLog>>(std::numeric_limits().max());
+        auto executor_queue = std::make_shared<ConcurrentBoundedQueue<RequestFromLog>>(std::numeric_limits::max());
         executor_id_to_queue.emplace(request.executor_id, executor_queue);
         auto scheduled = pool->trySchedule([&, executor_queue]() mutable
         {
@@ -865,6 +1084,7 @@ void Runner::runBenchmarkFromLog()
             throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to push to the executor's queue");
     };
 
+    if (!setup_nodes_collector)
     {
         auto setup_connection = getConnection(connection_infos[0], 0);
         benchmark_context.startup(*setup_connection);
@@ -875,14 +1095,26 @@ void Runner::runBenchmarkFromLog()
     delay_watch.restart();
     while (auto request_from_log = request_reader.getNextRequest())
     {
-        request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
-        push_request(std::move(*request_from_log));
+        if (setup_nodes_collector)
+        {
+            setup_nodes_collector->processRequest(*request_from_log);
+        }
+        else
+        {
+            request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
+            push_request(std::move(*request_from_log));
+        }
 
         if (delay > 0 && delay_watch.elapsedSeconds() > delay)
        {
-            dumpStats("Write", stats.write_requests);
-            dumpStats("Read", stats.read_requests);
-            std::cerr << std::endl;
+            if (setup_nodes_collector)
+                setup_nodes_collector->generateSnapshot();
+            else
+            {
+                dumpStats("Write", stats.write_requests);
+                dumpStats("Read", stats.read_requests);
+                std::cerr << std::endl;
+            }
             delay_watch.restart();
         }
     }
@@ -906,7 +1138,7 @@ void Runner::runBenchmarkWithGenerator()
         for (size_t i = 0; i < concurrency; ++i)
         {
             auto thread_connections = connections;
-            pool->scheduleOrThrowOnError([this, connections_ = std::move(thread_connections)]() mutable { thread(connections_); });
+            pool->scheduleOrThrowOnError([this, my_connections = std::move(thread_connections)]() mutable { thread(my_connections); });
         }
     }
     catch (...)
diff --git a/utils/keeper-bench/Runner.h b/utils/keeper-bench/Runner.h
index 0c646eb2166..c19a4d82898 100644
--- a/utils/keeper-bench/Runner.h
+++ b/utils/keeper-bench/Runner.h
@@ -27,6 +27,7 @@ public:
     void startup(Coordination::ZooKeeper & zookeeper);
     void cleanup(Coordination::ZooKeeper & zookeeper);
 
+
 private:
     struct Node
     {
@@ -54,6 +55,7 @@ public:
         std::optional concurrency_,
         const std::string & config_path,
         const std::string & input_request_log_,
+        const std::string & setup_nodes_snapshot_path_,
         const Strings & hosts_strings_,
         std::optional max_time_,
         std::optional delay_,
@@ -96,6 +98,7 @@ private:
     std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
 
     std::string input_request_log;
+    std::string setup_nodes_snapshot_path;
 
     size_t concurrency = 1;
 
diff --git a/utils/keeper-bench/main.cpp b/utils/keeper-bench/main.cpp
index 45fc28f3bca..0b963abf406 100644
--- a/utils/keeper-bench/main.cpp
+++ b/utils/keeper-bench/main.cpp
@@ -38,6 +38,7 @@ int main(int argc, char *argv[])
         ("help", "produce help message")
         ("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
         ("input-request-log", value<std::string>()->default_value(""), "log of requests that will be replayed")
+        ("setup-nodes-snapshot-path", value<std::string>()->default_value(""), "directory containing snapshots with starting state")
         ("concurrency,c", value(), "number of parallel queries")
         ("report-delay,d", value(), "delay between intermediate reports in seconds (set 0 to disable reports)")
         ("iterations,i", value(), "amount of queries to be executed")
@@ -60,6 +61,7 @@ int main(int argc, char *argv[])
     Runner runner(valueToOptional(options["concurrency"]),
                   options["config"].as<std::string>(),
                   options["input-request-log"].as<std::string>(),
+                  options["setup-nodes-snapshot-path"].as<std::string>(),
                   options["hosts"].as<Strings>(),
                   valueToOptional(options["time-limit"]),
                   valueToOptional(options["report-delay"]),
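
Usage sketch (not part of the patch): the new --setup-nodes-snapshot-path option is wired straight into the extended Runner constructor shown above. The snippet below only illustrates that wiring under assumptions; the config name, request-log name, snapshot directory, host, and delay value are made-up placeholders, and the exact std::optional parameter types are assumed to match the declarations in Runner.h.

    // Hypothetical construction of Runner with the new argument; all literal values are illustrative.
    Runner runner(
        /* concurrency_ */ std::nullopt,
        /* config_path */ "benchmark.yaml",                      // placeholder config file
        /* input_request_log_ */ "requests_dump",                // placeholder request log
        /* setup_nodes_snapshot_path_ */ "./setup-snapshots",    // new parameter added by this patch
        /* hosts_strings_ */ {"localhost:9181"},                 // placeholder Keeper host
        /* max_time_ */ std::nullopt,
        /* delay_ */ 10,
        /* continue_on_error_ */ std::nullopt,
        /* max_iterations_ */ std::nullopt);
    // With a non-empty snapshot path, Runner::runBenchmarkFromLog() feeds every logged request into
    // SetupNodeCollector and periodically calls generateSnapshot() instead of replaying the requests
    // against a live cluster, producing a starting-state snapshot for later benchmark runs.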