#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include

/** A tool for evaluating ClickHouse performance.
  * The tool emulates a workload with a fixed number of simultaneously executing queries.
  */

namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int EMPTY_DATA_PASSED;
}

class Benchmark : public Poco::Util::Application
{
public:
    Benchmark(unsigned concurrency_, double delay_,
            const String & host_, UInt16 port_, bool secure_, const String & default_database_,
            const String & user_, const String & password_, const String & stage,
            bool randomize_, size_t max_iterations_, double max_time_,
            const String & json_path_, const ConnectionTimeouts & timeouts, const Settings & settings_)
        :
        concurrency(concurrency_), delay(delay_), queue(concurrency),
        connections(concurrency, host_, port_, default_database_, user_, password_, timeouts,
            "benchmark", Protocol::Compression::Enable,
            secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
        randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
        json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
    {
        std::cerr << std::fixed << std::setprecision(3);

        /// This is needed to receive blocks with columns of AggregateFunction data type
        /// (example: when using stage = 'with_mergeable_state')
        registerAggregateFunctions();

        if (stage == "complete")
            query_processing_stage = QueryProcessingStage::Complete;
        else if (stage == "fetch_columns")
            query_processing_stage = QueryProcessingStage::FetchColumns;
        else if (stage == "with_mergeable_state")
            query_processing_stage = QueryProcessingStage::WithMergeableState;
        else
            throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
    }

    void initialize(Poco::Util::Application & self)
    {
        Poco::Util::Application::initialize(self);

        std::string home_path;
        const char * home_path_cstr = getenv("HOME");
        if (home_path_cstr)
            home_path = home_path_cstr;

        configReadClient(config(), home_path);
    }

    int main(const std::vector<std::string> &)
    {
        if (!json_path.empty() && Poco::File(json_path).exists()) /// Clear file with previous results
            Poco::File(json_path).remove();

        readQueries();
        runBenchmark();
        return 0;
    }

private:
    using Query = std::string;

    unsigned concurrency;
    double delay;

    using Queries = std::vector<Query>;
    Queries queries;

    using Queue = ConcurrentBoundedQueue<Query>;
    Queue queue;

    ConnectionPool connections;
    bool randomize;
    size_t max_iterations;
    double max_time;
    String json_path;
    Settings settings;
    Context global_context;
    QueryProcessingStage::Enum query_processing_stage;

    /// Don't execute new queries after timelimit or SIGINT or exception
    std::atomic<bool> shutdown{false};

    std::atomic<size_t> queries_executed{0};

    struct Stats
    {
        Stopwatch watch;
        std::atomic<size_t> queries{0};
        size_t read_rows = 0;
        size_t read_bytes = 0;
        size_t result_rows = 0;
        size_t result_bytes = 0;

        using Sampler = ReservoirSampler<double>;
        Sampler sampler {1 << 16};

        void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
        {
            ++queries;
            read_rows += read_rows_inc;
            read_bytes += read_bytes_inc;
            result_rows += result_rows_inc;
            result_bytes += result_bytes_inc;
            sampler.insert(seconds);
        }

        void clear()
        {
            watch.restart();
            queries = 0;
            read_rows = 0;
            read_bytes = 0;
            result_rows = 0;
            result_bytes = 0;
            sampler.clear();
        }
    };

    Stats info_per_interval;
    Stats info_total;
    Stopwatch delay_watch;

    std::mutex mutex;

    ThreadPool pool;

    void readQueries()
    {
        ReadBufferFromFileDescriptor in(STDIN_FILENO);

        /// One query per line; empty lines are skipped.
        while (!in.eof())
        {
            std::string query;
            readText(query, in);
            assertChar('\n', in);

            if (!query.empty())
                queries.emplace_back(query);
        }

        if (queries.empty())
            throw Exception("Empty list of queries.", ErrorCodes::EMPTY_DATA_PASSED);

        std::cerr << "Loaded " << queries.size() << " queries.\n";
    }

    void printNumberOfQueriesExecuted(size_t num)
    {
        std::cerr << "\nQueries executed: " << num;
        if (queries.size() > 1)
            std::cerr << " (" << (num * 100.0 / queries.size()) << "%)";
        std::cerr << ".\n";
    }

    /// Try push new query and check cancellation conditions
    bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
    {
        bool inserted = false;

        while (!inserted)
        {
            /// Wait up to 100 ms for a free slot, then re-check the stop conditions.
            inserted = queue.tryPush(query, 100);

            if (shutdown)
            {
                /// An exception occurred in a worker
                return false;
            }

            if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time)
            {
                std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
                return false;
            }

            if (interrupt_listener.check())
            {
                std::cout << "Stopping launch of queries. SIGINT received.\n";
                return false;
            }

            if (delay > 0 && delay_watch.elapsedSeconds() > delay)
            {
                printNumberOfQueriesExecuted(info_total.queries);
                report(info_per_interval);
                delay_watch.restart();
            }
        }

        return true;
    }

    void runBenchmark()
    {
        pcg64 generator(randomSeed());
        std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);

        for (size_t i = 0; i < concurrency; ++i)
            pool.schedule(std::bind(&Benchmark::thread, this, connections.get()));

        InterruptListener interrupt_listener;
        info_per_interval.watch.restart();
        delay_watch.restart();

        /// Push queries into queue
        for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
        {
            size_t query_index = randomize ? distribution(generator) : i % queries.size();

            if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
            {
                shutdown = true;
                break;
            }
        }

        pool.wait();
        info_total.watch.stop();

        if (!json_path.empty())
            reportJSON(info_total, json_path);

