#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include /** A tool for evaluating ClickHouse performance. * The tool emulates a case with fixed amount of simultaneously executing queries. */ namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int EMPTY_DATA_PASSED; } class Benchmark : public Poco::Util::Application { public: Benchmark(unsigned concurrency_, double delay_, const std::vector & hosts_, const std::vector & ports_, bool cumulative_, bool secure_, const String & default_database_, const String & user_, const String & password_, const String & stage, bool randomize_, size_t max_iterations_, double max_time_, const String & json_path_, const Settings & settings_) : concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_), cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_), json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency) { const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable; size_t connections_cnt = std::max(ports_.size(), hosts_.size()); connections.reserve(connections_cnt); comparison_info_total.reserve(connections_cnt); comparison_info_per_interval.reserve(connections_cnt); for (size_t i = 0; i < connections_cnt; ++i) { UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i]; std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i]; connections.emplace_back(std::make_shared(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure)); comparison_info_per_interval.emplace_back(std::make_shared()); comparison_info_total.emplace_back(std::make_shared()); } global_context.makeGlobalContext(); std::cerr << std::fixed << std::setprecision(3); /// This is needed to receive blocks with columns of AggregateFunction data type /// (example: when using stage = 'with_mergeable_state') registerAggregateFunctions(); if (stage == "complete") query_processing_stage = QueryProcessingStage::Complete; else if (stage == "fetch_columns") query_processing_stage = QueryProcessingStage::FetchColumns; else if (stage == "with_mergeable_state") query_processing_stage = QueryProcessingStage::WithMergeableState; else throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS); } void initialize(Poco::Util::Application & self [[maybe_unused]]) { std::string home_path; const char * home_path_cstr = getenv("HOME"); if (home_path_cstr) home_path = home_path_cstr; configReadClient(config(), home_path); } int main(const std::vector &) { if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results Poco::File(json_path).remove(); readQueries(); runBenchmark(); return 0; } private: using Entry = ConnectionPool::Entry; using EntryPtr = std::shared_ptr; using EntryPtrs = std::vector; unsigned concurrency; double delay; using Query = std::string; using Queries = std::vector; Queries queries; using Queue = ConcurrentBoundedQueue; Queue queue; ConnectionPoolPtrs connections; bool randomize; bool cumulative; size_t max_iterations; double max_time; String json_path; Settings settings; Context global_context; QueryProcessingStage::Enum query_processing_stage; /// Don't execute new queries after timelimit or SIGINT or exception std::atomic shutdown{false}; std::atomic queries_executed{0}; struct Stats { std::atomic queries{0}; size_t read_rows = 0; size_t read_bytes = 0; size_t result_rows = 0; size_t result_bytes = 0; double work_time = 0; using Sampler = ReservoirSampler; Sampler sampler {1 << 16}; void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc) { ++queries; work_time += seconds; read_rows += read_rows_inc; read_bytes += read_bytes_inc; result_rows += result_rows_inc; result_bytes += result_bytes_inc; sampler.insert(seconds); } void clear() { queries = 0; work_time = 0; read_rows = 0; read_bytes = 0; result_rows = 0; result_bytes = 0; sampler.clear(); } }; using MultiStats = std::vector>; MultiStats comparison_info_per_interval; MultiStats comparison_info_total; Stopwatch total_watch; Stopwatch delay_watch; std::mutex mutex; ThreadPool pool; void readQueries() { ReadBufferFromFileDescriptor in(STDIN_FILENO); while (!in.eof()) { std::string query; readText(query, in); assertChar('\n', in); if (!query.empty()) queries.emplace_back(query); } if (queries.empty()) throw Exception("Empty list of queries.", ErrorCodes::EMPTY_DATA_PASSED); std::cerr << "Loaded " << queries.size() << " queries.\n"; } void printNumberOfQueriesExecuted(size_t num) { std::cerr << "\nQueries executed: " << num; if (queries.size() > 1) std::cerr << " (" << (num * 100.0 / queries.size()) << "%)"; std::cerr << ".\n"; } /// Try push new query and check cancellation conditions bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener) { bool inserted = false; while (!inserted) { inserted = queue.tryPush(query, 100); if (shutdown) { /// An exception occurred in a worker return false; } if (max_time > 0 && total_watch.elapsedSeconds() >= max_time) { std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n"; return false; } if (interrupt_listener.check()) { std::cout << "Stopping launch of queries. SIGINT recieved.\n"; return false; } if (delay > 0 && delay_watch.elapsedSeconds() > delay) { printNumberOfQueriesExecuted(queries_executed); cumulative ? report(comparison_info_total) : report(comparison_info_per_interval); delay_watch.restart(); } } return true; } void runBenchmark() { pcg64 generator(randomSeed()); std::uniform_int_distribution distribution(0, queries.size() - 1); for (size_t i = 0; i < concurrency; ++i) { EntryPtrs connection_entries; connection_entries.reserve(connections.size()); for (const auto & connection : connections) connection_entries.emplace_back(std::make_shared(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings)))); pool.schedule(std::bind(&Benchmark::thread, this, connection_entries)); } InterruptListener interrupt_listener; delay_watch.restart(); /// Push queries into queue for (size_t i = 0; !max_iterations || i < max_iterations; ++i) { size_t query_index = randomize ? distribution(generator) : i % queries.size(); if (!tryPushQueryInteractively(queries[query_index], interrupt_listener)) { shutdown = true; break; } } pool.wait(); total_watch.stop(); if (!json_path.empty()) reportJSON(comparison_info_total, json_path); printNumberOfQueriesExecuted(queries_executed); report(comparison_info_total); } void thread(EntryPtrs & connection_entries) { Query query; try { /// In these threads we do not accept INT signal. sigset_t sig_set; if (sigemptyset(&sig_set) || sigaddset(&sig_set, SIGINT) || pthread_sigmask(SIG_BLOCK, &sig_set, nullptr)) throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL); while (true) { bool extracted = false; while (!extracted) { extracted = queue.tryPop(query, 100); if (shutdown || (max_iterations && queries_executed == max_iterations)) return; } execute(connection_entries, query); ++queries_executed; } } catch (...) { shutdown = true; std::cerr << "An error occurred while processing query:\n" << query << "\n"; throw; } } void execute(EntryPtrs & connection_entries, Query & query) { size_t connection_index = rand() % connection_entries.size(); Stopwatch watch; RemoteBlockInputStream stream( *(*connection_entries[connection_index]), query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage); Progress progress; stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); stream.readPrefix(); while (Block block = stream.read()); stream.readSuffix(); const BlockStreamProfileInfo & info = stream.getProfileInfo(); double seconds = watch.elapsedSeconds(); std::lock_guard lock(mutex); comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes); comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes); } void report(MultiStats & infos) { std::lock_guard lock(mutex); std::cerr << "\n"; size_t info_counter = 1; for (auto & info : infos) { /// Avoid zeros, nans or exceptions if (0 == info->queries) return; std::cerr << "connection " << info_counter++ << ", " << "queries " << info->queries << ", " << "QPS: " << (info->queries / info->work_time) << ", " << "RPS: " << (info->read_rows / info->work_time) << ", " << "MiB/s: " << (info->read_bytes / info->work_time / 1048576) << ", " << "result RPS: " << (info->result_rows / info->work_time) << ", " << "result MiB/s: " << (info->result_bytes / info->work_time / 1048576) << "." << "\n"; } std::cerr << "\n\t\t"; for (size_t i = 1; i <= infos.size(); ++i) std::cerr << "connection " << i << "\t"; std::cerr << "\n"; auto print_percentile = [&](double percent) { std::cerr << percent << "%\t\t"; for (auto & info : infos) { std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t"; } std::cerr << "\n"; }; for (int percent = 0; percent <= 90; percent += 10) print_percentile(percent); print_percentile(95); print_percentile(99); print_percentile(99.9); print_percentile(99.99); if (!cumulative) for (auto & info : infos) info->clear(); } void reportJSON(MultiStats & infos, const std::string & filename) { WriteBufferFromFile json_out(filename); std::lock_guard lock(mutex); auto print_key_value = [&](auto key, auto value, bool with_comma = true) { json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n"); }; auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true) { json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n"); }; json_out << "{\n"; for (size_t i = 1; i <= infos.size(); ++i) { auto info = infos[i - 1]; json_out << double_quote << "connection_" + toString(i) << ": {\n"; json_out << double_quote << "statistics" << ": {\n"; print_key_value("QPS", info->queries / info->work_time); print_key_value("RPS", info->read_rows / info->work_time); print_key_value("MiBPS", info->read_bytes / info->work_time); print_key_value("RPS_result", info->result_rows / info->work_time); print_key_value("MiBPS_result", info->result_bytes / info->work_time); print_key_value("num_queries", info->queries.load(), false); json_out << "},\n"; json_out << double_quote << "query_time_percentiles" << ": {\n"; for (int percent = 0; percent <= 90; percent += 10) print_percentile(*info, percent); print_percentile(*info, 95); print_percentile(*info, 99); print_percentile(*info, 99.9); print_percentile(*info, 99.99, false); json_out << "}\n"; json_out << (i == infos.size() ? "}\n" : "},\n"); } json_out << "}\n"; } public: ~Benchmark() { shutdown = true; } }; } #ifndef __clang__ #pragma GCC optimize("-fno-var-tracking-assignments") #endif int mainEntryClickHouseBenchmark(int argc, char ** argv) { using namespace DB; bool print_stacktrace = true; try { using boost::program_options::value; boost::program_options::options_description desc("Allowed options"); desc.add_options() ("help", "produce help message") ("concurrency,c", value()->default_value(1), "number of parallel queries") ("delay,d", value()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)") ("stage", value()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state") ("iterations,i", value()->default_value(0), "amount of queries to be executed") ("timelimit,t", value()->default_value(0.), "stop launch of queries after specified time limit") ("randomize,r", value()->default_value(false), "randomize order of execution") ("json", value()->default_value(""), "write final report to specified file in JSON format") ("host,h", value>()->default_value(std::vector{"localhost"}, "localhost"), "note that more than one host can be described") ("port,p", value>()->default_value(std::vector{9000}, "9000"), "note that more than one port can be described") ("cumulative", "prints cumulative data instead of data per interval") ("secure,s", "Use TLS connection") ("user", value()->default_value("default"), "") ("password", value()->default_value(""), "") ("database", value()->default_value("default"), "") ("stacktrace", "print stack traces of exceptions") ; Settings settings; settings.addProgramOptions(desc); boost::program_options::variables_map options; boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); boost::program_options::notify(options); if (options.count("help")) { std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n"; std::cout << desc << "\n"; return 1; } print_stacktrace = options.count("stacktrace"); UseSSL use_ssl; Benchmark benchmark( options["concurrency"].as(), options["delay"].as(), options["host"].as>(), options["port"].as>(), options.count("cumulative"), options.count("secure"), options["database"].as(), options["user"].as(), options["password"].as(), options["stage"].as(), options["randomize"].as(), options["iterations"].as(), options["timelimit"].as(), options["json"].as(), settings); return benchmark.run(); } catch (...) { std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl; return getCurrentExceptionCode(); } }