Add JSON output

This commit is contained in:
Antonio Andelic 2023-04-14 13:32:08 +00:00
parent 32adebb723
commit 6bc1ab7ab1
8 changed files with 255 additions and 119 deletions

View File

@ -13,7 +13,7 @@
# and then to run formatter only for the specified files.
ROOT_PATH=$(git rev-parse --show-toplevel)
EXCLUDE_DIRS='build/|integration/|widechar_width/|glibc-compatibility/|poco/|memcpy/|consistent-hashing|benchmark|tests/'
EXCLUDE_DIRS='build/|integration/|widechar_width/|glibc-compatibility/|poco/|memcpy/|consistent-hashing|benchmark|tests/|utils/keeper-bench/example.yaml'
# From [1]:
# But since array_to_string_internal() in array.c still loops over array

View File

@ -1,2 +1,2 @@
clickhouse_add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_config_no_zookeeper_log)
target_link_libraries(keeper-bench PRIVATE clickhouse_common_config_no_zookeeper_log ch_contrib::rapidjson)

View File

@ -12,6 +12,7 @@ using namespace zkutil;
namespace DB::ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
namespace
@ -308,7 +309,7 @@ RequestGetter RequestGetter::fromConfig(const std::string & key, const Poco::Uti
auto weight = request_generator->getWeight();
use_weights |= weight != 1;
weight_sum += weight;
generators.push_back(std::move(request_generator));
}
@ -575,7 +576,7 @@ Coordination::ZooKeeperRequestPtr MultiRequestGenerator::generateImpl(const Coor
if (size)
{
auto request_count = size->getNumber();
auto request_count = size->getNumber();
for (size_t i = 0; i < request_count; ++i)
ops.push_back(request_getter.getRequestGenerator()->generate(acls));
@ -604,7 +605,7 @@ Generator::Generator(const Poco::Util::AbstractConfiguration & config)
static const std::string generator_key = "generator";
std::cout << "---- Parsing setup ---- " << std::endl;
std::cerr << "---- Parsing setup ---- " << std::endl;
static const std::string setup_key = generator_key + ".setup";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(setup_key, keys);
@ -612,20 +613,34 @@ Generator::Generator(const Poco::Util::AbstractConfiguration & config)
{
if (key.starts_with("node"))
{
const auto & node = root_nodes.emplace_back(parseNode(setup_key + "." + key, config));
auto node_key = setup_key + "." + key;
auto parsed_root_node = parseNode(node_key, config);
const auto node = root_nodes.emplace_back(parsed_root_node);
if (config.has(node_key + ".repeat"))
{
if (!node->name.isRandom())
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key);
auto repeat_count = config.getUInt64(node_key + ".repeat");
node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
root_nodes.emplace_back(node->clone());
}
std::cerr << "Tree to create:" << std::endl;
std::cout << "Tree to create:" << std::endl;
node->dumpTree();
std::cout << std::endl;
std::cerr << std::endl;
}
}
std::cout << "---- Done parsing data setup ----\n" << std::endl;
std::cerr << "---- Done parsing data setup ----\n" << std::endl;
std::cout << "---- Collecting request generators ----" << std::endl;
std::cerr << "---- Collecting request generators ----" << std::endl;
static const std::string requests_key = generator_key + ".requests";
request_getter = RequestGetter::fromConfig(requests_key, config);
std::cout << request_getter.description() << std::endl;
std::cout << "---- Done collecting request generators ----\n" << std::endl;
std::cerr << request_getter.description() << std::endl;
std::cerr << "---- Done collecting request generators ----\n" << std::endl;
}
std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, const Poco::Util::AbstractConfiguration & config)
@ -654,6 +669,7 @@ std::shared_ptr<Generator::Node> Generator::parseNode(const std::string & key, c
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Repeating node creation for key {}, but name is not randomly generated", node_key_string);
auto repeat_count = config.getUInt64(node_key_string + ".repeat");
child_node->repeat_count = repeat_count;
for (size_t i = 1; i < repeat_count; ++i)
node->children.push_back(child_node);
}
@ -666,10 +682,30 @@ void Generator::Node::dumpTree(int level) const
{
std::string data_string
= data.has_value() ? fmt::format("{}", data->description()) : "no data";
std::cout << fmt::format("{}name: {}, data: {}", std::string(level, '\t'), name.description(), data_string) << std::endl;
for (const auto & child : children)
std::string repeat_count_string = repeat_count != 0 ? fmt::format(", repeated {} times", repeat_count) : "";
std::cerr << fmt::format("{}name: {}, data: {}{}", std::string(level, '\t'), name.description(), data_string, repeat_count_string) << std::endl;
for (auto it = children.begin(); it != children.end();)
{
const auto & child = *it;
child->dumpTree(level + 1);
std::advance(it, child->repeat_count != 0 ? child->repeat_count : 1);
}
}
/// Creates a new Node carrying the same name, data and repeat count as this one.
/// The children vector is copied as-is, so the copy refers to the same child Node objects
/// (the pointers are shared, the children themselves are not duplicated).
std::shared_ptr<Generator::Node> Generator::Node::clone() const
{
    auto copy = std::make_shared<Node>();

    copy->name = name;
    copy->data = data;
    copy->repeat_count = repeat_count;

    /// Shallow copy on purpose: clone is only applied to root nodes, so sharing the children is enough.
    copy->children = children;

    return copy;
}
void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const
@ -693,21 +729,21 @@ void Generator::Node::createNode(Coordination::ZooKeeper & zookeeper, const std:
/// Prepares the test data and the request generators before a run.
/// For every root node: takes its name, stores that name back on the node, calls
/// removeRecursive on "/<name>", and then calls createNode under "/" with default_acls.
/// Finally calls request_getter.startup with the same connection.
/// NOTE(review): this text appears to be a rendered diff; each adjacent std::cout / std::cerr
/// pair looks like the old and new version of a single statement — confirm against the real file.
void Generator::startup(Coordination::ZooKeeper & zookeeper)
{
std::cout << "---- Creating test data ----" << std::endl;
std::cerr << "---- Creating test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
/// Presumably pins the generated name so later getString() calls return the same value — verify against StringGetter.
node->name.setString(node_name);
std::string root_path = std::filesystem::path("/") / node_name;
std::cout << "Cleaning up " << root_path << std::endl;
std::cerr << "Cleaning up " << root_path << std::endl;
/// Clear the path first, then build the node under "/".
removeRecursive(zookeeper, root_path);
node->createNode(zookeeper, "/", default_acls);
}
std::cout << "---- Created test data ----\n" << std::endl;
std::cerr << "---- Created test data ----\n" << std::endl;
std::cout << "---- Initializing generators ----" << std::endl;
std::cerr << "---- Initializing generators ----" << std::endl;
request_getter.startup(zookeeper);
}
@ -719,12 +755,12 @@ Coordination::ZooKeeperRequestPtr Generator::generate()
/// Cleans up the test data after a run: for every root node, builds the path "/<name>"
/// from the node's name and calls removeRecursive on it.
/// NOTE(review): this text appears to be a rendered diff; each adjacent std::cout / std::cerr
/// pair looks like the old and new version of a single statement — confirm against the real file.
void Generator::cleanup(Coordination::ZooKeeper & zookeeper)
{
std::cout << "---- Cleaning up test data ----" << std::endl;
std::cerr << "---- Cleaning up test data ----" << std::endl;
for (const auto & node : root_nodes)
{
auto node_name = node->name.getString();
std::string root_path = std::filesystem::path("/") / node_name;
std::cout << "Cleaning up " << root_path << std::endl;
std::cerr << "Cleaning up " << root_path << std::endl;
removeRecursive(zookeeper, root_path);
}
}

