mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #62481 from ClickHouse/keeper-bench-replay-zookeeper-log
Replay ZK logs using keeper-bench
This commit is contained in:
commit
bfe68fd3ca
@ -1259,11 +1259,13 @@ void ZooKeeper::initFeatureFlags()
|
||||
|
||||
void ZooKeeper::executeGenericRequest(
|
||||
const ZooKeeperRequestPtr & request,
|
||||
ResponseCallback callback)
|
||||
ResponseCallback callback,
|
||||
WatchCallbackPtr watch)
|
||||
{
|
||||
RequestInfo request_info;
|
||||
request_info.request = request;
|
||||
request_info.callback = callback;
|
||||
request_info.watch = watch;
|
||||
|
||||
pushRequest(std::move(request_info));
|
||||
}
|
||||
|
@ -139,7 +139,8 @@ public:
|
||||
|
||||
void executeGenericRequest(
|
||||
const ZooKeeperRequestPtr & request,
|
||||
ResponseCallback callback);
|
||||
ResponseCallback callback,
|
||||
WatchCallbackPtr watch = nullptr);
|
||||
|
||||
/// See the documentation about semantics of these methods in IKeeper class.
|
||||
|
||||
|
@ -4,5 +4,4 @@ if (NOT TARGET ch_contrib::rapidjson)
|
||||
endif ()
|
||||
|
||||
clickhouse_add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
|
||||
target_link_libraries(keeper-bench PRIVATE dbms)
|
||||
target_link_libraries(keeper-bench PRIVATE ch_contrib::rapidjson)
|
||||
target_link_libraries(keeper-bench PRIVATE dbms clickhouse_functions ch_contrib::rapidjson)
|
||||
|
@ -40,54 +40,6 @@ std::string generateRandomString(size_t length)
|
||||
}
|
||||
}
|
||||
|
||||
void removeRecursive(Coordination::ZooKeeper & zookeeper, const std::string & path)
|
||||
{
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
auto promise = std::make_shared<std::promise<void>>();
|
||||
auto future = promise->get_future();
|
||||
|
||||
Strings children;
|
||||
auto list_callback = [promise, &children] (const ListResponse & response)
|
||||
{
|
||||
children = response.names;
|
||||
|
||||
promise->set_value();
|
||||
};
|
||||
zookeeper.list(path, ListRequestType::ALL, list_callback, nullptr);
|
||||
future.get();
|
||||
|
||||
while (!children.empty())
|
||||
{
|
||||
Coordination::Requests ops;
|
||||
for (size_t i = 0; i < MULTI_BATCH_SIZE && !children.empty(); ++i)
|
||||
{
|
||||
removeRecursive(zookeeper, fs::path(path) / children.back());
|
||||
ops.emplace_back(makeRemoveRequest(fs::path(path) / children.back(), -1));
|
||||
children.pop_back();
|
||||
}
|
||||
auto multi_promise = std::make_shared<std::promise<void>>();
|
||||
auto multi_future = multi_promise->get_future();
|
||||
|
||||
auto multi_callback = [multi_promise] (const MultiResponse &)
|
||||
{
|
||||
multi_promise->set_value();
|
||||
};
|
||||
zookeeper.multi(ops, multi_callback);
|
||||
multi_future.get();
|
||||
}
|
||||
auto remove_promise = std::make_shared<std::promise<void>>();
|
||||
auto remove_future = remove_promise->get_future();
|
||||
|
||||
auto remove_callback = [remove_promise] (const RemoveResponse &)
|
||||
{
|
||||
remove_promise->set_value();
|
||||
};
|
||||
|
||||
zookeeper.remove(path, -1, remove_callback);
|
||||
remove_future.get();
|
||||
}
|
||||
|
||||
NumberGetter
|
||||
NumberGetter::fromConfig(const std::string & key, const Poco::Util::AbstractConfiguration & config, std::optional<uint64_t> default_value)
|
||||
{
|
||||
@ -603,148 +555,16 @@ Generator::Generator(const Poco::Util::AbstractConfiguration & config)
|
||||
acl.id = "anyone";
|
||||
default_acls.emplace_back(std::move(acl));
|
||||
|
||||
static const std::string generator_key = "generator";
|
||||
|
||||
std::cerr << "---- Parsing setup ---- " << std::endl;
|
||||
static const std::string setup_key = generator_key + ".setup";
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
config.keys(setup_key, keys);
|
||||
for (const auto & key : keys)
|
||||
{
|
||||
if (key.starts_with("node"))
|
||||
{
|
||||
auto node_key = setup_key + "." + key;
|
||||
auto parsed_root_node = parseNode(node_key, config);
|
||||
const auto node = root_nodes.emplace_back(parsed_root_node);
|
||||
|
||||
if (config.has(node_key + ".repeat"))
|
||||
{
|
||||
if (!node->name.isRandom())
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key);
|
||||
|
||||
auto repeat_count = config.getUInt64(node_key + ".repeat");
|
||||
node->repeat_count = repeat_count;
|
||||
for (size_t i = 1; i < repeat_count; ++i)
|
||||
root_nodes.emplace_back(node->clone());
|
||||
}
|
||||
|
||||
std::cerr << "Tree to create:" << std::endl;
|
||||
|
||||
node->dumpTree();
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
}
|
||||
std::cerr << "---- Done parsing data setup ----\n" << std::endl;
|
||||
|
||||
std::cerr << "---- Collecting request generators ----" << std::endl;
|
||||
static const std::string requests_key = generator_key + ".requests";
|
||||
static const std::string requests_key = "generator.requests";
|
||||
request_getter = RequestGetter::fromConfig(requests_key, config);
|
||||
std::cerr << request_getter.description() << std::endl;
|
||||
std::cerr << "---- Done collecting request generators ----\n" << std::endl;
|
||||
}
|
||||
|
||||
std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config)
|
||||
{
|
||||
auto node = std::make_shared<Generator::Node>();
|
||||
node->name = StringGetter::fromConfig(key + ".name", config);
|
||||
|
||||
if (config.has(key + ".data"))
|
||||
node->data = StringGetter::fromConfig(key + ".data", config);
|
||||
|
||||
Poco::Util::AbstractConfiguration::Keys node_keys;
|
||||
config.keys(key, node_keys);
|
||||
|
||||
for (const auto & node_key : node_keys)
|
||||
{
|
||||
if (!node_key.starts_with("node"))
|
||||
continue;
|
||||
|
||||
const auto node_key_string = key + "." + node_key;
|
||||
auto child_node = parseNode(node_key_string, config);
|
||||
node->children.push_back(child_node);
|
||||
|
||||
if (config.has(node_key_string + ".repeat"))
|
||||
{
|
||||
if (!child_node->name.isRandom())
|
||||
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key_string);
|
||||
|
||||
auto repeat_count = config.getUInt64(node_key_string + ".repeat");
|
||||
child_node->repeat_count = repeat_count;
|
||||
for (size_t i = 1; i < repeat_count; ++i)
|
||||
node->children.push_back(child_node);
|
||||
}
|
||||
}
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
void Generator::Node::dumpTree(int level) const
|
||||
{
|
||||
std::string data_string
|
||||
= data.has_value() ? fmt::format("{}", data->description()) : "no data";
|
||||
|
||||
std::string repeat_count_string = repeat_count != 0 ? fmt::format(", repeated {} times", repeat_count) : "";
|
||||
|
||||
std::cerr << fmt::format("{}name: {}, data: {}{}", std::string(level, '\t'), name.description(), data_string, repeat_count_string) << std::endl;
|
||||
|
||||
for (auto it = children.begin(); it != children.end();)
|
||||
{
|
||||
const auto & child = *it;
|
||||
child->dumpTree(level + 1);
|
||||
std::advance(it, child->repeat_count != 0 ? child->repeat_count : 1);
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Generator::Node> Generator::Node::clone() const
|
||||
{
|
||||
auto new_node = std::make_shared<Node>();
|
||||
new_node->name = name;
|
||||
new_node->data = data;
|
||||
new_node->repeat_count = repeat_count;
|
||||
|
||||
// don't do deep copy of children because we will do clone only for root nodes
|
||||
new_node->children = children;
|
||||
|
||||
return new_node;
|
||||
}
|
||||
|
||||
void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const
|
||||
{
|
||||
auto path = std::filesystem::path(parent_path) / name.getString();
|
||||
auto promise = std::make_shared<std::promise<void>>();
|
||||
auto future = promise->get_future();
|
||||
auto create_callback = [promise] (const CreateResponse & response)
|
||||
{
|
||||
if (response.error != Coordination::Error::ZOK)
|
||||
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
|
||||
else
|
||||
promise->set_value();
|
||||
};
|
||||
zookeeper.create(path, data ? data->getString() : "", false, false, acls, create_callback);
|
||||
future.get();
|
||||
|
||||
for (const auto & child : children)
|
||||
child->createNode(zookeeper, path, acls);
|
||||
}
|
||||
|
||||
void Generator::startup(Coordination::ZooKeeper & zookeeper)
|
||||
{
|
||||
std::cerr << "---- Creating test data ----" << std::endl;
|
||||
for (const auto & node : root_nodes)
|
||||
{
|
||||
auto node_name = node->name.getString();
|
||||
node->name.setString(node_name);
|
||||
|
||||
std::string root_path = std::filesystem::path("/") / node_name;
|
||||
std::cerr << "Cleaning up " << root_path << std::endl;
|
||||
removeRecursive(zookeeper, root_path);
|
||||
|
||||
node->createNode(zookeeper, "/", default_acls);
|
||||
}
|
||||
std::cerr << "---- Created test data ----\n" << std::endl;
|
||||
|
||||
std::cerr << "---- Initializing generators ----" << std::endl;
|
||||
|
||||
request_getter.startup(zookeeper);
|
||||
}
|
||||
|
||||
@ -752,15 +572,3 @@ Coordination::ZooKeeperRequestPtr Generator::generate()
|
||||
{
|
||||
return request_getter.getRequestGenerator()->generate(default_acls);
|
||||
}
|
||||
|
||||
void Generator::cleanup(Coordination::ZooKeeper & zookeeper)
|
||||
{
|
||||
std::cerr << "---- Cleaning up test data ----" << std::endl;
|
||||
for (const auto & node : root_nodes)
|
||||
{
|
||||
auto node_name = node->name.getString();
|
||||
std::string root_path = std::filesystem::path("/") / node_name;
|
||||
std::cerr << "Cleaning up " << root_path << std::endl;
|
||||
removeRecursive(zookeeper, root_path);
|
||||
}
|
||||
}
|
||||
|
@ -173,27 +173,9 @@ public:
|
||||
|
||||
void startup(Coordination::ZooKeeper & zookeeper);
|
||||
Coordination::ZooKeeperRequestPtr generate();
|
||||
void cleanup(Coordination::ZooKeeper & zookeeper);
|
||||
private:
|
||||
struct Node
|
||||
{
|
||||
StringGetter name;
|
||||
std::optional<StringGetter> data;
|
||||
std::vector<std::shared_ptr<Node>> children;
|
||||
size_t repeat_count = 0;
|
||||
|
||||
std::shared_ptr<Node> clone() const;
|
||||
|
||||
void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
|
||||
void dumpTree(int level = 0) const;
|
||||
};
|
||||
|
||||
static std::shared_ptr<Node> parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
std::uniform_int_distribution<size_t> request_picker;
|
||||
std::vector<std::shared_ptr<Node>> root_nodes;
|
||||
RequestGetter request_getter;
|
||||
Coordination::ACLs default_acls;
|
||||
};
|
||||
|
||||
std::optional<Generator> getGenerator(const std::string & name);
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -1,5 +1,5 @@
|
||||
#pragma once
|
||||
#include "Common/ZooKeeper/ZooKeeperConstants.h"
|
||||
#include "Common/ZooKeeper/ZooKeeperArgs.h"
|
||||
#include <Common/ZooKeeper/ZooKeeperImpl.h>
|
||||
#include "Generator.h"
|
||||
#include <Common/ZooKeeper/IKeeper.h>
|
||||
@ -12,6 +12,7 @@
|
||||
|
||||
#include <Core/Types.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include "Interpreters/Context.h"
|
||||
#include "Stats.h"
|
||||
|
||||
#include <filesystem>
|
||||
@ -19,12 +20,42 @@
|
||||
using Ports = std::vector<UInt16>;
|
||||
using Strings = std::vector<std::string>;
|
||||
|
||||
struct BenchmarkContext
|
||||
{
|
||||
public:
|
||||
void initializeFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void startup(Coordination::ZooKeeper & zookeeper);
|
||||
void cleanup(Coordination::ZooKeeper & zookeeper);
|
||||
|
||||
private:
|
||||
struct Node
|
||||
{
|
||||
StringGetter name;
|
||||
std::optional<StringGetter> data;
|
||||
std::vector<std::shared_ptr<Node>> children;
|
||||
size_t repeat_count = 0;
|
||||
|
||||
std::shared_ptr<Node> clone() const;
|
||||
|
||||
void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
|
||||
void dumpTree(int level = 0) const;
|
||||
};
|
||||
|
||||
static std::shared_ptr<Node> parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
std::vector<std::shared_ptr<Node>> root_nodes;
|
||||
Coordination::ACLs default_acls;
|
||||
};
|
||||
|
||||
class Runner
|
||||
{
|
||||
public:
|
||||
Runner(
|
||||
std::optional<size_t> concurrency_,
|
||||
const std::string & config_path,
|
||||
const std::string & input_request_log_,
|
||||
const std::string & setup_nodes_snapshot_path_,
|
||||
const Strings & hosts_strings_,
|
||||
std::optional<double> max_time_,
|
||||
std::optional<double> delay_,
|
||||
@ -44,8 +75,31 @@ public:
|
||||
|
||||
~Runner();
|
||||
private:
|
||||
struct ConnectionInfo
|
||||
{
|
||||
std::string host;
|
||||
|
||||
bool secure = false;
|
||||
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
|
||||
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
|
||||
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
|
||||
bool use_compression = false;
|
||||
|
||||
size_t sessions = 1;
|
||||
};
|
||||
|
||||
void parseHostsFromConfig(const Poco::Util::AbstractConfiguration & config);
|
||||
|
||||
void runBenchmarkWithGenerator();
|
||||
void runBenchmarkFromLog();
|
||||
|
||||
void createConnections();
|
||||
std::vector<std::shared_ptr<Coordination::ZooKeeper>> refreshConnections();
|
||||
std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
|
||||
|
||||
std::string input_request_log;
|
||||
std::string setup_nodes_snapshot_path;
|
||||
|
||||
size_t concurrency = 1;
|
||||
|
||||
std::optional<ThreadPool> pool;
|
||||
@ -54,7 +108,8 @@ private:
|
||||
double max_time = 0;
|
||||
double delay = 1;
|
||||
bool continue_on_error = false;
|
||||
std::atomic<size_t> max_iterations = 0;
|
||||
size_t max_iterations = 0;
|
||||
|
||||
std::atomic<size_t> requests_executed = 0;
|
||||
std::atomic<bool> shutdown = false;
|
||||
|
||||
@ -71,25 +126,14 @@ private:
|
||||
using Queue = ConcurrentBoundedQueue<Coordination::ZooKeeperRequestPtr>;
|
||||
std::optional<Queue> queue;
|
||||
|
||||
struct ConnectionInfo
|
||||
{
|
||||
std::string host;
|
||||
|
||||
bool secure = false;
|
||||
int32_t session_timeout_ms = Coordination::DEFAULT_SESSION_TIMEOUT_MS;
|
||||
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
|
||||
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
|
||||
bool use_compression = false;
|
||||
|
||||
size_t sessions = 1;
|
||||
};
|
||||
|
||||
std::mutex connection_mutex;
|
||||
ConnectionInfo default_connection_info;
|
||||
std::vector<ConnectionInfo> connection_infos;
|
||||
std::vector<std::shared_ptr<Coordination::ZooKeeper>> connections;
|
||||
std::unordered_map<size_t, size_t> connections_to_info_map;
|
||||
|
||||
void createConnections();
|
||||
std::shared_ptr<Coordination::ZooKeeper> getConnection(const ConnectionInfo & connection_info, size_t connection_info_idx);
|
||||
std::vector<std::shared_ptr<Coordination::ZooKeeper>> refreshConnections();
|
||||
DB::SharedContextHolder shared_context;
|
||||
DB::ContextMutablePtr global_context;
|
||||
|
||||
BenchmarkContext benchmark_context;
|
||||
};
|
||||
|
@ -1,8 +1,6 @@
|
||||
#include <iostream>
|
||||
#include <boost/program_options.hpp>
|
||||
#include "Runner.h"
|
||||
#include "Stats.h"
|
||||
#include "Generator.h"
|
||||
#include "Common/Exception.h"
|
||||
#include <Common/TerminalSize.h>
|
||||
#include <Core/Types.h>
|
||||
@ -27,6 +25,10 @@ int main(int argc, char *argv[])
|
||||
|
||||
bool print_stacktrace = true;
|
||||
|
||||
//Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
|
||||
//Poco::Logger::root().setChannel(channel);
|
||||
//Poco::Logger::root().setLevel("trace");
|
||||
|
||||
try
|
||||
{
|
||||
using boost::program_options::value;
|
||||
@ -34,12 +36,14 @@ int main(int argc, char *argv[])
|
||||
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
|
||||
desc.add_options()
|
||||
("help", "produce help message")
|
||||
("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
|
||||
("concurrency,c", value<unsigned>(), "number of parallel queries")
|
||||
("report-delay,d", value<double>(), "delay between intermediate reports in seconds (set 0 to disable reports)")
|
||||
("iterations,i", value<size_t>(), "amount of queries to be executed")
|
||||
("time-limit,t", value<double>(), "stop launch of queries after specified time limit")
|
||||
("hosts,h", value<Strings>()->multitoken()->default_value(Strings{}, ""), "")
|
||||
("config", value<std::string>()->default_value(""), "yaml/xml file containing configuration")
|
||||
("input-request-log", value<std::string>()->default_value(""), "log of requests that will be replayed")
|
||||
("setup-nodes-snapshot-path", value<std::string>()->default_value(""), "directory containing snapshots with starting state")
|
||||
("concurrency,c", value<unsigned>(), "number of parallel queries")
|
||||
("report-delay,d", value<double>(), "delay between intermediate reports in seconds (set 0 to disable reports)")
|
||||
("iterations,i", value<size_t>(), "amount of queries to be executed")
|
||||
("time-limit,t", value<double>(), "stop launch of queries after specified time limit")
|
||||
("hosts,h", value<Strings>()->multitoken()->default_value(Strings{}, ""), "")
|
||||
("continue_on_errors", "continue testing even if a query fails")
|
||||
;
|
||||
|
||||
@ -56,6 +60,8 @@ int main(int argc, char *argv[])
|
||||
|
||||
Runner runner(valueToOptional<unsigned>(options["concurrency"]),
|
||||
options["config"].as<std::string>(),
|
||||
options["input-request-log"].as<std::string>(),
|
||||
options["setup-nodes-snapshot-path"].as<std::string>(),
|
||||
options["hosts"].as<Strings>(),
|
||||
valueToOptional<double>(options["time-limit"]),
|
||||
valueToOptional<double>(options["report-delay"]),
|
||||
@ -66,9 +72,9 @@ int main(int argc, char *argv[])
|
||||
{
|
||||
runner.runBenchmark();
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
catch (...)
|
||||
{
|
||||
std::cout << "Got exception while trying to run benchmark: " << e.message() << std::endl;
|
||||
std::cout << "Got exception while trying to run benchmark: " << DB::getCurrentExceptionMessage(true) << std::endl;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
Loading…
Reference in New Issue
Block a user