Merge branch 'fix-benchmark' into fp16

This commit is contained in:
Alexey Milovidov 2024-11-14 00:17:12 +01:00
commit 6ea31c81e1
3 changed files with 51 additions and 39 deletions

View File

@ -7,7 +7,6 @@
#include <random> #include <random>
#include <string_view> #include <string_view>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <Poco/UUID.h>
#include <Poco/UUIDGenerator.h> #include <Poco/UUIDGenerator.h>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
@ -152,8 +151,6 @@ public:
global_context->setClientName(std::string(DEFAULT_CLIENT_NAME)); global_context->setClientName(std::string(DEFAULT_CLIENT_NAME));
global_context->setQueryKindInitial(); global_context->setQueryKindInitial();
std::cerr << std::fixed << std::setprecision(3);
/// This is needed to receive blocks with columns of AggregateFunction data type /// This is needed to receive blocks with columns of AggregateFunction data type
/// (example: when using stage = 'with_mergeable_state') /// (example: when using stage = 'with_mergeable_state')
registerAggregateFunctions(); registerAggregateFunctions();
@ -226,6 +223,8 @@ private:
ContextMutablePtr global_context; ContextMutablePtr global_context;
QueryProcessingStage::Enum query_processing_stage; QueryProcessingStage::Enum query_processing_stage;
WriteBufferFromFileDescriptor log{STDERR_FILENO};
std::atomic<size_t> consecutive_errors{0}; std::atomic<size_t> consecutive_errors{0};
/// Don't execute new queries after timelimit or SIGINT or exception /// Don't execute new queries after timelimit or SIGINT or exception
@ -303,16 +302,16 @@ private:
} }
std::cerr << "Loaded " << queries.size() << " queries.\n"; log << "Loaded " << queries.size() << " queries.\n" << flush;
} }
void printNumberOfQueriesExecuted(size_t num) void printNumberOfQueriesExecuted(size_t num)
{ {
std::cerr << "\nQueries executed: " << num; log << "\nQueries executed: " << num;
if (queries.size() > 1) if (queries.size() > 1)
std::cerr << " (" << (num * 100.0 / queries.size()) << "%)"; log << " (" << (num * 100.0 / queries.size()) << "%)";
std::cerr << ".\n"; log << ".\n" << flush;
} }
/// Try push new query and check cancellation conditions /// Try push new query and check cancellation conditions
@ -339,9 +338,10 @@ private:
if (interrupt_listener.check()) if (interrupt_listener.check())
{ {
std::cout << "Stopping launch of queries. SIGINT received." << std::endl; std::cout << "Stopping launch of queries. SIGINT received.\n";
return false; return false;
} }
}
double seconds = delay_watch.elapsedSeconds(); double seconds = delay_watch.elapsedSeconds();
if (delay > 0 && seconds > delay) if (delay > 0 && seconds > delay)
@ -352,7 +352,6 @@ private:
: report(comparison_info_per_interval, seconds); : report(comparison_info_per_interval, seconds);
delay_watch.restart(); delay_watch.restart();
} }
}
return true; return true;
} }
@ -438,16 +437,16 @@ private:
catch (...) catch (...)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
std::cerr << "An error occurred while processing the query " << "'" << query << "'" log << "An error occurred while processing the query " << "'" << query << "'"
<< ": " << getCurrentExceptionMessage(false) << std::endl; << ": " << getCurrentExceptionMessage(false) << '\n';
if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors)) if (!(continue_on_errors || max_consecutive_errors > ++consecutive_errors))
{ {
shutdown = true; shutdown = true;
throw; throw;
} }
std::cerr << getCurrentExceptionMessage(print_stacktrace, log << getCurrentExceptionMessage(print_stacktrace,
true /*check embedded stack trace*/) << std::endl; true /*check embedded stack trace*/) << '\n' << flush;
size_t info_index = round_robin ? 0 : connection_index; size_t info_index = round_robin ? 0 : connection_index;
++comparison_info_per_interval[info_index]->errors; ++comparison_info_per_interval[info_index]->errors;
@ -504,7 +503,7 @@ private:
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
std::cerr << "\n"; log << "\n";
for (size_t i = 0; i < infos.size(); ++i) for (size_t i = 0; i < infos.size(); ++i)
{ {
const auto & info = infos[i]; const auto & info = infos[i];
@ -524,31 +523,31 @@ private:
connection_description += conn->getDescription(); connection_description += conn->getDescription();
} }
} }
std::cerr log
<< connection_description << ", " << connection_description << ", "
<< "queries: " << info->queries << ", "; << "queries: " << info->queries.load() << ", ";
if (info->errors) if (info->errors)
{ {
std::cerr << "errors: " << info->errors << ", "; log << "errors: " << info->errors << ", ";
} }
std::cerr log
<< "QPS: " << (info->queries / seconds) << ", " << "QPS: " << fmt::format("{:.3f}", info->queries / seconds) << ", "
<< "RPS: " << (info->read_rows / seconds) << ", " << "RPS: " << fmt::format("{:.3f}", info->read_rows / seconds) << ", "
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", " << "MiB/s: " << fmt::format("{:.3f}", info->read_bytes / seconds / 1048576) << ", "
<< "result RPS: " << (info->result_rows / seconds) << ", " << "result RPS: " << fmt::format("{:.3f}", info->result_rows / seconds) << ", "
<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "." << "result MiB/s: " << fmt::format("{:.3f}", info->result_bytes / seconds / 1048576) << "."
<< "\n"; << "\n";
} }
std::cerr << "\n"; log << "\n";
auto print_percentile = [&](double percent) auto print_percentile = [&](double percent)
{ {
std::cerr << percent << "%\t\t"; log << percent << "%\t\t";
for (const auto & info : infos) for (const auto & info : infos)
{ {
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t"; log << fmt::format("{:.3f}", info->sampler.quantileNearest(percent / 100.0)) << " sec.\t";
} }
std::cerr << "\n"; log << "\n";
}; };
for (int percent = 0; percent <= 90; percent += 10) for (int percent = 0; percent <= 90; percent += 10)
@ -559,13 +558,15 @@ private:
print_percentile(99.9); print_percentile(99.9);
print_percentile(99.99); print_percentile(99.99);
std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n"; log << "\n" << t_test.compareAndReport(confidence).second << "\n";
if (!cumulative) if (!cumulative)
{ {
for (auto & info : infos) for (auto & info : infos)
info->clear(); info->clear();
} }
log.next();
} }
public: public:
@ -741,7 +742,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
} }
catch (...) catch (...)
{ {
std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl; std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << '\n';
return getCurrentExceptionCode(); return getCurrentExceptionCode();
} }
} }

View File

@ -0,0 +1 @@
0

View File

@ -0,0 +1,10 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# A query with two seconds sleep cannot be processed with QPS > 0.5
$CLICKHOUSE_BENCHMARK --query "SELECT sleep(2)" 2>&1 | grep -m1 -o -P 'QPS: \d+\.\d+' | $CLICKHOUSE_LOCAL --query "SELECT throwIf(extract(line, 'QPS: (.+)')::Float64 > 0.75) FROM table" --input-format LineAsString