        printNumberOfQueriesExecuted(info_total.queries);
        report(info_total);
    }

    void thread(ConnectionPool::Entry connection)
    {
        Query query;

        try
        {
            /// In these threads we do not accept INT signal.
            sigset_t sig_set;
            if (sigemptyset(&sig_set)
                || sigaddset(&sig_set, SIGINT)
                || pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
                throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);

            while (true)
            {
                bool extracted = false;

                while (!extracted)
                {
                    extracted = queue.tryPop(query, 100);

                    if (shutdown || (max_iterations && queries_executed == max_iterations))
                        return;
                }

                execute(connection, query);
                ++queries_executed;
            }
        }
        catch (...)
        {
            shutdown = true;
            std::cerr << "An error occurred while processing query:\n" << query << "\n";
            throw;
        }
    }

    void execute(ConnectionPool::Entry & connection, Query & query)
    {
        Stopwatch watch;
        RemoteBlockInputStream stream(
            *connection,
            query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);

        Progress progress;
        stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });

        /// Read and discard the whole result; only timing and counters are needed.
        stream.readPrefix();
        while (Block block = stream.read())
            ;
        stream.readSuffix();

        const BlockStreamProfileInfo & info = stream.getProfileInfo();

        double seconds = watch.elapsedSeconds();

        std::lock_guard lock(mutex);
        info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
        info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
    }

    void report(Stats & info)
    {
        std::lock_guard lock(mutex);

        /// Avoid zeros, nans or exceptions
        if (0 == info.queries)
            return;

        double seconds = info.watch.elapsedSeconds();

        std::cerr
            << "\n"
            << "QPS: " << (info.queries / seconds) << ", "
            << "RPS: " << (info.read_rows / seconds) << ", "
            << "MiB/s: " << (info.read_bytes / seconds / 1048576) << ", "
            << "result RPS: " << (info.result_rows / seconds) << ", "
            << "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "."
            << "\n";

        auto print_percentile = [&](double percent)
        {
            std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl;
        };

        for (int percent = 0; percent <= 90; percent += 10)
            print_percentile(percent);

        print_percentile(95);
        print_percentile(99);
        print_percentile(99.9);
        print_percentile(99.99);

        info.clear();
    }

    void reportJSON(Stats & info, const std::string & filename)
    {
        WriteBufferFromFile json_out(filename);

        std::lock_guard lock(mutex);

        auto print_key_value = [&](auto key, auto value, bool with_comma = true)
        {
            json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
        };

        auto print_percentile = [&json_out, &info](auto percent, bool with_comma = true)
        {
            json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
        };

        json_out << "{\n";

        json_out << double_quote << "statistics" << ": {\n";

        double seconds = info.watch.elapsedSeconds();
        print_key_value("QPS", info.queries / seconds);
        print_key_value("RPS", info.read_rows / seconds);
        /// Convert bytes to MiB so the values match the key names, as report() does.
        print_key_value("MiBPS", info.read_bytes / seconds / 1048576);
        print_key_value("RPS_result", info.result_rows / seconds);
        print_key_value("MiBPS_result", info.result_bytes / seconds / 1048576);
        print_key_value("num_queries", info.queries.load(), false);

        json_out << "},\n";

        json_out << double_quote << "query_time_percentiles" << ": {\n";

        for (int percent = 0; percent <= 90; percent += 10)
            print_percentile(percent);

        print_percentile(95);
        print_percentile(99);
        print_percentile(99.9);
        print_percentile(99.99, false);

        json_out << "}\n";

        json_out << "}\n";
    }

public:

    ~Benchmark()
    {
        shutdown = true;
    }
};

}


#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif

int mainEntryClickHouseBenchmark(int argc, char ** argv)
{
    using namespace DB;
    bool print_stacktrace = true;

    try
    {
        using boost::program_options::value;

        boost::program_options::options_description desc("Allowed options");
        desc.add_options()
            ("help", "produce help message")
            ("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
            ("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
            ("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage")
            ("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
            ("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
            ("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
            ("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
            ("host,h", value<std::string>()->default_value("localhost"), "")
            ("port", value<UInt16>()->default_value(9000), "")
            ("secure", "Use TLS connection")
            ("user", value<std::string>()->default_value("default"), "")
            ("password", value<std::string>()->default_value(""), "")
            ("database", value<std::string>()->default_value("default"), "")
            ("stacktrace", "print stack traces of exceptions")

        #define DECLARE_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) (#NAME, boost::program_options::value<std::string> (), DESCRIPTION)
            APPLY_FOR_SETTINGS(DECLARE_SETTING)
        #undef DECLARE_SETTING
        ;

        boost::program_options::variables_map options;
        boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);

        if (options.count("help"))
        {
            std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
            std::cout << desc << "\n";
            return 1;
        }

        print_stacktrace = options.count("stacktrace");

        /// Extract `settings` and `limits` from received `options`
        Settings settings;

        #define EXTRACT_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) \
            if (options.count(#NAME)) \
                settings.set(#NAME, options[#NAME].as<std::string>());
        APPLY_FOR_SETTINGS(EXTRACT_SETTING)
        #undef EXTRACT_SETTING

        UseSSL use_ssl;

        Benchmark benchmark(
            options["concurrency"].as<unsigned>(),
            options["delay"].as<double>(),
            options["host"].as<std::string>(),
            options["port"].as<UInt16>(),
            options.count("secure"),
            options["database"].as<std::string>(),
            options["user"].as<std::string>(),
            options["password"].as<std::string>(),
            options["stage"].as<std::string>(),
            options["randomize"].as<bool>(),
            options["iterations"].as<size_t>(),
            options["timelimit"].as<double>(),
            options["json"].as<std::string>(),
            ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings),
            settings);
        return benchmark.run();
    }
    catch (...)
    {
        std::cerr << getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
        return getCurrentExceptionCode();
    }

    return 0;
}