Setup node generator initial

This commit is contained in:
Antonio Andelic 2024-05-10 14:15:01 +02:00
parent f081d7c41d
commit f1f668e7df
3 changed files with 265 additions and 28 deletions

View File

@ -1,17 +1,22 @@
#include "Runner.h"
#include <atomic>
#include <condition_variable>
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/CoordinationSettings.h>
#include <Coordination/KeeperContext.h>
#include <Coordination/KeeperStorage.h>
#include "Common/ConcurrentBoundedQueue.h"
#include "Common/Exception.h"
#include "Common/ZooKeeper/IKeeper.h"
#include "Common/ZooKeeper/ZooKeeperArgs.h"
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/EventNotifier.h>
#include <Common/Config/ConfigProcessor.h>
#include "Coordination/KeeperSnapshotManager.h"
#include "Core/ColumnWithTypeAndName.h"
#include "Core/ColumnsWithTypeAndName.h"
#include <Disks/DiskLocal.h>
#include "IO/ReadBuffer.h"
#include "IO/ReadBufferFromFile.h"
#include "base/Decimal.h"
@ -43,12 +48,14 @@ Runner::Runner(
std::optional<size_t> concurrency_,
const std::string & config_path,
const std::string & input_request_log_,
const std::string & setup_nodes_snapshot_path_,
const Strings & hosts_strings_,
std::optional<double> max_time_,
std::optional<double> delay_,
std::optional<bool> continue_on_error_,
std::optional<size_t> max_iterations_)
: input_request_log(input_request_log_)
, setup_nodes_snapshot_path(setup_nodes_snapshot_path_)
, info(std::make_shared<Stats>())
{
@ -381,18 +388,18 @@ struct ZooKeeperRequestBlock
{
explicit ZooKeeperRequestBlock(DB::Block block_)
: block(std::move(block_))
, hostname_idx(block.getPositionByName("hostname")) //
, request_event_time_idx(block.getPositionByName("request_event_time")) //
, thread_id_idx(block.getPositionByName("thread_id")) //
, session_id_idx(block.getPositionByName("session_id")) //
, xid_idx(block.getPositionByName("xid")) //
, hostname_idx(block.getPositionByName("hostname"))
, request_event_time_idx(block.getPositionByName("request_event_time"))
, thread_id_idx(block.getPositionByName("thread_id"))
, session_id_idx(block.getPositionByName("session_id"))
, xid_idx(block.getPositionByName("xid"))
, has_watch_idx(block.getPositionByName("has_watch"))
, op_num_idx(block.getPositionByName("op_num"))
, path_idx(block.getPositionByName("path"))
, data_idx(block.getPositionByName("data"))
, is_ephemeral_idx(block.getPositionByName("is_ephemeral"))
, is_sequential_idx(block.getPositionByName("is_sequential"))
, response_event_time_idx(block.getPositionByName("response_event_time")) //
, response_event_time_idx(block.getPositionByName("response_event_time"))
, error_idx(block.getPositionByName("error"))
, requests_size_idx(block.getPositionByName("requests_size"))
, version_idx(block.getPositionByName("version"))
@ -519,6 +526,7 @@ struct RequestFromLog
{
Coordination::ZooKeeperRequestPtr request;
std::optional<Coordination::Error> expected_result;
std::vector<std::optional<Coordination::Error>> subrequest_expected_results;
int64_t session_id = 0;
size_t executor_id = 0;
bool has_watch = false;
@ -586,7 +594,6 @@ struct ZooKeeperRequestFromLogReader
idx_in_block = 0;
}
request_from_log.expected_result = current_block->getError(idx_in_block);
request_from_log.session_id = current_block->getSessionId(idx_in_block);
request_from_log.has_watch = current_block->hasWatch(idx_in_block);
@ -693,6 +700,12 @@ struct ZooKeeperRequestFromLogReader
if (!subrequest_from_log)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to fetch subrequest for {}, subrequest index {}", op_num, i);
if (!subrequest_from_log->expected_result && request_from_log.expected_result
&& request_from_log.expected_result == Coordination::Error::ZOK)
{
subrequest_from_log->expected_result = Coordination::Error::ZOK;
}
requests.push_back(std::move(subrequest_from_log->request));
if (subrequest_from_log->session_id != request_from_log.session_id)
@ -700,6 +713,8 @@ struct ZooKeeperRequestFromLogReader
if (subrequest_from_log->executor_id != request_from_log.executor_id)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Executor id mismatch for subrequest in {}, subrequest index {}", op_num, i);
request_from_log.subrequest_expected_results.push_back(subrequest_from_log->expected_result);
}
request_from_log.request = std::make_shared<Coordination::ZooKeeperMultiRequest>(requests, default_acls);
@ -731,7 +746,6 @@ private:
namespace
{
struct RequestFromLogStats
{
struct Stats
@ -744,6 +758,192 @@ struct RequestFromLogStats
Stats read_requests;
};
struct SetupNodeCollector
{
explicit SetupNodeCollector(const std::string & setup_nodes_snapshot_path)
{
if (setup_nodes_snapshot_path.empty())
return;
keeper_context = std::make_shared<DB::KeeperContext>(true, std::make_shared<Coordination::CoordinationSettings>());
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(
std::make_shared<DB::DiskLocal>("Keeper-snapshots", setup_nodes_snapshot_path));
snapshot_manager.emplace(1, keeper_context);
auto snapshot_result = snapshot_manager->restoreFromLatestSnapshot();
if (snapshot_result.storage == nullptr)
{
std::cerr << "No initial snapshot found" << std::endl;
initial_storage = std::make_unique<Coordination::KeeperStorage>(
/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);
initial_storage->initializeSystemNodes();
}
else
{
std::cerr << "Loaded initial nodes from snapshot" << std::endl;
initial_storage = std::move(snapshot_result.storage);
}
}
void processRequest(const RequestFromLog & request_from_log)
{
if (!request_from_log.expected_result.has_value())
return;
auto process_request = [&](const Coordination::ZooKeeperRequest & request, const auto expected_result)
{
const auto & path = request.getPath();
if (processed_paths.contains(path))
return;
auto op_num = request.getOpNum();
if (op_num == Coordination::OpNum::Create)
{
if (expected_result == Coordination::Error::ZNODEEXISTS)
{
addExpectedNode(path);
processed_paths.insert(path);
}
else if (expected_result == Coordination::Error::ZOK)
{
/// we need to make sure ancestors exist
auto position = path.find_last_of('/');
if (position != 0)
{
auto parent_path = path.substr(0, position);
if (!processed_paths.contains(parent_path))
{
addExpectedNode(parent_path);
processed_paths.insert(parent_path);
}
}
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Remove)
{
if (expected_result == Coordination::Error::ZOK)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Set)
{
if (expected_result == Coordination::Error::ZOK)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::Check)
{
if (expected_result == Coordination::Error::ZOK)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (op_num == Coordination::OpNum::CheckNotExists)
{
if (expected_result == Coordination::Error::ZNODEEXISTS)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
else if (request.isReadRequest())
{
if (expected_result == Coordination::Error::ZOK)
{
addExpectedNode(path);
processed_paths.insert(path);
}
}
};
const auto & request = request_from_log.request;
if (request->getOpNum() == Coordination::OpNum::Multi || request->getOpNum() == Coordination::OpNum::MultiRead)
{
const auto & multi_request = dynamic_cast<const Coordination::ZooKeeperMultiRequest &>(*request);
const auto & subrequests = multi_request.requests;
for (size_t i = 0; i < subrequests.size(); ++i)
{
const auto & zookeeper_request = dynamic_cast<const Coordination::ZooKeeperRequest &>(*subrequests[i]);
const auto subrequest_expected_result = request_from_log.subrequest_expected_results[i];
if (subrequest_expected_result.has_value())
process_request(zookeeper_request, *subrequest_expected_result);
}
}
else
process_request(*request, *request_from_log.expected_result);
}
void addExpectedNode(const std::string & path)
{
std::lock_guard lock(nodes_mutex);
if (initial_storage->container.contains(path))
return;
std::cerr << "Adding expected node " << path << std::endl;
Coordination::Requests create_ops;
size_t pos = 1;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
auto request = zkutil::makeCreateRequest(path.substr(0, pos), "", zkutil::CreateMode::Persistent, true);
create_ops.emplace_back(request);
++pos;
}
auto request = zkutil::makeCreateRequest(path, "", zkutil::CreateMode::Persistent, true);
create_ops.emplace_back(request);
auto next_zxid = initial_storage->getNextZXID();
static Coordination::ACLs default_acls = []
{
Coordination::ACL acl;
acl.permissions = Coordination::ACL::All;
acl.scheme = "world";
acl.id = "anyone";
return Coordination::ACLs{std::move(acl)};
}();
auto multi_create_request = std::make_shared<Coordination::ZooKeeperMultiRequest>(create_ops, default_acls);
initial_storage->preprocessRequest(multi_create_request, 1, 0, next_zxid, /* check_acl = */ false);
auto responses = initial_storage->processRequest(multi_create_request, 1, next_zxid, /* check_acl = */ false);
if (responses.size() > 1 || responses[0].response->error != Coordination::Error::ZOK)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid response after trying to create a node {}", responses[0].response->error);
}
void generateSnapshot()
{
std::cerr << "Generating snapshot with starting data" << std::endl;
std::lock_guard lock(nodes_mutex);
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(initial_storage->getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(initial_storage.get(), snapshot_meta);
snapshot_manager->serializeSnapshotToDisk(snapshot);
}
std::mutex nodes_mutex;
DB::KeeperContextPtr keeper_context;
Coordination::KeeperStoragePtr initial_storage;
std::unordered_set<std::string> processed_paths;
std::optional<Coordination::KeeperSnapshotManager> snapshot_manager;
};
void dumpStats(std::string_view type, const RequestFromLogStats::Stats & stats_for_type)
{
std::cerr << fmt::format(
@ -751,7 +951,7 @@ void dumpStats(std::string_view type, const RequestFromLogStats::Stats & stats_f
type,
stats_for_type.total,
stats_for_type.unexpected_results,
static_cast<double>(stats_for_type.unexpected_results) / stats_for_type.total * 100)
stats_for_type.total != 0 ? static_cast<double>(stats_for_type.unexpected_results) / stats_for_type.total * 100 : 0.0)
<< std::endl;
};
@ -763,24 +963,40 @@ void requestFromLogExecutor(std::shared_ptr<ConcurrentBoundedQueue<RequestFromLo
{
auto request_promise = std::make_shared<std::promise<void>>();
last_request = request_promise->get_future();
Coordination::ResponseCallback callback
= [&, request_promise, request = request_from_log.request, expected_result = request_from_log.expected_result](
const Coordination::Response & response) mutable
Coordination::ResponseCallback callback = [&,
request_promise,
request = request_from_log.request,
expected_result = request_from_log.expected_result,
subrequest_expected_results = std::move(request_from_log.subrequest_expected_results)](
const Coordination::Response & response) mutable
{
auto & stats = request->isReadRequest() ? request_stats.read_requests : request_stats.write_requests;
stats.total.fetch_add(1, std::memory_order_relaxed);
if (*expected_result != response.error)
stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
if (expected_result)
{
if (*expected_result != response.error)
stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
//if (!expected_result)
// return;
if (*expected_result != response.error)
{
std::cerr << fmt::format(
"Unexpected result for {}\ngot {}, expected {}\n", request->toString(), response.error, *expected_result)
<< std::endl;
//if (*expected_result != response.error)
// std::cerr << fmt::format(
// "Unexpected result for {}, got {}, expected {}", request->getOpNum(), response.error, *expected_result)
// << std::endl;
if (const auto * multi_response = dynamic_cast<const Coordination::ZooKeeperMultiResponse *>(&response))
{
std::string subresponses;
for (size_t i = 0; i < multi_response->responses.size(); ++i)
{
subresponses += fmt::format("{} = {}\n", i, multi_response->responses[i]->error);
}
std::cerr << "Subresponses\n" << subresponses << std::endl;
}
}
}
request_promise->set_value();
};
@ -827,6 +1043,9 @@ void Runner::runBenchmarkFromLog()
RequestFromLogStats stats;
std::optional<SetupNodeCollector> setup_nodes_collector;
if (!setup_nodes_snapshot_path.empty())
setup_nodes_collector.emplace(setup_nodes_snapshot_path);
std::unordered_map<uint64_t, std::shared_ptr<ConcurrentBoundedQueue<RequestFromLog>>> executor_id_to_queue;
@ -850,7 +1069,7 @@ void Runner::runBenchmarkFromLog()
return;
}
auto executor_queue = std::make_shared<ConcurrentBoundedQueue<RequestFromLog>>(std::numeric_limits<uint64_t>().max());
auto executor_queue = std::make_shared<ConcurrentBoundedQueue<RequestFromLog>>(std::numeric_limits<uint64_t>::max());
executor_id_to_queue.emplace(request.executor_id, executor_queue);
auto scheduled = pool->trySchedule([&, executor_queue]() mutable
{
@ -865,6 +1084,7 @@ void Runner::runBenchmarkFromLog()
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to push to the executor's queue");
};
if (!setup_nodes_collector)
{
auto setup_connection = getConnection(connection_infos[0], 0);
benchmark_context.startup(*setup_connection);
@ -875,14 +1095,26 @@ void Runner::runBenchmarkFromLog()
delay_watch.restart();
while (auto request_from_log = request_reader.getNextRequest())
{
request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
push_request(std::move(*request_from_log));
if (setup_nodes_collector)
{
setup_nodes_collector->processRequest(*request_from_log);
}
else
{
request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
push_request(std::move(*request_from_log));
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
dumpStats("Write", stats.write_requests);
dumpStats("Read", stats.read_requests);
std::cerr << std::endl;
if (setup_nodes_collector)
setup_nodes_collector->generateSnapshot();
else
{
dumpStats("Write", stats.write_requests);
dumpStats("Read", stats.read_requests);
std::cerr << std::endl;
}
delay_watch.restart();
}
}
@ -906,7 +1138,7 @@ void Runner::runBenchmarkWithGenerator()
for (size_t i = 0; i < concurrency; ++i)
{
auto thread_connections = connections;
pool->scheduleOrThrowOnError([this, connections_ = std::move(thread_connections)]() mutable { thread(connections_); });
pool->scheduleOrThrowOnError([this, my_connections = std::move(thread_connections)]() mutable { thread(my_connections); });
}
}
catch (...)

View File

@ -27,6 +27,7 @@ public:
void startup(Coordination::ZooKeeper & zookeeper);
void cleanup(Coordination::ZooKeeper & zookeeper);
private:
struct Node
{
@ -54,6 +55,7 @@ public:
std::optional<size_t> concurrency_,
const std::string & config_path,
const std::string & input_request_log_,
const std::string & setup_nodes_snapshot_path_,
const Strings & hosts_strings_,
std::optional<double> max_time_,
std::optional<double> delay_,
@ -96,6 +98,7 @@ private:
std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
std::string input_request_log;
std::string setup_nodes_snapshot_path;
size_t concurrency = 1;

View File

@ -38,6 +38,7 @@ int main(int argc, char *argv[])
("help", "produce help message")
("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
("input-request-log", value<std::string>()->default_value(""), "log of requests that will be replayed")
("setup-nodes-snapshot-path", value<std::string>()->default_value(""), "directory containing snapshots with starting state")
("concurrency,c", value<unsigned>(), "number of parallel queries")
("report-delay,d", value<double>(), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>(), "amount of queries to be executed")
@ -60,6 +61,7 @@ int main(int argc, char *argv[])
Runner runner(valueToOptional<unsigned>(options["concurrency"]),
options["config"].as<std::string>(),
options["input-request-log"].as<std::string>(),
options["setup-nodes-snapshot-path"].as<std::string>(),
options["hosts"].as<Strings>(),
valueToOptional<double>(options["time-limit"]),
valueToOptional<double>(options["report-delay"]),