mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 11:32:03 +00:00
Merge pull request #71898 from ClickHouse/fix-benchmark
Fix `clickhouse-benchmark` metrics report
This commit is contained in:
commit
7007ce7596
@ -7,7 +7,6 @@
|
|||||||
#include <random>
|
#include <random>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <pcg_random.hpp>
|
#include <pcg_random.hpp>
|
||||||
#include <Poco/UUID.h>
|
|
||||||
#include <Poco/UUIDGenerator.h>
|
#include <Poco/UUIDGenerator.h>
|
||||||
#include <Poco/Util/Application.h>
|
#include <Poco/Util/Application.h>
|
||||||
#include <Common/Stopwatch.h>
|
#include <Common/Stopwatch.h>
|
||||||
@ -152,8 +151,6 @@ public:
|
|||||||
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
|
||||||
global_context->setQueryKindInitial();
|
global_context->setQueryKindInitial();
|
||||||
|
|
||||||
std::cerr << std::fixed << std::setprecision(3);
|
|
||||||
|
|
||||||
/// This is needed to receive blocks with columns of AggregateFunction data type
|
/// This is needed to receive blocks with columns of AggregateFunction data type
|
||||||
/// (example: when using stage = 'with_mergeable_state')
|
/// (example: when using stage = 'with_mergeable_state')
|
||||||
registerAggregateFunctions();
|
registerAggregateFunctions();
|
||||||
@ -226,6 +223,8 @@ private:
|
|||||||
ContextMutablePtr global_context;
|
ContextMutablePtr global_context;
|
||||||
QueryProcessingStage::Enum query_processing_stage;
|
QueryProcessingStage::Enum query_processing_stage;
|
||||||
|
|
||||||
|
WriteBufferFromFileDescriptor log{STDERR_FILENO};
|
||||||
|
|
||||||
std::atomic<size_t> consecutive_errors{0};
|
std::atomic<size_t> consecutive_errors{0};
|
||||||
|
|
||||||
/// Don't execute new queries after timelimit or SIGINT or exception
|
/// Don't execute new queries after timelimit or SIGINT or exception
|
||||||
@ -303,16 +302,16 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
std::cerr << "Loaded " << queries.size() << " queries.\n";
|
log << "Loaded " << queries.size() << " queries.\n" << flush;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void printNumberOfQueriesExecuted(size_t num)
|
void printNumberOfQueriesExecuted(size_t num)
|
||||||
{
|
{
|
||||||
std::cerr << "\nQueries executed: " << num;
|
log << "\nQueries executed: " << num;
|
||||||
if (queries.size() > 1)
|
if (queries.size() > 1)
|
||||||
std::cerr << " (" << (num * 100.0 / queries.size()) << "%)";
|
log << " (" << (num * 100.0 / queries.size()) << "%)";
|
||||||
std::cerr << ".\n";
|
log << ".\n" << flush;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try push new query and check cancellation conditions
|
/// Try push new query and check cancellation conditions
|
||||||
@ -339,9 +338,10 @@ private:
|
|||||||
|
|
||||||
if (interrupt_listener.check())
|
if (interrupt_listener.check())
|
||||||
{
|
{
|
||||||
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
|
std::cout << "Stopping launch of queries. SIGINT received.\n";
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
double seconds = delay_watch.elapsedSeconds();
|
double seconds = delay_watch.elapsedSeconds();
|
||||||
if (delay > 0 && seconds > delay)
|
if (delay > 0 && seconds > delay)
|
||||||
@ -352,7 +352,6 @@ private:
|
|||||||
: report(comparison_info_per_interval, seconds);
|
: report(comparison_info_per_interval, seconds);
|
||||||
delay_watch.restart();
|
delay_watch.restart();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -438,16 +437,16 @@ private:
|
|||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
std::cerr << "An error occurred while processing the query " << "'" << query << "'"
|
log << "An error occurred while processing the query " << "'" << query << "'"
|
||||||
<< ": " << getCurrentExceptionMessage(false) << std::endl;
|
<< ": " << getCurrentExceptionMessage(false) << '\n';
|
||||||
if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors))
|
if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors))
|
||||||
{
|
{
|
||||||
shutdown = true;
|
shutdown = true;
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cerr << getCurrentExceptionMessage(print_stacktrace,
|
log << getCurrentExceptionMessage(print_stacktrace,
|
||||||
true /*check embedded stack trace*/) << std::endl;
|
true /*check embedded stack trace*/) << '\n' << flush;
|
||||||
|
|
||||||
size_t info_index = round_robin ? 0 : connection_index;
|
size_t info_index = round_robin ? 0 : connection_index;
|
||||||
++comparison_info_per_interval[info_index]->errors;
|
++comparison_info_per_interval[info_index]->errors;
|
||||||
@ -504,7 +503,7 @@ private:
|
|||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
std::cerr << "\n";
|
log << "\n";
|
||||||
for (size_t i = 0; i < infos.size(); ++i)
|
for (size_t i = 0; i < infos.size(); ++i)
|
||||||
{
|
{
|
||||||
const auto & info = infos[i];
|
const auto & info = infos[i];
|
||||||
@ -524,31 +523,31 @@ private:
|
|||||||
connection_description += conn->getDescription();
|
connection_description += conn->getDescription();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
std::cerr
|
log
|
||||||
<< connection_description << ", "
|
<< connection_description << ", "
|
||||||
<< "queries: " << info->queries << ", ";
|
<< "queries: " << info->queries.load() << ", ";
|
||||||
if (info->errors)
|
if (info->errors)
|
||||||
{
|
{
|
||||||
std::cerr << "errors: " << info->errors << ", ";
|
log << "errors: " << info->errors << ", ";
|
||||||
}
|
}
|
||||||
std::cerr
|
log
|
||||||
<< "QPS: " << (info->queries / seconds) << ", "
|
<< "QPS: " << fmt::format("{:.3f}", info->queries / seconds) << ", "
|
||||||
<< "RPS: " << (info->read_rows / seconds) << ", "
|
<< "RPS: " << fmt::format("{:.3f}", info->read_rows / seconds) << ", "
|
||||||
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
|
<< "MiB/s: " << fmt::format("{:.3f}", info->read_bytes / seconds / 1048576) << ", "
|
||||||
<< "result RPS: " << (info->result_rows / seconds) << ", "
|
<< "result RPS: " << fmt::format("{:.3f}", info->result_rows / seconds) << ", "
|
||||||
<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "."
|
<< "result MiB/s: " << fmt::format("{:.3f}", info->result_bytes / seconds / 1048576) << "."
|
||||||
<< "\n";
|
<< "\n";
|
||||||
}
|
}
|
||||||
std::cerr << "\n";
|
log << "\n";
|
||||||
|
|
||||||
auto print_percentile = [&](double percent)
|
auto print_percentile = [&](double percent)
|
||||||
{
|
{
|
||||||
std::cerr << percent << "%\t\t";
|
log << percent << "%\t\t";
|
||||||
for (const auto & info : infos)
|
for (const auto & info : infos)
|
||||||
{
|
{
|
||||||
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t";
|
log << fmt::format("{:.3f}", info->sampler.quantileNearest(percent / 100.0)) << " sec.\t";
|
||||||
}
|
}
|
||||||
std::cerr << "\n";
|
log << "\n";
|
||||||
};
|
};
|
||||||
|
|
||||||
for (int percent = 0; percent <= 90; percent += 10)
|
for (int percent = 0; percent <= 90; percent += 10)
|
||||||
@ -559,13 +558,15 @@ private:
|
|||||||
print_percentile(99.9);
|
print_percentile(99.9);
|
||||||
print_percentile(99.99);
|
print_percentile(99.99);
|
||||||
|
|
||||||
std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n";
|
log << "\n" << t_test.compareAndReport(confidence).second << "\n";
|
||||||
|
|
||||||
if (!cumulative)
|
if (!cumulative)
|
||||||
{
|
{
|
||||||
for (auto & info : infos)
|
for (auto & info : infos)
|
||||||
info->clear();
|
info->clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.next();
|
||||||
}
|
}
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -741,7 +742,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
|
std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << '\n';
|
||||||
return getCurrentExceptionCode();
|
return getCurrentExceptionCode();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1 @@
|
|||||||
|
0
|
10
tests/queries/0_stateless/03271_benchmark_metrics.sh
Executable file
10
tests/queries/0_stateless/03271_benchmark_metrics.sh
Executable file
@ -0,0 +1,10 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
|
# shellcheck source=../shell_config.sh
|
||||||
|
. "$CUR_DIR"/../shell_config.sh
|
||||||
|
|
||||||
|
# A query with two seconds sleep cannot be processed with QPS > 0.5
|
||||||
|
$CLICKHOUSE_BENCHMARK --query "SELECT sleep(2)" 2>&1 | grep -m1 -o -P 'QPS: \d+\.\d+' | $CLICKHOUSE_LOCAL --query "SELECT throwIf(extract(line, 'QPS: (.+)')::Float64 > 0.75) FROM table" --input-format LineAsString
|
Loading…
Reference in New Issue
Block a user