Merge pull request #26607 from azat/bench-round-robin

Add round-robin support for clickhouse-benchmark

commit ec949e4c66
alexey-milovidov, 2021-07-24 20:17:35 +03:00, committed by GitHub
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)

5 changed files with 260 additions and 17 deletions
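
With this change clickhouse-benchmark can either compare several endpoints (the previous behaviour: one statistics block per --host/--port pair) or spread the load across them with the new --roundrobin option and report a single aggregate. A minimal usage sketch, assuming two local servers on hypothetical ports 9001 and 9002 (the option names are the ones added or already present in the diff below):

    # Comparison mode (default): one report per endpoint
    clickhouse-benchmark --host 127.0.0.1 --port 9001 --host 127.0.0.1 --port 9002 \
        --concurrency 10 --iterations 10000 --query 'SELECT 1'

    # Round-robin mode: every query goes to one randomly picked endpoint,
    # and the statistics are merged into a single report
    clickhouse-benchmark --roundrobin --host 127.0.0.1 --port 9001 --host 127.0.0.1 --port 9002 \
        --concurrency 10 --iterations 10000 --query 'SELECT 1'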


@@ -58,7 +58,8 @@ namespace ErrorCodes
 class Benchmark : public Poco::Util::Application
 {
 public:
-    Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
+    Benchmark(unsigned concurrency_, double delay_,
+        Strings && hosts_, Ports && ports_, bool round_robin_,
         bool cumulative_, bool secure_, const String & default_database_,
         const String & user_, const String & password_, const String & stage,
         bool randomize_, size_t max_iterations_, double max_time_,
@@ -66,7 +67,7 @@ public:
         const String & query_id_, const String & query_to_execute_, bool continue_on_errors_,
         bool reconnect_, bool print_stacktrace_, const Settings & settings_)
         :
-        concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
+        concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_), round_robin(round_robin_),
         cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
         json_path(json_path_), confidence(confidence_), query_id(query_id_),
         query_to_execute(query_to_execute_), continue_on_errors(continue_on_errors_), reconnect(reconnect_),
@@ -78,8 +79,8 @@ public:
         size_t connections_cnt = std::max(ports_.size(), hosts_.size());

         connections.reserve(connections_cnt);
-        comparison_info_total.reserve(connections_cnt);
-        comparison_info_per_interval.reserve(connections_cnt);
+        comparison_info_total.reserve(round_robin ? 1 : connections_cnt);
+        comparison_info_per_interval.reserve(round_robin ? 1 : connections_cnt);

         for (size_t i = 0; i < connections_cnt; ++i)
         {
@@ -90,11 +91,17 @@ public:
                 concurrency,
                 cur_host, cur_port,
                 default_database_, user_, password_,
-                "", /* cluster */
-                "", /* cluster_secret */
-                "benchmark", Protocol::Compression::Enable, secure));
-            comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
-            comparison_info_total.emplace_back(std::make_shared<Stats>());
+                /* cluster_= */ "",
+                /* cluster_secret_= */ "",
+                /* client_name_= */ "benchmark",
+                Protocol::Compression::Enable,
+                secure));
+
+            if (!round_robin || comparison_info_per_interval.empty())
+            {
+                comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
+                comparison_info_total.emplace_back(std::make_shared<Stats>());
+            }
         }

         global_context->makeGlobalContext();
@@ -134,6 +141,7 @@ private:
     using EntryPtr = std::shared_ptr<Entry>;
     using EntryPtrs = std::vector<EntryPtr>;

+    bool round_robin;
     unsigned concurrency;
     double delay;
@ -396,8 +404,9 @@ private:
std::cerr << getCurrentExceptionMessage(print_stacktrace, std::cerr << getCurrentExceptionMessage(print_stacktrace,
true /*check embedded stack trace*/) << std::endl; true /*check embedded stack trace*/) << std::endl;
comparison_info_per_interval[connection_index]->errors++; size_t info_index = round_robin ? 0 : connection_index;
comparison_info_total[connection_index]->errors++; comparison_info_per_interval[info_index]->errors++;
comparison_info_total[info_index]->errors++;
} }
} }
// Count failed queries toward executed, so that we'd reach // Count failed queries toward executed, so that we'd reach
@@ -434,9 +443,10 @@ private:
         std::lock_guard lock(mutex);

-        comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
-        comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
-        t_test.add(connection_index, seconds);
+        size_t info_index = round_robin ? 0 : connection_index;
+        comparison_info_per_interval[info_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
+        comparison_info_total[info_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
+        t_test.add(info_index, seconds);
     }

     void report(MultiStats & infos)
@@ -454,8 +464,19 @@ private:
             double seconds = info->work_time / concurrency;

+            std::string connection_description = connections[i]->getDescription();
+            if (round_robin)
+            {
+                connection_description.clear();
+                for (const auto & conn : connections)
+                {
+                    if (!connection_description.empty())
+                        connection_description += ", ";
+                    connection_description += conn->getDescription();
+                }
+            }
             std::cerr
-                << connections[i]->getDescription() << ", "
+                << connection_description << ", "
                 << "queries " << info->queries << ", ";
             if (info->errors)
             {
@@ -588,8 +609,9 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
         ("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
         ("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
         ("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
-        ("host,h", value<Strings>()->multitoken(), "")
-        ("port,p", value<Ports>()->multitoken(), "")
+        ("host,h", value<Strings>()->multitoken(), "list of hosts")
+        ("port,p", value<Ports>()->multitoken(), "list of ports")
+        ("roundrobin", "Instead of comparing queries for different --host/--port just pick one random --host/--port for every query and send query to it.")
         ("cumulative", "prints cumulative data instead of data per interval")
         ("secure,s", "Use TLS connection")
         ("user", value<std::string>()->default_value("default"), "")
@@ -636,6 +658,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
             options["delay"].as<double>(),
             std::move(hosts),
             std::move(ports),
+            options.count("roundrobin"),
             options.count("cumulative"),
             options.count("secure"),
             options["database"].as<std::string>(),


@@ -0,0 +1,33 @@
<?xml version="1.0"?>
<yandex>
    <logger>
        <level>trace</level>
        <console>true</console>
    </logger>

    <tcp_port>9000</tcp_port>

    <mark_cache_size>0</mark_cache_size>

    <users>
        <default>
            <password></password>
            <networks>
                <ip>::/0</ip>
            </networks>
            <profile>default</profile>
            <quota>default</quota>
            <access_management>1</access_management>
        </default>
    </users>

    <profiles>
        <default/>
    </profiles>

    <quotas>
        <default />
    </quotas>
</yandex>


@@ -0,0 +1,186 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

BASE="$CUR_DIR/$(basename "${BASH_SOURCE[0]}" .sh)"

server_pids=()
paths=()

function cleanup()
{
    local pid
    for pid in "${server_pids[@]}"; do
        kill -9 "$pid"
    done

    echo "Test failed." >&2
    tail -n1000 "$BASE".clickhouse-server*.log "$BASE".clickhouse-benchmark*.log >&2
    rm -f "$BASE".clickhouse-server*.log "$BASE".clickhouse-benchmark*.log

    local path
    for path in "${paths[@]}"; do
        rm -fr "$path"
    done

    exit 1
}

function start_server()
{
    local log=$1 && shift

    local server_opts=(
        "--config-file=$BASE.config.xml"
        "--"
        # we will discover the real port later.
        "--tcp_port=0"
        "--shutdown_wait_unfinished=0"
        "--listen_host=127.1"
    )
    CLICKHOUSE_WATCHDOG_ENABLE=0 $CLICKHOUSE_SERVER_BINARY "${server_opts[@]}" "$@" >& "$log" &
    local pid=$!

    echo "$pid"
}
function get_server_port()
{
    local pid=$1 && shift

    local port='' i=0 retries=300
    # wait until server will start to listen (max 30 seconds)
    while [[ -z $port ]] && [[ $i -lt $retries ]]; do
        port="$(lsof -n -a -P -i tcp -s tcp:LISTEN -p "$pid" 2>/dev/null | awk -F'[ :]' '/LISTEN/ { print $(NF-1) }')"
        ((++i))
        sleep 0.1
    done
    if [[ -z $port ]]; then
        echo "Cannot wait for LISTEN socket" >&2
        exit 1
    fi

    echo "$port"
}

function wait_server_port()
{
    local port=$1 && shift

    # wait for the server to start accepting tcp connections (max 30 seconds)
    local i=0 retries=300
    while ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$port" --format Null -q 'select 1' 2>/dev/null && [[ $i -lt $retries ]]; do
        sleep 0.1
    done
    if ! $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$port" --format Null -q 'select 1'; then
        echo "Cannot wait until server will start accepting connections on <tcp_port>" >&2
        exit 1
    fi
}

function execute_query()
{
    local port=$1 && shift

    $CLICKHOUSE_CLIENT_BINARY --host 127.1 --port "$port" "$@"
}

function make_server()
{
    local log=$1 && shift

    local pid
    pid="$(start_server "$log" "$@")"

    local port
    port="$(get_server_port "$pid")"

    wait_server_port "$port"

    echo "$pid" "$port"
}
function terminate_servers()
{
    local pid
    for pid in "${server_pids[@]}"; do
        kill -9 "$pid"
        # NOTE: we cannot wait the server pid since it was created in a subshell
    done

    rm -f "$BASE".clickhouse-server*.log "$BASE".clickhouse-benchmark*.log

    local path
    for path in "${paths[@]}"; do
        rm -fr "$path"
    done
}

function test_clickhouse_benchmark_multi_hosts()
{
    local benchmark_opts=(
        --iterations 10000
        --host 127.1 --port "$port1"
        --host 127.1 --port "$port2"
        --query 'select 1'
        --concurrency 10
    )
    clickhouse-benchmark "${benchmark_opts[@]}" >& "$(mktemp "$BASE.clickhouse-benchmark.XXXXXX.log")"

    local queries1 queries2
    queries1="$(execute_query "$port1" --query "select value from system.events where event = 'Query'")"
    queries2="$(execute_query "$port2" --query "select value from system.events where event = 'Query'")"

    if [[ $queries1 -lt 4000 ]] || [[ $queries1 -gt 6000 ]]; then
        echo "server1 (port=$port1) handled $queries1 queries" >&2
    fi
    if [[ $queries2 -lt 4000 ]] || [[ $queries2 -gt 6000 ]]; then
        echo "server2 (port=$port2) handled $queries2 queries" >&2
    fi
}
function test_clickhouse_benchmark_multi_hosts_roundrobin()
{
    local benchmark_opts=(
        --iterations 10000
        --host 127.1 --port "$port1"
        --host 127.1 --port "$port2"
        --query 'select 1'
        --concurrency 10
        --roundrobin
    )
    clickhouse-benchmark "${benchmark_opts[@]}" >& "$(mktemp "$BASE.clickhouse-benchmark.XXXXXX.log")"

    local queries1 queries2
    queries1="$(execute_query "$port1" --query "select value from system.events where event = 'Query'")"
    queries2="$(execute_query "$port2" --query "select value from system.events where event = 'Query'")"

    # NOTE: it should take into account test_clickhouse_benchmark_multi_hosts queries too.
    # that's why it is [9000, 11000] instead of [4000, 6000]
    if [[ $queries1 -lt 9000 ]] || [[ $queries1 -gt 11000 ]]; then
        echo "server1 (port=$port1) handled $queries1 queries (with --roundrobin)" >&2
    fi
    if [[ $queries2 -lt 9000 ]] || [[ $queries2 -gt 11000 ]]; then
        echo "server2 (port=$port2) handled $queries2 queries (with --roundrobin)" >&2
    fi
}

function main()
{
    trap cleanup EXIT

    local path port1 port2

    path="$(mktemp -d "$BASE.server1.XXXXXX")"
    paths+=( "$path" )
    read -r pid1 port1 <<<"$(make_server "$(mktemp "$BASE.clickhouse-server-XXXXXX.log")" --path "$path")"
    server_pids+=( "$pid1" )

    path="$(mktemp -d "$BASE.server2.XXXXXX")"
    paths+=( "$path" )
    read -r pid2 port2 <<<"$(make_server "$(mktemp "$BASE.clickhouse-server-XXXXXX.log")" --path "$path")"
    server_pids+=( "$pid2" )

    test_clickhouse_benchmark_multi_hosts
    test_clickhouse_benchmark_multi_hosts_roundrobin

    terminate_servers
    trap '' EXIT
}
main "$@"


@@ -494,6 +494,7 @@
         "01684_ssd_cache_dictionary_simple_key",
         "01685_ssd_cache_dictionary_complex_key",
         "01737_clickhouse_server_wait_server_pool_long", // This test is fully compatible to run in parallel, however under ASAN processes are pretty heavy and may fail under flaky adress check.
+        "01954_clickhouse_benchmark_round_robin", // This test is fully compatible to run in parallel, however under ASAN processes are pretty heavy and may fail under flaky adress check.
         "01594_too_low_memory_limits", // This test is fully compatible to run in parallel, however under ASAN processes are pretty heavy and may fail under flaky adress check.
         "01760_system_dictionaries",
         "01760_polygon_dictionaries",