Add round-robin support for clickhouse-benchmark

This commit is contained in:
Azat Khuzhin 2021-07-20 22:49:15 +03:00
parent 1fc822b19d
commit 1860808969

View File

@ -58,7 +58,8 @@ namespace ErrorCodes
class Benchmark : public Poco::Util::Application
{
public:
Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
Benchmark(unsigned concurrency_, double delay_,
Strings && hosts_, Ports && ports_, bool round_robin_,
bool cumulative_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
@ -66,7 +67,7 @@ public:
const String & query_id_, const String & query_to_execute_, bool continue_on_errors_,
bool reconnect_, bool print_stacktrace_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
round_robin(round_robin_), concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), confidence(confidence_), query_id(query_id_),
query_to_execute(query_to_execute_), continue_on_errors(continue_on_errors_), reconnect(reconnect_),
@ -78,8 +79,8 @@ public:
size_t connections_cnt = std::max(ports_.size(), hosts_.size());
connections.reserve(connections_cnt);
comparison_info_total.reserve(connections_cnt);
comparison_info_per_interval.reserve(connections_cnt);
comparison_info_total.reserve(round_robin ? 1 : connections_cnt);
comparison_info_per_interval.reserve(round_robin ? 1 : connections_cnt);
for (size_t i = 0; i < connections_cnt; ++i)
{
@ -90,11 +91,17 @@ public:
concurrency,
cur_host, cur_port,
default_database_, user_, password_,
"", /* cluster */
"", /* cluster_secret */
"benchmark", Protocol::Compression::Enable, secure));
comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
comparison_info_total.emplace_back(std::make_shared<Stats>());
/* cluster_= */ "",
/* cluster_secret_= */ "",
/* client_name_= */ "benchmark",
Protocol::Compression::Enable,
secure));
if (!round_robin || comparison_info_per_interval.empty())
{
comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
comparison_info_total.emplace_back(std::make_shared<Stats>());
}
}
global_context->makeGlobalContext();
@ -134,6 +141,7 @@ private:
using EntryPtr = std::shared_ptr<Entry>;
using EntryPtrs = std::vector<EntryPtr>;
bool round_robin;
unsigned concurrency;
double delay;
@ -394,8 +402,9 @@ private:
std::cerr << getCurrentExceptionMessage(print_stacktrace,
true /*check embedded stack trace*/) << std::endl;
comparison_info_per_interval[connection_index]->errors++;
comparison_info_total[connection_index]->errors++;
size_t info_index = round_robin ? 0 : connection_index;
comparison_info_per_interval[info_index]->errors++;
comparison_info_total[info_index]->errors++;
}
}
// Count failed queries toward executed, so that we'd reach
@ -432,9 +441,10 @@ private:
std::lock_guard lock(mutex);
comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
t_test.add(connection_index, seconds);
size_t info_index = round_robin ? 0 : connection_index;
comparison_info_per_interval[info_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
comparison_info_total[info_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
t_test.add(info_index, seconds);
}
void report(MultiStats & infos)
@ -452,8 +462,19 @@ private:
double seconds = info->work_time / concurrency;
std::string connection_description = connections[i]->getDescription();
if (round_robin)
{
connection_description.clear();
for (const auto & conn : connections)
{
if (!connection_description.empty())
connection_description += ",";
connection_description += conn->getDescription();
}
}
std::cerr
<< connections[i]->getDescription() << ", "
<< connection_description << ", "
<< "queries " << info->queries << ", ";
if (info->errors)
{
@ -586,8 +607,9 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
("host,h", value<Strings>()->multitoken(), "")
("port,p", value<Ports>()->multitoken(), "")
("host,h", value<Strings>()->multitoken(), "list of hosts")
("port,p", value<Ports>()->multitoken(), "list of ports")
("roundrobin", "Instead of comparing queries for different --host/--port just pick one random --host/--port and send query to it")
("cumulative", "prints cumulative data instead of data per interval")
("secure,s", "Use TLS connection")
("user", value<std::string>()->default_value("default"), "")
@ -634,6 +656,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["delay"].as<double>(),
std::move(hosts),
std::move(ports),
options.count("roundrobin"),
options.count("cumulative"),
options.count("secure"),
options["database"].as<std::string>(),