Replay ZK logs using keeper-bench

This commit is contained in:
Antonio Andelic 2024-04-10 12:56:29 +02:00
parent b3dd5f519d
commit 14c461338b
8 changed files with 875 additions and 269 deletions

View File

@ -1259,11 +1259,13 @@ void ZooKeeper::initFeatureFlags()
void ZooKeeper::executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback)
ResponseCallback callback,
WatchCallbackPtr watch)
{
RequestInfo request_info;
request_info.request = request;
request_info.callback = callback;
request_info.watch = watch;
pushRequest(std::move(request_info));
}

View File

@ -139,7 +139,8 @@ public:
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback);
ResponseCallback callback,
WatchCallbackPtr watch = nullptr);
/// See the documentation about semantics of these methods in IKeeper class.

View File

@ -4,5 +4,4 @@ if (NOT TARGET ch_contrib::rapidjson)
endif ()
clickhouse_add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
target_link_libraries(keeper-bench PRIVATE dbms)
target_link_libraries(keeper-bench PRIVATE ch_contrib::rapidjson)
target_link_libraries(keeper-bench PRIVATE dbms clickhouse_functions ch_contrib::rapidjson)

View File

@ -40,54 +40,6 @@ std::string generateRandomString(size_t length)
}
}
void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & path)
{
namespace fs = std::filesystem;
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
Strings children;
auto list_callback = [promise, &children] (const ListResponse & response)
{
children = response.names;
promise->set_value();
};
zookeeper.list(path, ListRequestType::ALL, list_callback, nullptr);
future.get();
while (!children.empty())
{
Coordination::Requests ops;
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
{
removeRecursive(zookeeper, fs::path(path) / children.back());
ops.emplace_back(makeRemoveRequest(fs::path(path) / children.back(), -1));
children.pop_back();
}
auto multi_promise = std::make_shared<std::promise<void>>();
auto multi_future = multi_promise->get_future();
auto multi_callback = [multi_promise] (const MultiResponse &)
{
multi_promise->set_value();
};
zookeeper.multi(ops, multi_callback);
multi_future.get();
}
auto remove_promise = std::make_shared<std::promise<void>>();
auto remove_future = remove_promise->get_future();
auto remove_callback = [remove_promise] (const RemoveResponse &)
{
remove_promise->set_value();
};
zookeeper.remove(path, -1, remove_callback);
remove_future.get();
}
NumberGetter
NumberGetter::fromConfig(const std::string & key, const Poco::Util::AbstractConfiguration & config, std::optional<uint64_t> default_value)
{
@ -603,148 +555,16 @@ Generator::Generator(const Poco::Util::AbstractConfiguration & config)
acl.id = "anyone";
default_acls.emplace_back(std::move(acl));
static const std::string generator_key = "generator";
std::cerr << "---- Parsing setup ---- " << std::endl;
static const std::string setup_key = generator_key + ".setup";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(setup_key, keys);
for (const auto & key : keys)
{
if (key.starts_with("node"))
{
auto node_key = setup_key + "." + key;
auto parsed_root_node = parseNode(node_key, config);
const auto node = root_nodes.emplace_back(parsed_root_node);
if (config.has(node_key + ".repeat"))
{
if (!node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key);
auto repeat_count = config.getUInt64(node_key + ".repeat");
node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
root_nodes.emplace_back(node->clone());
}
std::cerr << "Tree to create:" << std::endl;
node->dumpTree();
std::cerr << std::endl;
}
}
std::cerr << "---- Done parsing data setup ----\n" << std::endl;
std::cerr << "---- Collecting request generators ----" << std::endl;
static const std::string requests_key = generator_key + ".requests";
static const std::string requests_key = "generator.requests";
request_getter = RequestGetter::fromConfig(requests_key, config);
std::cerr << request_getter.description() << std::endl;
std::cerr << "---- Done collecting request generators ----\n" << std::endl;
}
std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config)
{
auto node = std::make_shared<Generator::Node>();
node->name = StringGetter::fromConfig(key + ".name", config);
if (config.has(key + ".data"))
node->data = StringGetter::fromConfig(key + ".data", config);
Poco::Util::AbstractConfiguration::Keys node_keys;
config.keys(key, node_keys);
for (const auto & node_key : node_keys)
{
if (!node_key.starts_with("node"))
continue;
const auto node_key_string = key + "." + node_key;
auto child_node = parseNode(node_key_string, config);
node->children.push_back(child_node);
if (config.has(node_key_string + ".repeat"))
{
if (!child_node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key_string);
auto repeat_count = config.getUInt64(node_key_string + ".repeat");
child_node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
node->children.push_back(child_node);
}
}
return node;
}
void Generator::Node::dumpTree(int level) const
{
std::string data_string
= data.has_value() ? fmt::format("{}", data->description()) : "no data";
std::string repeat_count_string = repeat_count != 0 ? fmt::format(", repeated {} times", repeat_count) : "";
std::cerr << fmt::format("{}name: {}, data: {}{}", std::string(level, '\t'), name.description(), data_string, repeat_count_string) << std::endl;
for (auto it = children.begin(); it != children.end();)
{
const auto & child = *it;
child->dumpTree(level + 1);
std::advance(it, child->repeat_count != 0 ? child->repeat_count : 1);
}
}
std::shared_ptr<Generator::Node> Generator::Node::clone() const
{
auto new_node = std::make_shared<Node>();
new_node->name = name;
new_node->data = data;
new_node->repeat_count = repeat_count;
// don't do deep copy of children because we will do clone only for root nodes
new_node->children = children;
return new_node;
}
void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const
{
auto path = std::filesystem::path(parent_path) / name.getString();
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path, data ? data->getString() : "", false, false, acls, create_callback);
future.get();
for (const auto & child : children)
child->createNode(zookeeper, path, acls);
}
void Generator::startup(Coordination::ZooKeeper & zookeeper)
{
std::cerr << "---- Creating test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
node->name.setString(node_name);
std::string root_path = std::filesystem::path("/") / node_name;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
node->createNode(zookeeper, "/", default_acls);
}
std::cerr << "---- Created test data ----\n" << std::endl;
std::cerr << "---- Initializing generators ----" << std::endl;
request_getter.startup(zookeeper);
}
@ -752,15 +572,3 @@ Coordination::ZooKeeperRequestPtr Generator::generate()
{
return request_getter.getRequestGenerator()->generate(default_acls);
}
void Generator::cleanup(Coordination::ZooKeeper & zookeeper)
{
std::cerr << "---- Cleaning up test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
std::string root_path = std::filesystem::path("/") / node_name;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
}
}