View File

@ -180,6 +180,9 @@ private:
StringGetter name;
std::optional<StringGetter> data;
std::vector<std::shared_ptr<Node>> children;
size_t repeat_count = 0;
std::shared_ptr<Node> clone() const;
void createNode(Coordination::ZooKeeper & zookeeper, const std::string & parent_path, const Coordination::ACLs & acls) const;
void dumpTree(int level = 0) const;

View File

@ -5,10 +5,18 @@
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/EventNotifier.h>
#include <Common/Config/ConfigProcessor.h>
#include "IO/WriteBufferFromFile.h"
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB::ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
extern const int BAD_ARGUMENTS;
}
Runner::Runner(
@ -40,41 +48,41 @@ Runner::Runner(
parseHostsFromConfig(*config);
}
std::cout << "---- Run options ---- " << std::endl;
std::cerr << "---- Run options ---- " << std::endl;
static constexpr uint64_t DEFAULT_CONCURRENCY = 1;
if (concurrency_)
concurrency = *concurrency_;
else
concurrency = config->getUInt64("concurrency", DEFAULT_CONCURRENCY);
std::cout << "Concurrency: " << concurrency << std::endl;
std::cerr << "Concurrency: " << concurrency << std::endl;
static constexpr uint64_t DEFAULT_ITERATIONS = 0;
if (max_iterations_)
max_iterations = *max_iterations_;
else
max_iterations = config->getUInt64("iterations", DEFAULT_ITERATIONS);
std::cout << "Iterations: " << max_iterations << std::endl;
std::cerr << "Iterations: " << max_iterations << std::endl;
static constexpr double DEFAULT_DELAY = 1.0;
if (delay_)
delay = *delay_;
else
delay = config->getDouble("report_delay", DEFAULT_DELAY);
std::cout << "Report delay: " << delay << std::endl;
std::cerr << "Report delay: " << delay << std::endl;
static constexpr double DEFAULT_TIME_LIMIT = 1.0;
static constexpr double DEFAULT_TIME_LIMIT = 0.0;
if (max_time_)
max_time = *max_time_;
else
max_time = config->getDouble("timelimit", DEFAULT_TIME_LIMIT);
std::cout << "Time limit: " << max_time << std::endl;
std::cerr << "Time limit: " << max_time << std::endl;
if (continue_on_error_)
continue_on_error = *continue_on_error_;
else
continue_on_error = config->getBool("continue_on_error", false);
std::cout << "Continue on error: " << continue_on_error << std::endl;
std::cout << "---- Run options ----\n" << std::endl;
std::cerr << "Continue on error: " << continue_on_error << std::endl;
std::cerr << "---- Run options ----\n" << std::endl;
pool.emplace(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, concurrency);
queue.emplace(concurrency);
@ -173,7 +181,7 @@ void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookee
else if (response.error == Coordination::Error::ZNONODE)
{
/// remove can fail with ZNONODE because of different order of execution
/// of generated create and remove requests
/// of generated create and remove requests
/// this is okay for concurrent runs
if (dynamic_cast<const Coordination::ZooKeeperRemoveResponse *>(&response))
set_exception = false;
@ -203,14 +211,14 @@ void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookee
try
{
auto response_size = future.get();
double seconds = watch.elapsedSeconds();
auto microseconds = watch.elapsedMicroseconds();
std::lock_guard lock(mutex);
if (request->isReadRequest())
info->addRead(seconds, 1, request->bytesSize() + response_size);
info->addRead(microseconds, 1, request->bytesSize() + response_size);
else
info->addWrite(seconds, 1, request->bytesSize() + response_size);
info->addWrite(microseconds, 1, request->bytesSize() + response_size);
}
catch (...)
{
@ -268,7 +276,7 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
//if (i % 10000 == 0)
//{
// for (const auto & [op_num, count] : counts)
// std::cout << fmt::format("{}: {}", op_num, count) << std::endl;
// std::cerr << fmt::format("{}: {}", op_num, count) << std::endl;
//}
bool inserted = false;
@ -285,13 +293,13 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
std::cerr << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
std::cerr << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
@ -300,7 +308,7 @@ bool Runner::tryPushRequestInteractively(Coordination::ZooKeeperRequestPtr && re
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
info->report(concurrency);
delay_watch.restart();
}
}
@ -350,18 +358,21 @@ void Runner::runBenchmark()
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
info->report(concurrency);
DB::WriteBufferFromFile out("result.json");
info->writeJSON(out, concurrency);
}
void Runner::createConnections()
{
DB::EventNotifier::init();
std::cout << "---- Creating connections ---- " << std::endl;
std::cerr << "---- Creating connections ---- " << std::endl;
for (size_t connection_info_idx = 0; connection_info_idx < connection_infos.size(); ++connection_info_idx)
{
const auto & connection_info = connection_infos[connection_info_idx];
std::cout << fmt::format("Creating {} session(s) for:\n"
std::cerr << fmt::format("Creating {} session(s) for:\n"
"- host: {}\n"
"- secure: {}\n"
"- session timeout: {}ms\n"
@ -380,7 +391,7 @@ void Runner::createConnections()
connections_to_info_map[connections.size() - 1] = connection_info_idx;
}
}
std::cout << "---- Done creating connections ----\n" << std::endl;
std::cerr << "---- Done creating connections ----\n" << std::endl;
}
std::shared_ptr<Coordination::ZooKeeper> Runner::getConnection(const ConnectionInfo & connection_info)

View File

@ -19,17 +19,6 @@
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
class Runner
{
public:

View File

@ -1,67 +1,174 @@
#include "Stats.h"
#include <iostream>
void report(std::shared_ptr<Stats> & info, size_t concurrency)
#include <rapidjson/document.h>
#include <rapidjson/rapidjson.h>
#include <rapidjson/writer.h>
#include <rapidjson/stringbuffer.h>
/// Records one measurement.
/// @param microseconds  elapsed time; added to the total work time and inserted into the sampler
/// @param requests_inc  number of requests to add to the request counter
/// @param bytes_inc     number of bytes to add to the byte counter
void Stats::StatsCollector::add(uint64_t microseconds, size_t requests_inc, size_t bytes_inc)
{
    /// Running totals first ...
    requests_bytes += bytes_inc;
    requests += requests_inc;
    work_time += microseconds;

    /// ... then the latency sample itself.
    sampler.insert(microseconds);
}
void Stats::addRead(uint64_t microseconds, size_t requests_inc, size_t bytes_inc)
{
read_collector.add(microseconds, requests_inc, bytes_inc);
}
/// Records a measurement for a write request by forwarding all arguments to the write collector.
void Stats::addWrite(uint64_t microseconds, size_t requests_inc, size_t bytes_inc)
{
    auto & collector = write_collector;
    collector.add(microseconds, requests_inc, bytes_inc);
}
/// Resets the collector: zeroes the request, byte and work-time counters and clears the sampler.
void Stats::StatsCollector::clear()
{
    requests.store(0);
    work_time = 0;
    requests_bytes = 0;

    sampler.clear();
}
void Stats::clear()
{
read_collector.clear();
write_collector.clear();
}
/// Computes the throughput of this collector.
///
/// The accumulated work time (microseconds) is converted to seconds and divided by
/// `concurrency` before the counters are divided by it.
///
/// @param concurrency  divisor applied to the accumulated work time
/// @return {requests per second, bytes per second}; {0, 0} when nothing was recorded
std::pair<double, double> Stats::StatsCollector::getThroughput(size_t concurrency)
{
    /// Stats::report asks both collectors for their throughput as soon as one of them has
    /// traffic, so an idle collector has to be a valid input: return zeros instead of
    /// asserting or producing NaN/inf from a division by zero seconds.
    if (requests == 0 || work_time == 0)
        return {0.0, 0.0};

    const double seconds = work_time / 1'000'000.0 / concurrency;

    return {requests / seconds, requests_bytes / seconds};
}
/// Returns the sampled value at the given percentile, divided by 1000.
/// Samples are inserted in microseconds by add(), so the result is in milliseconds.
/// @param percent  percentile on a 0..100 scale; it is rescaled to 0..1 for the sampler
double Stats::StatsCollector::getPercentile(double percent)
{
    const double level = percent / 100.0;
    const auto sampled_value = sampler.quantileNearest(level);
    return sampled_value / 1000.0;
}
/// Prints a human-readable summary to std::cerr: the read/write request counts, the error
/// count when it is non-zero, RPS and MiB/s for each direction that has at least one request,
/// and then a percentile table (0..90 in steps of 10, 95, 99, 99.9, 99.99) per direction.
/// Returns right after the leading newline when both request counters are zero.
/// @param concurrency  passed to getThroughput() of both collectors
/// NOTE(review): this text appears to be a rendered diff; lines using `info->...` look like the
/// removed version and the lines following them like the added version — confirm against the real file.
void Stats::report(size_t concurrency)
{
std::cerr << "\n";
const auto & read_requests = read_collector.requests;
const auto & write_requests = write_collector.requests;
/// Avoid zeros, nans or exceptions
if (0 == info->read_requests && 0 == info->write_requests)
if (0 == read_requests && 0 == write_requests)
return;
double read_seconds = info->read_work_time / concurrency;
double write_seconds = info->write_work_time / concurrency;
auto [read_rps, read_bps] = read_collector.getThroughput(concurrency);
auto [write_rps, write_bps] = write_collector.getThroughput(concurrency);
std::cerr << "read requests " << info->read_requests << ", write requests " << info->write_requests << ", ";
if (info->errors)
{
std::cerr << "errors " << info->errors << ", ";
}
if (0 != info->read_requests)
std::cerr << "read requests " << read_requests << ", write requests " << write_requests << ", ";
if (errors)
std::cerr << "errors " << errors << ", ";
if (0 != read_requests)
{
std::cerr
<< "Read RPS: " << (info->read_requests / read_seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / read_seconds / 1048576);
if (0 != info->write_requests)
<< "Read RPS: " << read_rps << ", "
<< "Read MiB/s: " << read_bps / 1048576;
if (0 != write_requests)
std::cerr << ", ";
}
if (0 != info->write_requests)
if (0 != write_requests)
{
std::cerr
<< "Write RPS: " << (info->write_requests / write_seconds) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / write_seconds / 1048576) << ". "
<< "Write RPS: " << write_rps << ", "
<< "Write MiB/s: " << write_bps / 1048576 << ". "
<< "\n";
}
std::cerr << "\n";
auto print_percentile = [&](double percent, Stats::Sampler & sampler)
auto print_percentile = [&](double percent, Stats::StatsCollector & collector)
{
std::cerr << percent << "%\t\t";
std::cerr << sampler.quantileNearest(percent / 100.0) << " sec.\t";
std::cerr << collector.getPercentile(percent) << " msec.\t";
std::cerr << "\n";
};
if (0 != info->read_requests)
const auto print_all_percentiles = [&](auto & collector)
{
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, collector);
print_percentile(95, collector);
print_percentile(99, collector);
print_percentile(99.9, collector);
print_percentile(99.99, collector);
};
if (0 != read_requests)
{
std::cerr << "Read sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->read_sampler);
print_percentile(95, info->read_sampler);
print_percentile(99, info->read_sampler);
print_percentile(99.9, info->read_sampler);
print_percentile(99.99, info->read_sampler);
print_all_percentiles(read_collector);
}
if (0 != info->write_requests)
if (0 != write_requests)
{
std::cerr << "Write sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->write_sampler);
print_percentile(95, info->write_sampler);
print_percentile(99, info->write_sampler);
print_percentile(99.9, info->write_sampler);
print_percentile(99.99, info->write_sampler);
print_all_percentiles(write_collector);
}
}
/// Serializes the collected statistics as one JSON object and writes it to `out`.
///
/// The object gets a "read_results" and/or a "write_results" member, each only when the
/// corresponding collector recorded at least one request. Every such member contains
/// "requests_per_second", "bytes_per_second" and a "percentiles" array of
/// {"percentile", "value"} objects, with values taken from getPercentile().
///
/// @param out          buffer that receives the serialized JSON text
/// @param concurrency  forwarded to getThroughput() for the throughput figures
void Stats::writeJSON(DB::WriteBuffer & out, size_t concurrency)
{
    using namespace rapidjson;

    Document results;
    auto & allocator = results.GetAllocator();
    results.SetObject();

    /// Builds the JSON object describing a single collector (read or write).
    const auto get_results = [&](auto & collector)
    {
        Value specific_results(kObjectType);

        auto [rps, bps] = collector.getThroughput(concurrency);
        specific_results.AddMember("requests_per_second", Value(rps), allocator);
        specific_results.AddMember("bytes_per_second", Value(bps), allocator);

        Value percentiles(kArrayType);

        const auto add_percentile = [&](double percent)
        {
            Value percentile(kObjectType);
            percentile.AddMember("percentile", Value(percent), allocator);
            percentile.AddMember("value", Value(collector.getPercentile(percent)), allocator);
            percentiles.PushBack(percentile, allocator);
        };

        /// Same percentile ladder as the textual report: 0..90 in steps of 10, then the tail.
        for (int percent = 0; percent <= 90; percent += 10)
            add_percentile(percent);

        add_percentile(95);
        add_percentile(99);
        add_percentile(99.9);
        add_percentile(99.99);

        specific_results.AddMember("percentiles", percentiles, allocator);

        return specific_results;
    };

    if (read_collector.requests != 0)
        results.AddMember("read_results", get_results(read_collector), allocator);

    if (write_collector.requests != 0)
        results.AddMember("write_results", get_results(write_collector), allocator);

    /// A freshly constructed StringBuffer is already empty, so no Clear() is needed.
    StringBuffer strbuf;
    Writer<StringBuffer> writer(strbuf);
    results.Accept(writer);

    /// The buffer knows its own length; use it instead of rescanning the text with strlen.
    out.write(strbuf.GetString(), strbuf.GetSize());
}

View File

@ -5,48 +5,38 @@
#include <AggregateFunctions/ReservoirSampler.h>
#include <base/JSON.h>
/// Benchmark statistics, kept separately for read and write requests.
/// NOTE(review): this text appears to be a rendered diff in which removed and added lines are
/// interleaved: the flat read_*/write_* members and the inline addRead/addWrite/clear bodies look
/// like the old layout, while StatsCollector, read_collector/write_collector and the
/// declarations at the end look like the new one — confirm against the real header.
struct Stats
{
std::atomic<size_t> read_requests{0};
std::atomic<size_t> write_requests{0};
size_t errors = 0;
size_t requests_write_bytes = 0;
size_t requests_read_bytes = 0;
double read_work_time = 0;
double write_work_time = 0;
using Sampler = ReservoirSampler<double>;
Sampler read_sampler {1 << 16};
Sampler write_sampler {1 << 16};
void addRead(double seconds, size_t requests_inc, size_t bytes_inc)
struct StatsCollector
{
read_work_time += seconds;
read_requests += requests_inc;
requests_read_bytes += bytes_inc;
read_sampler.insert(seconds);
}
std::atomic<size_t> requests{0};
uint64_t requests_bytes = 0;
uint64_t work_time = 0;
Sampler sampler;
void addWrite(double seconds, size_t requests_inc, size_t bytes_inc)
{
write_work_time += seconds;
write_requests += requests_inc;
requests_write_bytes += bytes_inc;
write_sampler.insert(seconds);
}
/// requests/second, bytes/second
std::pair<double, double> getThroughput(size_t concurrency);
double getPercentile(double percent);
void clear()
{
read_requests = 0;
write_requests = 0;
read_work_time = 0;
write_work_time = 0;
requests_read_bytes = 0;
requests_write_bytes = 0;
read_sampler.clear();
write_sampler.clear();
}
void add(uint64_t microseconds, size_t requests_inc, size_t bytes_inc);
void clear();
};
StatsCollector read_collector;
StatsCollector write_collector;
void addRead(uint64_t microseconds, size_t requests_inc, size_t bytes_inc);
void addWrite(uint64_t microseconds, size_t requests_inc, size_t bytes_inc);
void clear();
void report(size_t concurrency);
void writeJSON(DB::WriteBuffer & out, size_t concurrency);
};
void report(std::shared_ptr<Stats> & info, size_t concurrency);