diff --git a/dbms/src/Client/PerformanceTest.cpp b/dbms/src/Client/PerformanceTest.cpp index 2410d4a2c97..d2caeb2125b 100644 --- a/dbms/src/Client/PerformanceTest.cpp +++ b/dbms/src/Client/PerformanceTest.cpp @@ -1,17 +1,20 @@ #include +#include #include #include #include -#include #include +#include +#include #include #include #include #include #include + #include #include #include @@ -33,6 +36,157 @@ namespace ErrorCodes extern const int UNKNOWN_EXCEPTION; } +struct criterionWithPriority { + std::string priority = ""; + size_t value = 0; +}; + +/// Termination criterions. The running test will be terminated in either of two conditions: +/// 1. All criterions marked 'min' are fulfilled +/// or +/// 2. Any criterion marked 'max' is fulfilled +class StopCriterions { +private: + using AbstractConfiguration = Poco::AutoPtr; + using Keys = std::vector; + + void initializeStruct(const std::string priority, + const AbstractConfiguration & stopCriterionsView) + { + Keys keys; + stopCriterionsView->keys(priority, keys); + + for (const std::string & key : keys) { + if (key == "timeout_ms") { + timeout_ms.value = stopCriterionsView->getUInt64(priority + ".timeout_ms"); + timeout_ms.priority = priority; + } else if (key == "read_rows") { + read_rows.value = stopCriterionsView->getUInt64(priority + ".read_rows"); + read_rows.priority = priority; + } else if (key == "bytes_read_uncompressed") { + bytes_read_uncompressed.value = stopCriterionsView->getUInt64(priority + ".bytes_read_uncompressed"); + bytes_read_uncompressed.priority = priority; + } else if (key == "iterations") { + iterations.value = stopCriterionsView->getUInt64(priority + ".iterations"); + iterations.priority = priority; + } else if (key == "min_time_not_changing_for_ms") { + min_time_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".min_time_not_changing_for_ms"); + min_time_not_changing_for_ms.priority = priority; + } else if (key == "max_speed_not_changing_for_ms") { + max_speed_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".max_speed_not_changing_for_ms"); + max_speed_not_changing_for_ms.priority = priority; + } else if (key == "average_speed_not_changing_for_ms") { + average_speed_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".average_speed_not_changing_for_ms"); + average_speed_not_changing_for_ms.priority = priority; + } else { + throw Poco::Exception("Met unkown stop criterion: " + key, 1); + } + + if (priority == "min") { ++number_of_initialized_min; }; + if (priority == "max") { ++number_of_initialized_max; }; + } + } + +public: + StopCriterions() + : number_of_initialized_min(0), number_of_initialized_max(0), + fulfilled_criterions_min(0), fulfilled_criterions_max(0) {} + + void loadFromConfig(const AbstractConfiguration & stopCriterionsView) + { + if (stopCriterionsView->has("min")) { + initializeStruct("min", stopCriterionsView); + } + + if (stopCriterionsView->has("max")) { + initializeStruct("max", stopCriterionsView); + } + } + + struct criterionWithPriority timeout_ms; + struct criterionWithPriority read_rows; + struct criterionWithPriority bytes_read_uncompressed; + struct criterionWithPriority iterations; + struct criterionWithPriority min_time_not_changing_for_ms; + struct criterionWithPriority max_speed_not_changing_for_ms; + struct criterionWithPriority average_speed_not_changing_for_ms; + + /// Hereafter 'min' and 'max', in context of critetions, mean a level of importance + /// Number of initialized properties met in configuration + std::atomic number_of_initialized_min; + std::atomic number_of_initialized_max; + + std::atomic fulfilled_criterions_min; + std::atomic fulfilled_criterions_max; +}; + +struct Stats +{ + Stopwatch watch; + Stopwatch min_time_watch; + Stopwatch max_speed_watch; + Stopwatch average_speed_watch; + // size_t queries; TODO: Do I need this? + size_t read_rows; + size_t read_bytes; + + /// min_time in ms + UInt64 min_time = std::numeric_limits::max(); + size_t max_speed = 0; + size_t average_speed = 0; + size_t number_of_speed_info_batches = 0; + + void update_min_time(const UInt64 min_time_candidate) + { + // TODO: + std::cout << "current min_time: " << min_time << std::endl; + std::cout << "min_time candidate: " << min_time_candidate << std::endl; + std::cout << std::endl; + + if (min_time_candidate < min_time) { + min_time = min_time_candidate; + min_time_watch.restart(); + } + } + + void update_average_speed(const size_t new_speed_info) + { + size_t new_average_speed = ((average_speed * number_of_speed_info_batches) + + new_speed_info); + new_average_speed /= (++number_of_speed_info_batches); + if (new_average_speed != average_speed) { + average_speed = new_average_speed; + average_speed_watch.restart(); + } + } + + void update_max_speed(const size_t max_speed_candidate) + { + if (max_speed_candidate > max_speed) { + max_speed = max_speed_candidate; + max_speed_watch.restart(); + } + } + + void add(size_t read_rows_inc, size_t read_bytes_inc) + { + read_rows += read_rows_inc; + read_bytes += read_bytes_inc; + + size_t new_speed = read_rows_inc / watch.elapsedSeconds(); + update_max_speed(new_speed); + update_average_speed(new_speed); + } + + void clear() + { + watch.restart(); + + read_rows = 0; + read_bytes = 0; + } +}; + class PerformanceTest { public: @@ -66,7 +220,6 @@ public: private: unsigned concurrency; - size_t max_iterations = 1; using Query = std::string; using Queries = std::vector; @@ -86,9 +239,20 @@ private: using StringToVector = std::map< std::string, std::vector >; std::vector testsConfigurations; + struct StopCriterions stopCriterions; + + #define incFulfilledCriterions(CRITERION) \ + stopCriterions.CRITERION.priority == "min" \ + ? ++stopCriterions.fulfilled_criterions_min \ + : ++stopCriterions.fulfilled_criterions_max; + enum ExecutionType { loop, once }; ExecutionType execType; + Stats info_total; + std::mutex mutex; + + void readTestsConfiguration(const Paths & input_files) { testsConfigurations.resize(input_files.size()); @@ -186,16 +350,74 @@ private: throw Poco::Exception("Unknown type " + configExecType + " in :" + testName, 1); - for (const Query & query : queries) { - std::cout << query << std::endl; + if (testConfig->has("stop")) { + AbstractConfig stopCriterionsView(testConfig + ->createView("stop")); + stopCriterions.loadFromConfig(stopCriterionsView); + } else { + throw Poco::Exception("No termination conditions were found", 1); + } - runQuery(query); + if (execType == loop) { + runLoopQuery(queries[0]); + } else { + runQueries(queries); + } + + } + + void runLoopQuery(const Query & query) + { + info_total.watch.restart(); + info_total.min_time_watch.restart(); + + size_t max_iterations = stopCriterions.iterations.value; + size_t i = -1; + + while (true) { + ++i; + + pool.schedule(std::bind( + &PerformanceTest::thread, + this, + connections.IConnectionPool::get() + )); + + queue.push(query); + queue.push(""); /// asking thread to stop + pool.wait(); + + /// check stop criterions + if (max_iterations && i >= max_iterations) { + incFulfilledCriterions(iterations); + } + + if (stopCriterions.number_of_initialized_min && + (stopCriterions.fulfilled_criterions_min >= + stopCriterions.number_of_initialized_min)) { + /// All 'min' criterions are fulfilled + // TODO: + std::cout << "All 'min' criterions are fulfilled" << std::endl; + break; + } + + if (stopCriterions.number_of_initialized_max && + stopCriterions.fulfilled_criterions_max) { + /// Some 'max' criterions are fulfilled + + // TODO: + std::cout << stopCriterions.fulfilled_criterions_max + << "'max' criterions are fulfilled" << std::endl; + break; + } } } - void runQuery(const Query & query) + void runQueries(const Queries & queries) { - // TODO: proceed terminationConditions + info_total.watch.restart(); + info_total.min_time_watch.restart(); + for (size_t i = 0; i < concurrency; ++i) { pool.schedule(std::bind( &PerformanceTest::thread, @@ -204,8 +426,7 @@ private: )); } - for (size_t i = 0; (execType == loop) || i < max_iterations; ++i) { - // TODO: start timer and terminate after time exceeds + for (const Query & query : queries) { queue.push(query); } @@ -219,6 +440,8 @@ private: void thread(ConnectionPool::Entry & connection) { + Stats info_per_query; + Query query; while (true) { @@ -228,26 +451,45 @@ private: if (query.empty()) break; - execute(connection, query); + execute(connection, query, info_per_query); } } - void execute(ConnectionPool::Entry & connection, const Query & query) + void execute(ConnectionPool::Entry & connection, const Query & query, + Stats & info_per_query) { - // Stopwatch watch; + // TODO: + std::cout << "execute? " << query << std::endl; + RemoteBlockInputStream stream(connection, query, &settings, nullptr, Tables()/*, query_processing_stage*/); - // Progress progress; - // stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); }); + Progress progress; + stream.setProgressCallback( + [&progress, &stream, &info_per_query, this] + (const Progress & value) { + // TODO: + std::cout << "got some progress" << std::endl; + + progress.incrementPiecewiseAtomically(value); + + this->checkFulfilledCriterionsAndUpdate(progress, stream, info_per_query); + }); + + info_per_query.watch.restart(); + info_per_query.average_speed_watch.restart(); + info_per_query.max_speed_watch.restart(); stream.readPrefix(); - while (Block block = stream.read()) //{} - for (auto column : block.getColumns()) { - std::cout << column.name << std::endl; - } + while (Block block = stream.read()) + ; stream.readSuffix(); + // cast nanoseconds to ms + UInt64 queryExecutionTime = info_per_query.min_time_watch.elapsed() + / (1000 * 1000); + info_total.update_min_time(queryExecutionTime); + // const BlockStreamProfileInfo & info = stream.getProfileInfo(); // double seconds = watch.elapsedSeconds(); @@ -257,6 +499,99 @@ private: // info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes); } + void checkFulfilledCriterionsAndUpdate(const Progress & progress, + RemoteBlockInputStream & stream, + Stats & info_per_query) + { + // TODO: + std::cout << "im checking" << std::endl; + + std::lock_guard lock(mutex); + + info_total.add(progress.rows, progress.bytes); + info_per_query.add(progress.rows, progress.bytes); + + size_t max_rows_to_read = stopCriterions.read_rows.value; + if (max_rows_to_read && info_total.read_rows >= max_rows_to_read) { + incFulfilledCriterions(read_rows); + } + + size_t max_bytes_to_read = stopCriterions.bytes_read_uncompressed.value; + if (max_bytes_to_read && info_total.read_bytes >= max_bytes_to_read) { + incFulfilledCriterions(bytes_read_uncompressed); + } + + if (UInt64 max_timeout_ms = stopCriterions.timeout_ms.value) { + /// cast nanoseconds to ms + if ((info_total.watch.elapsed() / (1000 * 1000)) > max_timeout_ms) { + incFulfilledCriterions(timeout_ms); + } + } + + size_t min_time_not_changing_for_ms = stopCriterions + .min_time_not_changing_for_ms.value; + if (min_time_not_changing_for_ms) { + // TODO: + std::cout << "min time is in attention" << std::endl; + + size_t min_time_did_not_change_for = info_total + .min_time_watch + .elapsed() / (1000 * 1000); + // TODO: + std::cout << "min_time_did_not_change_for: " << min_time_did_not_change_for << std::endl; + std::cout << "min_time_not_changing_for_ms: " << min_time_not_changing_for_ms << std::endl; + + if (min_time_did_not_change_for >= min_time_not_changing_for_ms) { + // TODO: + std::cout << "min time yeahh" << std::endl; + incFulfilledCriterions(min_time_not_changing_for_ms); + } + } + + size_t max_speed_not_changing_for_ms = stopCriterions + .max_speed_not_changing_for_ms + .value; + if (max_speed_not_changing_for_ms) { + UInt64 speed_not_changing_time = info_per_query + .max_speed_watch + .elapsed() / (1000 * 1000); + if (speed_not_changing_time >= max_speed_not_changing_for_ms) { + incFulfilledCriterions(max_speed_not_changing_for_ms); + } + } + + size_t average_speed_not_changing_for_ms = stopCriterions + .average_speed_not_changing_for_ms + .value; + if (average_speed_not_changing_for_ms) { + UInt64 speed_not_changing_time = info_per_query + .average_speed_watch + .elapsed() / (1000 * 1000); + if (speed_not_changing_time >= average_speed_not_changing_for_ms) { + incFulfilledCriterions(average_speed_not_changing_for_ms); + } + } + + if (stopCriterions.number_of_initialized_min && + (stopCriterions.fulfilled_criterions_min >= + stopCriterions.number_of_initialized_min)) { + /// All 'min' criterions are fulfilled + // TODO: + std::cout << "All 'min' criterions are fulfilled" << std::endl; + stream.cancel(); + } + + if (stopCriterions.number_of_initialized_max && + stopCriterions.fulfilled_criterions_max) { + /// Some 'max' criterions are fulfilled + + // TODO: + std::cout << stopCriterions.fulfilled_criterions_max + << "'max' criterions are fulfilled" << std::endl; + stream.cancel(); + } + } + void constructSubstitutions(AbstractConfig & substitutionsView, StringToVector & substitutions) {