View File

@ -173,27 +173,9 @@ public:
void startup(Coordination::ZooKeeper & zookeeper);
Coordination::ZooKeeperRequestPtr generate();
void cleanup(Coordination::ZooKeeper & zookeeper);
private:
struct Node
{
StringGetter name;
std::optional<StringGetter> data;
std::vector<std::shared_ptr<Node>> children;
size_t repeat_count = 0;
std::shared_ptr<Node> clone() const;
void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
void dumpTree(int level = 0) const;
};
static std::shared_ptr<Node> parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config);
std::uniform_int_distribution<size_t> request_picker;
std::vector<std::shared_ptr<Node>> root_nodes;
RequestGetter request_getter;
Coordination::ACLs default_acls;
};
std::optional<Generator> getGenerator(const std::string & name);

View File

@ -1,14 +1,28 @@
#include "Runner.h"
#include <atomic>
#include <condition_variable>
#include <Poco/Util/AbstractConfiguration.h>
#include "Common/ConcurrentBoundedQueue.h"
#include "Common/ZooKeeper/IKeeper.h"
#include "Common/ZooKeeper/ZooKeeperArgs.h"
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/EventNotifier.h>
#include <Common/Config/ConfigProcessor.h>
#include "IO/ReadBufferFromString.h"
#include "Core/ColumnWithTypeAndName.h"
#include "Core/ColumnsWithTypeAndName.h"
#include "IO/ReadBuffer.h"
#include "IO/ReadBufferFromFile.h"
#include "base/Decimal.h"
#include "base/types.h"
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <Formats/ReadSchemaUtils.h>
#include <Formats/registerFormats.h>
#include <Interpreters/Context.h>
namespace CurrentMetrics
@ -22,23 +36,41 @@ namespace DB::ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
Runner::Runner(
std::optional<size_t> concurrency_,
const std::string & config_path,
const std::string & input_request_log_,
const Strings & hosts_strings_,
std::optional<double> max_time_,
std::optional<double> delay_,
std::optional<bool> continue_on_error_,
std::optional<size_t> max_iterations_)
: info(std::make_shared<Stats>())
: input_request_log(input_request_log_)
, info(std::make_shared<Stats>())
{
DB::ConfigProcessor config_processor(config_path, true, false);
auto config = config_processor.loadConfig().configuration;
DB::ConfigurationPtr config = nullptr;
if (!config_path.empty())
{
config = config_processor.loadConfig().configuration;
if (config->has("generator"))
generator.emplace(*config);
}
else
{
if (input_request_log.empty())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Both --config and --input_request_log cannot be empty");
if (!std::filesystem::exists(input_request_log))
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "File on path {} does not exist", input_request_log);
}
generator.emplace(*config);
if (!hosts_strings_.empty())
{
@ -57,6 +89,8 @@ Runner::Runner(
static constexpr uint64_t DEFAULT_CONCURRENCY = 1;
if (concurrency_)
concurrency = *concurrency_;
else if (!config)
concurrency = DEFAULT_CONCURRENCY;
else
concurrency = config->getUInt64("concurrency", DEFAULT_CONCURRENCY);
std::cerr << "Concurrency: " << concurrency << std::endl;
@ -64,6 +98,8 @@ Runner::Runner(
static constexpr uint64_t DEFAULT_ITERATIONS = 0;
if (max_iterations_)
max_iterations = *max_iterations_;
else if (!config)
max_iterations = DEFAULT_ITERATIONS;
else
max_iterations = config->getUInt64("iterations", DEFAULT_ITERATIONS);
std::cerr << "Iterations: " << max_iterations << std::endl;
@ -71,6 +107,8 @@ Runner::Runner(
static constexpr double DEFAULT_DELAY = 1.0;
if (delay_)
delay = *delay_;
else if (!config)
delay = DEFAULT_DELAY;
else
delay = config->getDouble("report_delay", DEFAULT_DELAY);
std::cerr << "Report delay: " << delay << std::endl;
@ -78,44 +116,48 @@ Runner::Runner(
static constexpr double DEFAULT_TIME_LIMIT = 0.0;
if (max_time_)
max_time = *max_time_;
else if (!config)
max_time = DEFAULT_TIME_LIMIT;
else
max_time = config->getDouble("timelimit", DEFAULT_TIME_LIMIT);
std::cerr << "Time limit: " << max_time << std::endl;
if (continue_on_error_)
continue_on_error = *continue_on_error_;
else if (!config)
continue_on_error_ = false;
else
continue_on_error = config->getBool("continue_on_error", false);
std::cerr << "Continue on error: " << continue_on_error << std::endl;
static const std::string output_key = "output";
print_to_stdout = config->getBool(output_key + ".stdout", false);
std::cerr << "Printing output to stdout: " << print_to_stdout << std::endl;
static const std::string output_file_key = output_key + ".file";
if (config->has(output_file_key))
if (config)
{
if (config->has(output_file_key + ".path"))
{
file_output = config->getString(output_file_key + ".path");
output_file_with_timestamp = config->getBool(output_file_key + ".with_timestamp");
}
else
file_output = config->getString(output_file_key);
benchmark_context.initializeFromConfig(*config);
std::cerr << "Result file path: " << file_output->string() << std::endl;
static const std::string output_key = "output";
print_to_stdout = config->getBool(output_key + ".stdout", false);
std::cerr << "Printing output to stdout: " << print_to_stdout << std::endl;
static const std::string output_file_key = output_key + ".file";
if (config->has(output_file_key))
{
if (config->has(output_file_key + ".path"))
{
file_output = config->getString(output_file_key + ".path");
output_file_with_timestamp = config->getBool(output_file_key + ".with_timestamp");
}
else
file_output = config->getString(output_file_key);
std::cerr << "Result file path: " << file_output->string() << std::endl;
}
}
std::cerr << "---- Run options ----\n" << std::endl;
pool.emplace(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, concurrency);
queue.emplace(concurrency);
}
void Runner::parseHostsFromConfig(const Poco::Util::AbstractConfiguration & config)
{
ConnectionInfo default_connection_info;
const auto fill_connection_details = [&](const std::string & key, auto & connection_info)
{
if (config.has(key + ".secure"))
@ -328,9 +370,519 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
void Runner::runBenchmark()
{
if (generator)
runBenchmarkWithGenerator();
else
runBenchmarkFromLog();
}
struct ZooKeeperRequestBlock
{
explicit ZooKeeperRequestBlock(DB::Block block_)
: block(std::move(block_))
, hostname_idx(block.getPositionByName("hostname")) //
, request_event_time_idx(block.getPositionByName("request_event_time")) //
, thread_id_idx(block.getPositionByName("thread_id")) //
, session_id_idx(block.getPositionByName("session_id")) //
, xid_idx(block.getPositionByName("xid")) //
, has_watch_idx(block.getPositionByName("has_watch"))
, op_num_idx(block.getPositionByName("op_num"))
, path_idx(block.getPositionByName("path"))
, data_idx(block.getPositionByName("data"))
, is_ephemeral_idx(block.getPositionByName("is_ephemeral"))
, is_sequential_idx(block.getPositionByName("is_sequential"))
, response_event_time_idx(block.getPositionByName("response_event_time")) //
, error_idx(block.getPositionByName("error"))
, requests_size_idx(block.getPositionByName("requests_size"))
, version_idx(block.getPositionByName("version"))
{}
size_t rows() const
{
return block.rows();
}
UInt64 getExecutorId(size_t row) const
{
return getSessionId(row);
}
std::string getHostname(size_t row) const
{
return getField(hostname_idx, row).safeGet<std::string>();
}
UInt64 getThreadId(size_t row) const
{
return getField(thread_id_idx, row).safeGet<UInt64>();
}
DB::DateTime64 getRequestEventTime(size_t row) const
{
return getField(request_event_time_idx, row).safeGet<DB::DateTime64>();
}
DB::DateTime64 getResponseEventTime(size_t row) const
{
return getField(response_event_time_idx, row).safeGet<DB::DateTime64>();
}
Int64 getSessionId(size_t row) const
{
return getField(session_id_idx, row).safeGet<Int64>();
}
Int64 getXid(size_t row) const
{
return getField(xid_idx, row).safeGet<Int64>();
}
bool hasWatch(size_t row) const
{
return getField(has_watch_idx, row).safeGet<UInt8>();
}
Coordination::OpNum getOpNum(size_t row) const
{
return static_cast<Coordination::OpNum>(getField(op_num_idx, row).safeGet<Int64>());
}
bool isEphemeral(size_t row) const
{
return getField(is_ephemeral_idx, row).safeGet<UInt8>();
}
bool isSequential(size_t row) const
{
return getField(is_sequential_idx, row).safeGet<UInt8>();
}
std::string getPath(size_t row) const
{
return getField(path_idx, row).safeGet<std::string>();
}
std::string getData(size_t row) const
{
return getField(data_idx, row).safeGet<std::string>();
}
UInt64 getRequestsSize(size_t row) const
{
return getField(requests_size_idx, row).safeGet<UInt64>();
}
std::optional<Int32> getVersion(size_t row) const
{
auto field = getField(version_idx, row);
if (field.isNull())
return std::nullopt;
return static_cast<Int32>(field.safeGet<Int64>());
}
std::optional<Coordination::Error> getError(size_t row) const
{
auto field = getField(error_idx, row);
if (field.isNull())
return std::nullopt;
return static_cast<Coordination::Error>(field.safeGet<Int64>());
}
private:
DB::Field getField(size_t position, size_t row) const
{
DB::Field field;
block.getByPosition(position).column->get(row, field);
return field;
}
DB::Block block;
size_t hostname_idx = 0;
size_t request_event_time_idx = 0;
size_t thread_id_idx = 0;
size_t session_id_idx = 0;
size_t xid_idx = 0;
size_t has_watch_idx = 0;
size_t op_num_idx = 0;
size_t path_idx = 0;
size_t data_idx = 0;
size_t is_ephemeral_idx = 0;
size_t is_sequential_idx = 0;
size_t response_event_time_idx = 0;
size_t error_idx = 0;
size_t requests_size_idx = 0;
size_t version_idx = 0;
};
struct RequestFromLog
{
Coordination::ZooKeeperRequestPtr request;
std::optional<Coordination::Error> expected_result;
int64_t session_id = 0;
size_t executor_id = 0;
bool has_watch = false;
DB::DateTime64 request_event_time;
DB::DateTime64 response_event_time;
std::shared_ptr<Coordination::ZooKeeper> connection;
};
struct ZooKeeperRequestFromLogReader
{
ZooKeeperRequestFromLogReader(const std::string & input_request_log, DB::ContextPtr context)
{
std::optional<DB::FormatSettings> format_settings;
file_read_buf = std::make_unique<DB::ReadBufferFromFile>(input_request_log);
auto compression_method = DB::chooseCompressionMethod(input_request_log, "");
file_read_buf = DB::wrapReadBufferWithCompressionMethod(std::move(file_read_buf), compression_method);
DB::SingleReadBufferIterator read_buffer_iterator(std::move(file_read_buf));
auto [columns_description, format] = DB::detectFormatAndReadSchema(format_settings, read_buffer_iterator, context);
DB::ColumnsWithTypeAndName columns;
columns.reserve(columns_description.size());
for (const auto & column_description : columns_description)
columns.push_back(DB::ColumnWithTypeAndName{column_description.type, column_description.name});
header_block = std::move(columns);
file_read_buf
= DB::wrapReadBufferWithCompressionMethod(std::make_unique<DB::ReadBufferFromFile>(input_request_log), compression_method);
input_format = DB::FormatFactory::instance().getInput(
format,
*file_read_buf,
header_block,
context,
context->getSettingsRef().max_block_size,
format_settings,
1,
std::nullopt,
/*is_remote_fs*/ false,
DB::CompressionMethod::None,
false);
Coordination::ACL acl;
acl.permissions = Coordination::ACL::All;
acl.scheme = "world";
acl.id = "anyone";
default_acls.emplace_back(std::move(acl));
}
std::optional<RequestFromLog> getNextRequest(bool for_multi = false)
{
RequestFromLog request_from_log;
if (!current_block)
{
auto chunk = input_format->generate();
if (chunk.empty())
return std::nullopt;
current_block.emplace(header_block.cloneWithColumns(chunk.detachColumns()));
idx_in_block = 0;
}
request_from_log.expected_result = current_block->getError(idx_in_block);
request_from_log.session_id = current_block->getSessionId(idx_in_block);
request_from_log.has_watch = current_block->hasWatch(idx_in_block);
request_from_log.executor_id = current_block->getExecutorId(idx_in_block);
request_from_log.request_event_time = current_block->getRequestEventTime(idx_in_block);
request_from_log.response_event_time = current_block->getResponseEventTime(idx_in_block);
const auto move_row_iterator = [&]
{
if (idx_in_block == current_block->rows() - 1)
current_block.reset();
else
++idx_in_block;
};
auto op_num = current_block->getOpNum(idx_in_block);
switch (op_num)
{
case Coordination::OpNum::Create:
{
auto create_request = std::make_shared<Coordination::ZooKeeperCreateRequest>();
create_request->path = current_block->getPath(idx_in_block);
create_request->data = current_block->getData(idx_in_block);
create_request->is_ephemeral = current_block->isEphemeral(idx_in_block);
create_request->is_sequential = current_block->isSequential(idx_in_block);
request_from_log.request = create_request;
break;
}
case Coordination::OpNum::Set:
{
auto set_request = std::make_shared<Coordination::ZooKeeperSetRequest>();
set_request->path = current_block->getPath(idx_in_block);
set_request->data = current_block->getData(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
set_request->version = *version;
request_from_log.request = set_request;
break;
}
case Coordination::OpNum::Remove:
{
auto remove_request = std::make_shared<Coordination::ZooKeeperRemoveRequest>();
remove_request->path = current_block->getPath(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
remove_request->version = *version;
request_from_log.request = remove_request;
break;
}
case Coordination::OpNum::Check:
{
auto check_request = std::make_shared<Coordination::ZooKeeperCheckRequest>();
check_request->path = current_block->getPath(idx_in_block);
if (auto version = current_block->getVersion(idx_in_block))
check_request->version = *version;
request_from_log.request = check_request;
break;
}
case Coordination::OpNum::Sync:
{
auto sync_request = std::make_shared<Coordination::ZooKeeperSyncRequest>();
sync_request->path = current_block->getPath(idx_in_block);
request_from_log.request = sync_request;
break;
}
case Coordination::OpNum::Get:
{
auto get_request = std::make_shared<Coordination::ZooKeeperGetRequest>();
get_request->path = current_block->getPath(idx_in_block);
request_from_log.request = get_request;
break;
}
case Coordination::OpNum::SimpleList:
case Coordination::OpNum::FilteredList:
{
auto list_request = std::make_shared<Coordination::ZooKeeperSimpleListRequest>();
list_request->path = current_block->getPath(idx_in_block);
request_from_log.request = list_request;
break;
}
case Coordination::OpNum::Exists:
{
auto exists_request = std::make_shared<Coordination::ZooKeeperExistsRequest>();
exists_request->path = current_block->getPath(idx_in_block);
request_from_log.request = exists_request;
break;
}
case Coordination::OpNum::Multi:
case Coordination::OpNum::MultiRead:
{
if (for_multi)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Nested multi requests are not allowed");
auto requests_size = current_block->getRequestsSize(idx_in_block);
Coordination::Requests requests;
requests.reserve(requests_size);
move_row_iterator();
for (size_t i = 0; i < requests_size; ++i)
{
auto subrequest_from_log = getNextRequest(/*for_multi=*/true);
if (!subrequest_from_log)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to fetch subrequest for {}, subrequest index {}", op_num, i);
requests.push_back(std::move(subrequest_from_log->request));
if (subrequest_from_log->session_id != request_from_log.session_id)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Session id mismatch for subrequest in {}, subrequest index {}", op_num, i);
if (subrequest_from_log->executor_id != request_from_log.executor_id)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Executor id mismatch for subrequest in {}, subrequest index {}", op_num, i);
}
request_from_log.request = std::make_shared<Coordination::ZooKeeperMultiRequest>(requests, default_acls);
return request_from_log;
}
default:
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unsupported operation {} ({})", op_num, static_cast<int64_t>(op_num));
}
move_row_iterator();
return request_from_log;
}
private:
DB::Block header_block;
std::unique_ptr<DB::ReadBuffer> file_read_buf;
DB::InputFormatPtr input_format;
std::optional<ZooKeeperRequestBlock> current_block;
size_t idx_in_block = 0;
Coordination::ACLs default_acls;
};
namespace
{
struct RequestFromLogStats
{
struct Stats
{
std::atomic<size_t> total = 0;
std::atomic<size_t> unexpected_results = 0;
};
Stats write_requests;
Stats read_requests;
};
void dumpStats(std::string_view type, const RequestFromLogStats::Stats & stats_for_type)
{
std::cerr << fmt::format(
"{} requests: {} total, {} with unexpected results ({:.4}%)",
type,
stats_for_type.total,
stats_for_type.unexpected_results,
static_cast<double>(stats_for_type.unexpected_results) / stats_for_type.total * 100)
<< std::endl;
};
void requestFromLogExecutor(std::shared_ptr<ConcurrentBoundedQueue<RequestFromLog>> queue, RequestFromLogStats & request_stats)
{
RequestFromLog request_from_log;
std::optional<std::future<void>> last_request;
while (queue->pop(request_from_log))
{
auto request_promise = std::make_shared<std::promise<void>>();
last_request = request_promise->get_future();
Coordination::ResponseCallback callback
= [&, request_promise, request = request_from_log.request, expected_result = request_from_log.expected_result](
const Coordination::Response & response) mutable
{
auto & stats = request->isReadRequest() ? request_stats.read_requests : request_stats.write_requests;
stats.total.fetch_add(1, std::memory_order_relaxed);
if (*expected_result != response.error)
stats.unexpected_results.fetch_add(1, std::memory_order_relaxed);
//if (!expected_result)
// return;
//if (*expected_result != response.error)
// std::cerr << fmt::format(
// "Unexpected result for {}, got {}, expected {}", request->getOpNum(), response.error, *expected_result)
// << std::endl;
request_promise->set_value();
};
Coordination::WatchCallbackPtr watch;
if (request_from_log.has_watch)
watch = std::make_shared<Coordination::WatchCallback>([](const Coordination::WatchResponse &) {});
request_from_log.connection->executeGenericRequest(request_from_log.request, callback, watch);
}
if (last_request)
last_request->wait();
}
}
void Runner::runBenchmarkFromLog()
{
std::cerr << fmt::format("Running benchmark using requests from {}", input_request_log) << std::endl;
pool.emplace(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, concurrency);
shared_context = DB::Context::createShared();
global_context = DB::Context::createGlobal(shared_context.get());
global_context->makeGlobalContext();
DB::registerFormats();
/// Randomly choosing connection index
pcg64 rng(randomSeed());
std::uniform_int_distribution<size_t> connection_distribution(0, connection_infos.size() - 1);
std::unordered_map<int64_t, std::shared_ptr<Coordination::ZooKeeper>> zookeeper_connections;
auto get_zookeeper_connection = [&](int64_t session_id)
{
if (auto it = zookeeper_connections.find(session_id); it != zookeeper_connections.end() && !it->second->isExpired())
return it->second;
auto connection_idx = connection_distribution(rng);
auto zk_connection = getConnection(connection_infos[connection_idx], connection_idx);
zookeeper_connections.insert_or_assign(session_id, zk_connection);
return zk_connection;
};
RequestFromLogStats stats;
std::unordered_map<uint64_t, std::shared_ptr<ConcurrentBoundedQueue<RequestFromLog>>> executor_id_to_queue;
SCOPE_EXIT({
for (const auto & [executor_id, executor_queue] : executor_id_to_queue)
executor_queue->finish();
pool->wait();
dumpStats("Write", stats.write_requests);
dumpStats("Read", stats.read_requests);
});
auto push_request = [&](RequestFromLog request)
{
if (auto it = executor_id_to_queue.find(request.executor_id); it != executor_id_to_queue.end())
{
auto success = it->second->push(std::move(request));
if (!success)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to push to the executor's queue");
return;
}
auto executor_queue = std::make_shared<ConcurrentBoundedQueue<RequestFromLog>>(std::numeric_limits<uint64_t>().max());
executor_id_to_queue.emplace(request.executor_id, executor_queue);
auto scheduled = pool->trySchedule([&, executor_queue]() mutable
{
requestFromLogExecutor(std::move(executor_queue), stats);
});
if (!scheduled)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Failed to schedule worker, try to increase concurrency parameter");
auto success = executor_queue->push(std::move(request));
if (!success)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Failed to push to the executor's queue");
};
{
auto setup_connection = getConnection(connection_infos[0], 0);
benchmark_context.startup(*setup_connection);
}
ZooKeeperRequestFromLogReader request_reader(input_request_log, global_context);
while (auto request_from_log = request_reader.getNextRequest())
{
request_from_log->connection = get_zookeeper_connection(request_from_log->session_id);
push_request(std::move(*request_from_log));
}
}
void Runner::runBenchmarkWithGenerator()
{
pool.emplace(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, concurrency);
queue.emplace(concurrency);
createConnections();
std::cerr << "Preparing to run\n";
benchmark_context.startup(*connections[0]);
generator->startup(*connections[0]);
std::cerr << "Prepared\n";
@ -458,8 +1010,225 @@ std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::refreshConnections
Runner::~Runner()
{
queue->clearAndFinish();
if (queue)
queue->clearAndFinish();
shutdown = true;
pool->wait();
generator->cleanup(*connections[0]);
if (pool)
pool->wait();
auto connection = getConnection(connection_infos[0], 0);
benchmark_context.cleanup(*connection);
}
namespace
{
void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & path)
{
namespace fs = std::filesystem;
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
Strings children;
auto list_callback = [promise, &children] (const Coordination::ListResponse & response)
{
children = response.names;
promise->set_value();
};
zookeeper.list(path, Coordination::ListRequestType::ALL, list_callback, nullptr);
future.get();
std::span children_span(children);
while (!children_span.empty())
{
Coordination::Requests ops;
for (size_t i = 0; i < 1000 && !children.empty(); ++i)
{
removeRecursive(zookeeper, fs::path(path) / children.back());
ops.emplace_back(zkutil::makeRemoveRequest(fs::path(path) / children_span.back(), -1));
children_span = children_span.subspan(0, children_span.size() - 1);
}
auto multi_promise = std::make_shared<std::promise<void>>();
auto multi_future = multi_promise->get_future();
auto multi_callback = [multi_promise] (const Coordination::MultiResponse &)
{
multi_promise->set_value();
};
zookeeper.multi(ops, multi_callback);
multi_future.get();
}
auto remove_promise = std::make_shared<std::promise<void>>();
auto remove_future = remove_promise->get_future();
auto remove_callback = [remove_promise] (const Coordination::RemoveResponse &)
{
remove_promise->set_value();
};
zookeeper.remove(path, -1, remove_callback);
remove_future.get();
}
}
void BenchmarkContext::initializeFromConfig(const Poco::Util::AbstractConfiguration & config)
{
Coordination::ACL acl;
acl.permissions = Coordination::ACL::All;
acl.scheme = "world";
acl.id = "anyone";
default_acls.emplace_back(std::move(acl));
std::cerr << "---- Parsing setup ---- " << std::endl;
static const std::string setup_key = "setup";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(setup_key, keys);
for (const auto & key : keys)
{
if (key.starts_with("node"))
{
auto node_key = setup_key + "." + key;
auto parsed_root_node = parseNode(node_key, config);
const auto node = root_nodes.emplace_back(parsed_root_node);
if (config.has(node_key + ".repeat"))
{
if (!node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key);
auto repeat_count = config.getUInt64(node_key + ".repeat");
node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
root_nodes.emplace_back(node->clone());
}
std::cerr << "Tree to create:" << std::endl;
node->dumpTree();
std::cerr << std::endl;
}
}
std::cerr << "---- Done parsing data setup ----\n" << std::endl;
}
std::shared_ptr<BenchmarkContext::Node> BenchmarkContext::parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config)
{
auto node = std::make_shared<BenchmarkContext::Node>();
node->name = StringGetter::fromConfig(key + ".name", config);
if (config.has(key + ".data"))
node->data = StringGetter::fromConfig(key + ".data", config);
Poco::Util::AbstractConfiguration::Keys node_keys;
config.keys(key, node_keys);
for (const auto & node_key : node_keys)
{
if (!node_key.starts_with("node"))
continue;
const auto node_key_string = key + "." + node_key;
auto child_node = parseNode(node_key_string, config);
node->children.push_back(child_node);
if (config.has(node_key_string + ".repeat"))
{
if (!child_node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key_string);
auto repeat_count = config.getUInt64(node_key_string + ".repeat");
child_node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
node->children.push_back(child_node);
}
}
return node;
}
void BenchmarkContext::Node::dumpTree(int level) const
{
std::string data_string
= data.has_value() ? fmt::format("{}", data->description()) : "no data";
std::string repeat_count_string = repeat_count != 0 ? fmt::format(", repeated {} times", repeat_count) : "";
std::cerr << fmt::format("{}name: {}, data: {}{}", std::string(level, '\t'), name.description(), data_string, repeat_count_string) << std::endl;
for (auto it = children.begin(); it != children.end();)
{
const auto & child = *it;
child->dumpTree(level + 1);
std::advance(it, child->repeat_count != 0 ? child->repeat_count : 1);
}
}
std::shared_ptr<BenchmarkContext::Node> BenchmarkContext::Node::clone() const
{
auto new_node = std::make_shared<Node>();
new_node->name = name;
new_node->data = data;
new_node->repeat_count = repeat_count;
// don't do deep copy of children because we will do clone only for root nodes
new_node->children = children;
return new_node;
}
void BenchmarkContext::Node::createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const
{
auto path = std::filesystem::path(parent_path) / name.getString();
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const Coordination::CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path, data ? data->getString() : "", false, false, acls, create_callback);
future.get();
for (const auto & child : children)
child->createNode(zookeeper, path, acls);
}
void BenchmarkContext::startup(Coordination::ZooKeeper & zookeeper)
{
if (root_nodes.empty())
return;
std::cerr << "---- Creating test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
node->name.setString(node_name);
std::string root_path = std::filesystem::path("/") / node_name;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
node->createNode(zookeeper, "/", default_acls);
}
std::cerr << "---- Created test data ----\n" << std::endl;
}
void BenchmarkContext::cleanup(Coordination::ZooKeeper & zookeeper)
{
if (root_nodes.empty())
return;
std::cerr << "---- Cleaning up test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
std::string root_path = std::filesystem::path("/") / node_name;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include "Common/ZooKeeper/ZooKeeperArgs.h"
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include "Generator.h"
#include <Common/ZooKeeper/IKeeper.h>
@ -12,6 +12,7 @@
#include <Core/Types.h>
#include <Poco/Util/AbstractConfiguration.h>
#include "Interpreters/Context.h"
#include "Stats.h"
#include <filesystem>
@ -19,12 +20,40 @@
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
struct BenchmarkContext
{
public:
void initializeFromConfig(const Poco::Util::AbstractConfiguration & config);
void startup(Coordination::ZooKeeper & zookeeper);
void cleanup(Coordination::ZooKeeper & zookeeper);
private:
struct Node
{
StringGetter name;
std::optional<StringGetter> data;
std::vector<std::shared_ptr<Node>> children;
size_t repeat_count = 0;
std::shared_ptr<Node> clone() const;
void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
void dumpTree(int level = 0) const;
};
static std::shared_ptr<Node> parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config);
std::vector<std::shared_ptr<Node>> root_nodes;
Coordination::ACLs default_acls;
};
class Runner
{
public:
Runner(
std::optional<size_t> concurrency_,
const std::string & config_path,
const std::string & input_request_log_,
const Strings & hosts_strings_,
std::optional<double> max_time_,
std::optional<double> delay_,
@ -44,8 +73,30 @@ public:
~Runner();
private:
struct ConnectionInfo
{
std::string host;
bool secure = false;
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
bool use_compression = false;
size_t sessions = 1;
};
void parseHostsFromConfig(const Poco::Util::AbstractConfiguration & config);
void runBenchmarkWithGenerator();
void runBenchmarkFromLog();
void createConnections();
std::vector<std::shared_ptr<Coordination::ZooKeeper>> refreshConnections();
std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
std::string input_request_log;
size_t concurrency = 1;
std::optional<ThreadPool> pool;
@ -54,7 +105,8 @@ private:
double max_time = 0;
double delay = 1;
bool continue_on_error = false;
std::atomic<size_t> max_iterations = 0;
size_t max_iterations = 0;
std::atomic<size_t> requests_executed = 0;
std::atomic<bool> shutdown = false;
@ -71,25 +123,14 @@ private:
using Queue = ConcurrentBoundedQueue<Coordination::ZooKeeperRequestPtr>;
std::optional<Queue> queue;
struct ConnectionInfo
{
std::string host;
bool secure = false;
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
bool use_compression = false;
size_t sessions = 1;
};
std::mutex connection_mutex;
ConnectionInfo default_connection_info;
std::vector<ConnectionInfo> connection_infos;
std::vector<std::shared_ptr<Coordination::ZooKeeper>> connections;
std::unordered_map<size_t, size_t> connections_to_info_map;
void createConnections();
std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
std::vector<std::shared_ptr<Coordination::ZooKeeper>> refreshConnections();
DB::SharedContextHolder shared_context;
DB::ContextMutablePtr global_context;
BenchmarkContext benchmark_context;
};

View File

@ -1,8 +1,6 @@
#include <iostream>
#include <boost/program_options.hpp>
#include "Runner.h"
#include "Stats.h"
#include "Generator.h"
#include "Common/Exception.h"
#include <Common/TerminalSize.h>
#include <Core/Types.h>
@ -27,6 +25,10 @@ int main(int argc, char *argv[])
bool print_stacktrace = true;
//Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
//Poco::Logger::root().setChannel(channel);
//Poco::Logger::root().setLevel("trace");
try
{
using boost::program_options::value;
@ -34,12 +36,13 @@ int main(int argc, char *argv[])
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help", "produce help message")
("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
("concurrency,c", value<unsigned>(), "number of parallel queries")
("report-delay,d", value<double>(), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>(), "amount of queries to be executed")
("time-limit,t", value<double>(), "stop launch of queries after specified time limit")
("hosts,h", value<Strings>()->multitoken()->default_value(Strings{}, ""), "")
("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
("input-request-log", value<std::string>()->default_value(""), "log of requests that will be replayed")
("concurrency,c", value<unsigned>(), "number of parallel queries")
("report-delay,d", value<double>(), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>(), "amount of queries to be executed")
("time-limit,t", value<double>(), "stop launch of queries after specified time limit")
("hosts,h", value<Strings>()->multitoken()->default_value(Strings{}, ""), "")
("continue_on_errors", "continue testing even if a query fails")
;
@ -56,6 +59,7 @@ int main(int argc, char *argv[])
Runner runner(valueToOptional<unsigned>(options["concurrency"]),
options["config"].as<std::string>(),
options["input-request-log"].as<std::string>(),
options["hosts"].as<Strings>(),
valueToOptional<double>(options["time-limit"]),
valueToOptional<double>(options["report-delay"]),
@ -66,9 +70,9 @@ int main(int argc, char *argv[])
{
runner.runBenchmark();
}
catch (const DB::Exception & e)
catch (...)
{
std::cout << "Got exception while trying to run benchmark: " << e.message() << std::endl;
std::cout << "Got exception while trying to run benchmark: " << DB::getCurrentExceptionMessage(true) << std::endl;
}
return 0;