diff --git a/dbms/programs/benchmark/Benchmark.cpp b/dbms/programs/benchmark/Benchmark.cpp
index c69e9a54feb..fedb7f778a1 100644
--- a/dbms/programs/benchmark/Benchmark.cpp
+++ b/dbms/programs/benchmark/Benchmark.cpp
@@ -32,6 +32,7 @@
 #include
 #include
 #include
+#include <Common/StudentTTest.h>

 /** A tool for evaluating ClickHouse performance.
@@ -41,6 +42,8 @@
 namespace DB
 {

+using Ports = std::vector<UInt16>;
+
 namespace ErrorCodes
 {
     extern const int BAD_ARGUMENTS;
@@ -50,17 +53,34 @@ namespace ErrorCodes
 class Benchmark : public Poco::Util::Application
 {
 public:
-    Benchmark(unsigned concurrency_, double delay_,
-            const String & host_, UInt16 port_, bool secure_, const String & default_database_,
+    Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
+            bool cumulative_, bool secure_, const String & default_database_,
             const String & user_, const String & password_, const String & stage,
             bool randomize_, size_t max_iterations_, double max_time_,
-            const String & json_path_, const Settings & settings_)
+            const String & json_path_, size_t confidence_, const Settings & settings_)
         :
-        concurrency(concurrency_), delay(delay_), queue(concurrency),
-        connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
-        randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
-        json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
+        concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
+        cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
+        confidence(confidence_), json_path(json_path_), settings(settings_),
+        global_context(Context::createGlobal()), pool(concurrency)
     {
+        const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
+        size_t connections_cnt = std::max(ports_.size(), hosts_.size());
+
+        connections.reserve(connections_cnt);
+        comparison_info_total.reserve(connections_cnt);
+        comparison_info_per_interval.reserve(connections_cnt);
+
+        for (size_t i = 0; i < connections_cnt; ++i)
+        {
+            UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i];
+            std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
+
+            connections.emplace_back(std::make_unique<ConnectionPool>(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure));
+            comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
+            comparison_info_total.emplace_back(std::make_shared<Stats>());
+        }
+
         global_context.makeGlobalContext();

         std::cerr << std::fixed << std::setprecision(3);
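For illustration, a minimal standalone sketch (not part of the patch; the host and port values are made up) of the pairing rule the constructor above implements: the longer of the two lists sets the number of connections, and missing entries fall back to localhost:9000.

```cpp
#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> hosts{"server1", "server2"};
    std::vector<uint16_t> ports{9001};

    // One connection pool per host/port pair; the longer list wins.
    size_t connections_cnt = std::max(hosts.size(), ports.size());
    for (size_t i = 0; i < connections_cnt; ++i)
    {
        uint16_t port = i >= ports.size() ? 9000 : ports[i];
        std::string host = i >= hosts.size() ? "localhost" : hosts[i];
        std::cout << host << ":" << port << "\n";  // server1:9001, server2:9000
    }
}
```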
"localhost" : hosts_[i]; + + connections.emplace_back(std::make_unique(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure)); + comparison_info_per_interval.emplace_back(std::make_shared()); + comparison_info_total.emplace_back(std::make_shared()); + } + global_context.makeGlobalContext(); std::cerr << std::fixed << std::setprecision(3); @@ -101,21 +121,29 @@ public: } private: - using Query = std::string; + using Entry = ConnectionPool::Entry; + using EntryPtr = std::shared_ptr; + using EntryPtrs = std::vector; unsigned concurrency; double delay; + using Query = std::string; using Queries = std::vector; Queries queries; using Queue = ConcurrentBoundedQueue; Queue queue; - ConnectionPool connections; + using ConnectionPoolUniq = std::unique_ptr; + using ConnectionPoolUniqs = std::vector; + ConnectionPoolUniqs connections; + bool randomize; + bool cumulative; size_t max_iterations; double max_time; + size_t confidence; String json_path; Settings settings; Context global_context; @@ -128,12 +156,12 @@ private: struct Stats { - Stopwatch watch; std::atomic queries{0}; size_t read_rows = 0; size_t read_bytes = 0; size_t result_rows = 0; size_t result_bytes = 0; + double work_time = 0; using Sampler = ReservoirSampler; Sampler sampler {1 << 16}; @@ -141,6 +169,7 @@ private: void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc) { ++queries; + work_time += seconds; read_rows += read_rows_inc; read_bytes += read_bytes_inc; result_rows += result_rows_inc; @@ -150,8 +179,8 @@ private: void clear() { - watch.restart(); queries = 0; + work_time = 0; read_rows = 0; read_bytes = 0; result_rows = 0; @@ -160,15 +189,18 @@ private: } }; - Stats info_per_interval; - Stats info_total; + using MultiStats = std::vector>; + MultiStats comparison_info_per_interval; + MultiStats comparison_info_total; + StudentTTest t_test; + + Stopwatch total_watch; Stopwatch delay_watch; std::mutex mutex; ThreadPool pool; - void readQueries() { ReadBufferFromFileDescriptor in(STDIN_FILENO); @@ -213,7 +245,7 @@ private: return false; } - if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time) + if (max_time > 0 && total_watch.elapsedSeconds() >= max_time) { std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n"; return false; @@ -227,8 +259,8 @@ private: if (delay > 0 && delay_watch.elapsedSeconds() > delay) { - printNumberOfQueriesExecuted(info_total.queries); - report(info_per_interval); + printNumberOfQueriesExecuted(queries_executed); + cumulative ? 
+            cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
             delay_watch.restart();
         }
     }
@@ -242,11 +274,17 @@ private:
         std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);

         for (size_t i = 0; i < concurrency; ++i)
-            pool.schedule(std::bind(&Benchmark::thread, this,
-                connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
+        {
+            EntryPtrs connection_entries;
+            connection_entries.reserve(connections.size());
+
+            for (const auto & connection : connections)
+                connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
+
+            pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
+        }

         InterruptListener interrupt_listener;
-        info_per_interval.watch.restart();
         delay_watch.restart();

         /// Push queries into queue
@@ -262,20 +300,24 @@ private:
         pool.wait();
-        info_total.watch.stop();
+        total_watch.stop();

         if (!json_path.empty())
-            reportJSON(info_total, json_path);
+            reportJSON(comparison_info_total, json_path);

-        printNumberOfQueriesExecuted(info_total.queries);
-        report(info_total);
+        printNumberOfQueriesExecuted(queries_executed);
+        report(comparison_info_total);
     }

-    void thread(ConnectionPool::Entry connection)
+    void thread(EntryPtrs & connection_entries)
     {
         Query query;

+        /// Randomly choosing connection index
+        pcg64 generator(randomSeed());
+        std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
+
         try
         {
             /// In these threads we do not accept INT signal.
@@ -296,8 +338,7 @@ private:
                 if (shutdown || (max_iterations && queries_executed == max_iterations))
                     return;
             }
-
-            execute(connection, query);
+            execute(connection_entries, query, distribution(generator));
             ++queries_executed;
         }
     }
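For illustration, a standalone sketch of the per-query routing done in thread() above, with std::mt19937 standing in for the pcg64 that the patch seeds via randomSeed(); each query goes to a uniformly random connection, so all compared servers receive a statistically similar workload.

```cpp
#include <iostream>
#include <random>

int main()
{
    std::mt19937 generator(12345);
    // Two configured connections: indices 0 and 1.
    std::uniform_int_distribution<size_t> distribution(0, 1);

    for (int i = 0; i < 5; ++i)
        std::cout << "query " << i << " -> connection " << distribution(generator) << "\n";
}
```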
- << "MiB/s: " << (info.read_bytes / seconds / 1048576) << ", " - << "result RPS: " << (info.result_rows / seconds) << ", " - << "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "." - << "\n"; + double seconds = info->work_time / concurrency; + + std::cerr + << connections[i]->getDescription() << ", " + << "queries " << info->queries << ", " + << "QPS: " << (info->queries / seconds) << ", " + << "RPS: " << (info->read_rows / seconds) << ", " + << "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", " + << "result RPS: " << (info->result_rows / seconds) << ", " + << "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "." + << "\n"; + } + std::cerr << "\n"; auto print_percentile = [&](double percent) { - std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl; + std::cerr << percent << "%\t\t"; + for (const auto & info : infos) + { + std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t"; + } + std::cerr << "\n"; }; for (int percent = 0; percent <= 90; percent += 10) @@ -367,10 +421,16 @@ private: print_percentile(99.9); print_percentile(99.99); - info.clear(); + std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n"; + + if (!cumulative) + { + for (auto & info : infos) + info->clear(); + } } - void reportJSON(Stats & info, const std::string & filename) + void reportJSON(MultiStats & infos, const std::string & filename) { WriteBufferFromFile json_out(filename); @@ -381,36 +441,41 @@ private: json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n"); }; - auto print_percentile = [&json_out, &info](auto percent, bool with_comma = true) + auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true) { json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? 
",\n" : "\n"); }; json_out << "{\n"; - json_out << double_quote << "statistics" << ": {\n"; + for (size_t i = 0; i < infos.size(); ++i) + { + const auto & info = infos[i]; - double seconds = info.watch.elapsedSeconds(); - print_key_value("QPS", info.queries / seconds); - print_key_value("RPS", info.read_rows / seconds); - print_key_value("MiBPS", info.read_bytes / seconds); - print_key_value("RPS_result", info.result_rows / seconds); - print_key_value("MiBPS_result", info.result_bytes / seconds); - print_key_value("num_queries", info.queries.load(), false); + json_out << double_quote << connections[i]->getDescription() << ": {\n"; + json_out << double_quote << "statistics" << ": {\n"; - json_out << "},\n"; + print_key_value("QPS", info->queries / info->work_time); + print_key_value("RPS", info->read_rows / info->work_time); + print_key_value("MiBPS", info->read_bytes / info->work_time); + print_key_value("RPS_result", info->result_rows / info->work_time); + print_key_value("MiBPS_result", info->result_bytes / info->work_time); + print_key_value("num_queries", info->queries.load(), false); - json_out << double_quote << "query_time_percentiles" << ": {\n"; + json_out << "},\n"; + json_out << double_quote << "query_time_percentiles" << ": {\n"; - for (int percent = 0; percent <= 90; percent += 10) - print_percentile(percent); + for (int percent = 0; percent <= 90; percent += 10) + print_percentile(*info, percent); - print_percentile(95); - print_percentile(99); - print_percentile(99.9); - print_percentile(99.99, false); + print_percentile(*info, 95); + print_percentile(*info, 99); + print_percentile(*info, 99.9); + print_percentile(*info, 99.99, false); - json_out << "}\n"; + json_out << "}\n"; + json_out << (i == infos.size() - 1 ? "}\n" : "},\n"); + } json_out << "}\n"; } @@ -449,13 +514,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv) ("timelimit,t", value()->default_value(0.), "stop launch of queries after specified time limit") ("randomize,r", value()->default_value(false), "randomize order of execution") ("json", value()->default_value(""), "write final report to specified file in JSON format") - ("host,h", value()->default_value("localhost"), "") - ("port", value()->default_value(9000), "") + ("host,h", value()->multitoken(), "") + ("port,p", value()->multitoken(), "") + ("cumulative", "prints cumulative data instead of data per interval") ("secure,s", "Use TLS connection") ("user", value()->default_value("default"), "") ("password", value()->default_value(""), "") ("database", value()->default_value("default"), "") ("stacktrace", "print stack traces of exceptions") + ("confidence", value()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)") ; Settings settings; @@ -475,12 +542,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv) print_stacktrace = options.count("stacktrace"); UseSSL use_ssl; + Ports ports = options.count("port") ? options["port"].as() : Ports({9000}); + Strings hosts = options.count("host") ? 
options["host"].as() : Strings({"localhost"}); Benchmark benchmark( options["concurrency"].as(), options["delay"].as(), - options["host"].as(), - options["port"].as(), + std::move(hosts), + std::move(ports), + options.count("cumulative"), options.count("secure"), options["database"].as(), options["user"].as(), @@ -490,6 +560,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv) options["iterations"].as(), options["timelimit"].as(), options["json"].as(), + options["confidence"].as(), settings); return benchmark.run(); } diff --git a/dbms/src/Client/ConnectionPool.h b/dbms/src/Client/ConnectionPool.h index d18be29b2b3..322bad04794 100644 --- a/dbms/src/Client/ConnectionPool.h +++ b/dbms/src/Client/ConnectionPool.h @@ -88,6 +88,10 @@ public: { return host; } + std::string getDescription() const + { + return host + ":" + toString(port); + } protected: /** Creates a new object to put in the pool. */ diff --git a/dbms/src/Common/StudentTTest.cpp b/dbms/src/Common/StudentTTest.cpp new file mode 100644 index 00000000000..170f06c2877 --- /dev/null +++ b/dbms/src/Common/StudentTTest.cpp @@ -0,0 +1,169 @@ +#include "StudentTTest.h" + +#include +#include +#include +#include +#include + + +namespace +{ + /// First row corresponds to infinity size of distributions case + const double students_table[101][6] = + { + { 1.282, 1.645, 1.960, 2.326, 2.576, 3.090 }, + { 3.078, 6.314, 12.706, 31.821, 63.657, 318.313 }, + { 1.886, 2.920, 4.303, 6.965, 9.925, 22.327 }, + { 1.638, 2.353, 3.182, 4.541, 5.841, 10.215 }, + { 1.533, 2.132, 2.776, 3.747, 4.604, 7.173 }, + { 1.476, 2.015, 2.571, 3.365, 4.032, 5.893 }, + { 1.440, 1.943, 2.447, 3.143, 3.707, 5.208 }, + { 1.415, 1.895, 2.365, 2.998, 3.499, 4.782 }, + { 1.397, 1.860, 2.306, 2.896, 3.355, 4.499 }, + { 1.383, 1.833, 2.262, 2.821, 3.250, 4.296 }, + { 1.372, 1.812, 2.228, 2.764, 3.169, 4.143 }, + { 1.363, 1.796, 2.201, 2.718, 3.106, 4.024 }, + { 1.356, 1.782, 2.179, 2.681, 3.055, 3.929 }, + { 1.350, 1.771, 2.160, 2.650, 3.012, 3.852 }, + { 1.345, 1.761, 2.145, 2.624, 2.977, 3.787 }, + { 1.341, 1.753, 2.131, 2.602, 2.947, 3.733 }, + { 1.337, 1.746, 2.120, 2.583, 2.921, 3.686 }, + { 1.333, 1.740, 2.110, 2.567, 2.898, 3.646 }, + { 1.330, 1.734, 2.101, 2.552, 2.878, 3.610 }, + { 1.328, 1.729, 2.093, 2.539, 2.861, 3.579 }, + { 1.325, 1.725, 2.086, 2.528, 2.845, 3.552 }, + { 1.323, 1.721, 2.080, 2.518, 2.831, 3.527 }, + { 1.321, 1.717, 2.074, 2.508, 2.819, 3.505 }, + { 1.319, 1.714, 2.069, 2.500, 2.807, 3.485 }, + { 1.318, 1.711, 2.064, 2.492, 2.797, 3.467 }, + { 1.316, 1.708, 2.060, 2.485, 2.787, 3.450 }, + { 1.315, 1.706, 2.056, 2.479, 2.779, 3.435 }, + { 1.314, 1.703, 2.052, 2.473, 2.771, 3.421 }, + { 1.313, 1.701, 2.048, 2.467, 2.763, 3.408 }, + { 1.311, 1.699, 2.045, 2.462, 2.756, 3.396 }, + { 1.310, 1.697, 2.042, 2.457, 2.750, 3.385 }, + { 1.309, 1.696, 2.040, 2.453, 2.744, 3.375 }, + { 1.309, 1.694, 2.037, 2.449, 2.738, 3.365 }, + { 1.308, 1.692, 2.035, 2.445, 2.733, 3.356 }, + { 1.307, 1.691, 2.032, 2.441, 2.728, 3.348 }, + { 1.306, 1.690, 2.030, 2.438, 2.724, 3.340 }, + { 1.306, 1.688, 2.028, 2.434, 2.719, 3.333 }, + { 1.305, 1.687, 2.026, 2.431, 2.715, 3.326 }, + { 1.304, 1.686, 2.024, 2.429, 2.712, 3.319 }, + { 1.304, 1.685, 2.023, 2.426, 2.708, 3.313 }, + { 1.303, 1.684, 2.021, 2.423, 2.704, 3.307 }, + { 1.303, 1.683, 2.020, 2.421, 2.701, 3.301 }, + { 1.302, 1.682, 2.018, 2.418, 2.698, 3.296 }, + { 1.302, 1.681, 2.017, 2.416, 2.695, 3.291 }, + { 1.301, 1.680, 2.015, 2.414, 2.692, 3.286 }, + { 1.301, 1.679, 2.014, 2.412, 2.690, 3.281 }, + { 1.300, 
+        { 1.300, 1.679, 2.013, 2.410, 2.687, 3.277 },
+        { 1.300, 1.678, 2.012, 2.408, 2.685, 3.273 },
+        { 1.299, 1.677, 2.011, 2.407, 2.682, 3.269 },
+        { 1.299, 1.677, 2.010, 2.405, 2.680, 3.265 },
+        { 1.299, 1.676, 2.009, 2.403, 2.678, 3.261 },
+        { 1.298, 1.675, 2.008, 2.402, 2.676, 3.258 },
+        { 1.298, 1.675, 2.007, 2.400, 2.674, 3.255 },
+        { 1.298, 1.674, 2.006, 2.399, 2.672, 3.251 },
+        { 1.297, 1.674, 2.005, 2.397, 2.670, 3.248 },
+        { 1.297, 1.673, 2.004, 2.396, 2.668, 3.245 },
+        { 1.297, 1.673, 2.003, 2.395, 2.667, 3.242 },
+        { 1.297, 1.672, 2.002, 2.394, 2.665, 3.239 },
+        { 1.296, 1.672, 2.002, 2.392, 2.663, 3.237 },
+        { 1.296, 1.671, 2.001, 2.391, 2.662, 3.234 },
+        { 1.296, 1.671, 2.000, 2.390, 2.660, 3.232 },
+        { 1.296, 1.670, 2.000, 2.389, 2.659, 3.229 },
+        { 1.295, 1.670, 1.999, 2.388, 2.657, 3.227 },
+        { 1.295, 1.669, 1.998, 2.387, 2.656, 3.225 },
+        { 1.295, 1.669, 1.998, 2.386, 2.655, 3.223 },
+        { 1.295, 1.669, 1.997, 2.385, 2.654, 3.220 },
+        { 1.295, 1.668, 1.997, 2.384, 2.652, 3.218 },
+        { 1.294, 1.668, 1.996, 2.383, 2.651, 3.216 },
+        { 1.294, 1.668, 1.995, 2.382, 2.650, 3.214 },
+        { 1.294, 1.667, 1.995, 2.382, 2.649, 3.213 },
+        { 1.294, 1.667, 1.994, 2.381, 2.648, 3.211 },
+        { 1.294, 1.667, 1.994, 2.380, 2.647, 3.209 },
+        { 1.293, 1.666, 1.993, 2.379, 2.646, 3.207 },
+        { 1.293, 1.666, 1.993, 2.379, 2.645, 3.206 },
+        { 1.293, 1.666, 1.993, 2.378, 2.644, 3.204 },
+        { 1.293, 1.665, 1.992, 2.377, 2.643, 3.202 },
+        { 1.293, 1.665, 1.992, 2.376, 2.642, 3.201 },
+        { 1.293, 1.665, 1.991, 2.376, 2.641, 3.199 },
+        { 1.292, 1.665, 1.991, 2.375, 2.640, 3.198 },
+        { 1.292, 1.664, 1.990, 2.374, 2.640, 3.197 },
+        { 1.292, 1.664, 1.990, 2.374, 2.639, 3.195 },
+        { 1.292, 1.664, 1.990, 2.373, 2.638, 3.194 },
+        { 1.292, 1.664, 1.989, 2.373, 2.637, 3.193 },
+        { 1.292, 1.663, 1.989, 2.372, 2.636, 3.191 },
+        { 1.292, 1.663, 1.989, 2.372, 2.636, 3.190 },
+        { 1.292, 1.663, 1.988, 2.371, 2.635, 3.189 },
+        { 1.291, 1.663, 1.988, 2.370, 2.634, 3.188 },
+        { 1.291, 1.663, 1.988, 2.370, 2.634, 3.187 },
+        { 1.291, 1.662, 1.987, 2.369, 2.633, 3.185 },
+        { 1.291, 1.662, 1.987, 2.369, 2.632, 3.184 },
+        { 1.291, 1.662, 1.987, 2.368, 2.632, 3.183 },
+        { 1.291, 1.662, 1.986, 2.368, 2.631, 3.182 },
+        { 1.291, 1.662, 1.986, 2.368, 2.630, 3.181 },
+        { 1.291, 1.661, 1.986, 2.367, 2.630, 3.180 },
+        { 1.291, 1.661, 1.986, 2.367, 2.629, 3.179 },
+        { 1.291, 1.661, 1.985, 2.366, 2.629, 3.178 },
+        { 1.290, 1.661, 1.985, 2.366, 2.628, 3.177 },
+        { 1.290, 1.661, 1.985, 2.365, 2.627, 3.176 },
+        { 1.290, 1.661, 1.984, 2.365, 2.627, 3.175 },
+        { 1.290, 1.660, 1.984, 2.365, 2.626, 3.175 },
+        { 1.290, 1.660, 1.984, 2.364, 2.626, 3.174 },
+    };
+
+    const double confidence_level[6] = { 80, 90, 95, 98, 99, 99.5 };
+}
+
+
+void StudentTTest::clear()
+{
+    data[0].clear();
+    data[1].clear();
+}
+
+void StudentTTest::add(size_t distribution, double value)
+{
+    if (distribution > 1)
+        throw std::logic_error("Distribution number for Student's T-Test must be either 0 or 1");
+    data[distribution].add(value);
+}
+
+/// Confidence_level_index can be set in range [0, 5]. Corresponding values can be found above.
+std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_level_index) const
+{
+    if (confidence_level_index > 5)
+        confidence_level_index = 5;
+
+    if (data[0].size == 0 || data[1].size == 0)
+        return {true, ""};
+
+    size_t degrees_of_freedom = (data[0].size - 1) + (data[1].size - 1);
+
+    double table_value = students_table[degrees_of_freedom > 100 ? 0 : degrees_of_freedom][confidence_level_index];
+
+    double pooled_standard_deviation = sqrt(((data[0].size - 1) * data[0].var() + (data[1].size - 1) * data[1].var()) / degrees_of_freedom);
+
+    double standard_error = pooled_standard_deviation * sqrt(1.0 / data[0].size + 1.0 / data[1].size);
+
+    double mean_difference = fabs(data[0].avg() - data[1].avg());
+
+    double mean_confidence_interval = table_value * standard_error;
+
+    std::stringstream ss;
+    if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
+    {
+        ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence: ";
+        ss << std::fixed << std::setprecision(8) << "mean difference is " << mean_difference << ", but confidence interval is " << mean_confidence_interval;
+        return {false, ss.str()};
+    }
+    else
+    {
+        ss << "No difference proven at " << confidence_level[confidence_level_index] << "% confidence";
+        return {true, ss.str()};
+    }
+}
+
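For reference, compareAndReport() above computes the standard pooled-variance two-sample t-test: the pooled standard deviation, the standard error of the difference of means, and a decision rule at the chosen confidence level.

```latex
s_p = \sqrt{\frac{(n_0 - 1)\,s_0^2 + (n_1 - 1)\,s_1^2}{n_0 + n_1 - 2}},
\qquad
\mathrm{SE} = s_p \sqrt{\frac{1}{n_0} + \frac{1}{n_1}},
\qquad
\text{report a difference iff}\ \; |\bar{x}_0 - \bar{x}_1| > t_{\alpha,\; n_0 + n_1 - 2} \cdot \mathrm{SE}.
```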
diff --git a/dbms/src/Common/StudentTTest.h b/dbms/src/Common/StudentTTest.h
new file mode 100644
index 00000000000..b09190050b5
--- /dev/null
+++ b/dbms/src/Common/StudentTTest.h
@@ -0,0 +1,59 @@
+#pragma once
+
+#include <array>
+#include <string>
+#include <utility>
+
+/**
+ * About:
+ * This is an implementation of the independent two-sample t-test.
+ * Read about it at https://en.wikipedia.org/wiki/Student%27s_t-test (Equal or unequal sample sizes, equal variance)
+ *
+ * Usage:
+ * It is used to decide, with some level of confidence, whether two distributions differ.
+ * Values can be added with t_test.add(0/1, value) and then compared and reported with compareAndReport().
+ */
+class StudentTTest
+{
+private:
+    struct DistributionData
+    {
+        size_t size = 0;
+        double sum = 0;
+        double squares_sum = 0;
+
+        void add(double value)
+        {
+            ++size;
+            sum += value;
+            squares_sum += value * value;
+        }
+
+        double avg() const
+        {
+            return sum / size;
+        }
+
+        double var() const
+        {
+            return (squares_sum - (sum * sum / size)) / static_cast<double>(size - 1);
+        }
+
+        void clear()
+        {
+            size = 0;
+            sum = 0;
+            squares_sum = 0;
+        }
+    };
+
+    std::array<DistributionData, 2> data {};
+
+public:
+    void clear();
+
+    void add(size_t distribution, double value);
+
+    /// Confidence_level_index can be set in range [0, 5]. Corresponding values can be found in StudentTTest.cpp. TODO: Trash - no separation of concepts in code.
+    std::pair<bool, std::string> compareAndReport(size_t confidence_level_index = 5) const;
+};
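A hypothetical standalone usage sketch of the new class (latency values are made up; index 2 selects the 95% confidence level per the confidence_level table in StudentTTest.cpp):

```cpp
#include "StudentTTest.h"

#include <iostream>

int main()
{
    StudentTTest t_test;

    // Per-query latencies (seconds) observed on two servers being compared.
    for (double value : {0.050, 0.052, 0.049, 0.051})
        t_test.add(0, value);
    for (double value : {0.060, 0.061, 0.059, 0.062})
        t_test.add(1, value);

    auto [no_difference_proven, message] = t_test.compareAndReport(2);
    std::cout << message << "\n";
}
```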
diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h
index 881ea83f30b..aa6893d6d85 100644
--- a/dbms/src/Core/Settings.h
+++ b/dbms/src/Core/Settings.h
@@ -302,7 +302,7 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingChar, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.") \
     M(SettingBool, format_csv_allow_single_quotes, 1, "If it is set to true, allow strings in single quotes.") \
     M(SettingBool, format_csv_allow_double_quotes, 1, "If it is set to true, allow strings in double quotes.") \
-    M(SettingBool, input_format_csv_unquoted_null_literal_as_null, false, "Consider unquoted NULL literal as \N") \
+    M(SettingBool, input_format_csv_unquoted_null_literal_as_null, false, "Consider unquoted NULL literal as \\N") \
 \
     M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \
     M(SettingBool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.") \
@@ -347,6 +347,7 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
     M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
     M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
+    M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
 \
     /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
 \
diff --git a/dbms/src/Core/SettingsCommon.h b/dbms/src/Core/SettingsCommon.h
index b8c56d50caa..97edfbe9934 100644
--- a/dbms/src/Core/SettingsCommon.h
+++ b/dbms/src/Core/SettingsCommon.h
@@ -695,7 +695,7 @@ public:
 #define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
     add({[](const Derived & d) { return d.NAME.changed; }, \
-        StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), true, \
+        StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), true, \
         &Functions::NAME##_getString, &Functions::NAME##_getField, \
         &Functions::NAME##_setString, &Functions::NAME##_setField, \
         &Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
@@ -703,7 +703,7 @@ public:
 #define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
     add({[](const Derived & d) { return d.NAME.changed; }, \
-        StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), false, \
+        StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), false, \
         &Functions::NAME##_getString, &Functions::NAME##_getField, \
         &Functions::NAME##_setString, &Functions::NAME##_setField, \
         &Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
index e30dd4ae1de..377c1fee4e0 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Poco/File.h>
 #include
 #include
 #include
@@ -21,10 +22,11 @@ namespace DB

 MergeSortingBlockInputStream::MergeSortingBlockInputStream(
     const BlockInputStreamPtr & input, SortDescription & description_,
     size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
-    size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
+    size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
     : description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
     max_bytes_before_remerge(max_bytes_before_remerge_),
-    max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
+    max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
+    min_free_disk_space(min_free_disk_space_)
 {
     children.push_back(input);
     header = children.at(0)->getHeader();
@@ -77,6 +79,12 @@ Block MergeSortingBlockInputStream::readImpl()
          */
         if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
         {
+#if !UNBUNDLED
+            auto free_space = Poco::File(tmp_path).freeSpace();
+            if (sum_bytes_in_blocks + min_free_disk_space > free_space)
+                throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
+#endif
+
             Poco::File(tmp_path).createDirectories();
             temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
             const std::string & path = temporary_files.back()->path();
diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
index 9f257b82260..4cd9315bc3c 100644
--- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h
+++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h
@@ -18,6 +18,10 @@ namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NOT_ENOUGH_SPACE;
+}

 /** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks.
   * If data to sort is too much, could use external sorting, with temporary files.
   */
@@ -73,7 +77,8 @@ public:
     MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
         size_t max_merged_block_size_, UInt64 limit_,
         size_t max_bytes_before_remerge_,
-        size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
+        size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
+        size_t min_free_disk_space_);

     String getName() const override { return "MergeSorting"; }

@@ -93,6 +98,7 @@ private:
     size_t max_bytes_before_remerge;
     size_t max_bytes_before_external_sort;
     const std::string tmp_path;
+    size_t min_free_disk_space;

     Logger * log = &Logger::get("MergeSortingBlockInputStream");

diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index 373b47f7315..f38573d3d34 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -24,6 +24,7 @@
 #include
 #include
 #include
+#include <Poco/File.h>


 namespace ProfileEvents
@@ -639,6 +640,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
         && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
         && worth_convert_to_two_level)
     {
+#if !UNBUNDLED
+        auto free_space = Poco::File(params.tmp_path).freeSpace();
+        if (current_memory_usage + params.min_free_disk_space > free_space)
+            throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
+#endif
+
         writeToTemporaryFile(result);
     }
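A minimal sketch (assumed names; not part of the patch) of the guard both external sort and external aggregation now apply before spilling; in the patch, free_space comes from Poco::File(tmp_path).freeSpace().

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>

void checkTemporarySpace(uint64_t bytes_to_spill, uint64_t min_free_disk_space,
                         uint64_t free_space, const std::string & tmp_path)
{
    // Refuse to spill if doing so would leave less than min_free_disk_space free.
    if (bytes_to_spill + min_free_disk_space > free_space)
        throw std::runtime_error("Not enough space for external sort/aggregation in " + tmp_path);
}
```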
diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h
index b48663ff689..c3d1d5df8fd 100644
--- a/dbms/src/Interpreters/Aggregator.h
+++ b/dbms/src/Interpreters/Aggregator.h
@@ -39,6 +39,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
+    extern const int NOT_ENOUGH_SPACE;
 }

 class IBlockOutputStream;
@@ -796,6 +797,7 @@ public:
         /// Settings is used to determine cache size. No threads are created.
         size_t max_threads;
+        const size_t min_free_disk_space;

         Params(
             const Block & src_header_,
             const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
@@ -803,21 +805,23 @@ public:
             size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
             size_t max_bytes_before_external_group_by_,
             bool empty_result_for_aggregation_by_empty_set_,
-            const std::string & tmp_path_, size_t max_threads_)
+            const std::string & tmp_path_, size_t max_threads_,
+            size_t min_free_disk_space_)
             : src_header(src_header_),
             keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
             overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
             group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
             max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
             empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
-            tmp_path(tmp_path_), max_threads(max_threads_)
+            tmp_path(tmp_path_), max_threads(max_threads_),
+            min_free_disk_space(min_free_disk_space_)
         {
         }

         /// Only parameters that matter during merge.
         Params(const Block & intermediate_header_,
             const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
-            : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_)
+            : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
         {
             intermediate_header = intermediate_header_;
         }
diff --git a/dbms/src/Interpreters/AnalyzedJoin.cpp b/dbms/src/Interpreters/AnalyzedJoin.cpp
index 4176b0b8012..6a3b9b8ac1b 100644
--- a/dbms/src/Interpreters/AnalyzedJoin.cpp
+++ b/dbms/src/Interpreters/AnalyzedJoin.cpp
@@ -7,7 +7,6 @@
 #include
 #include
-#include <DataTypes/DataTypeNullable.h>

 namespace DB
 {
@@ -102,22 +101,6 @@ std::unordered_map<String, String> AnalyzedJoin::getOriginalColumnsMap(const Nam
     return out;
 }

-void AnalyzedJoin::calculateAvailableJoinedColumns(bool make_nullable)
-{
-    if (!make_nullable)
-    {
-        available_joined_columns = columns_from_joined_table;
-        return;
-    }
-
-    for (auto & column : columns_from_joined_table)
-    {
-        auto type = column.type->canBeInsideNullable() ? makeNullable(column.type) : column.type;
-        available_joined_columns.emplace_back(NameAndTypePair(column.name, std::move(type)));
-    }
-}
-
-
 NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
 {
     NamesAndTypesList names_and_type_list;
diff --git a/dbms/src/Interpreters/AnalyzedJoin.h b/dbms/src/Interpreters/AnalyzedJoin.h
index af010aaca11..1ce11da95e0 100644
--- a/dbms/src/Interpreters/AnalyzedJoin.h
+++ b/dbms/src/Interpreters/AnalyzedJoin.h
@@ -42,8 +42,6 @@ private:
     /// All columns which can be read from joined table. Duplicating names are qualified.
     NamesAndTypesList columns_from_joined_table;
-    /// Columns from joined table which may be added to block. It's columns_from_joined_table with possibly modified types.
-    NamesAndTypesList available_joined_columns;
     /// Name -> original name. Names are the same as in columns_from_joined_table list.
     std::unordered_map<String, String> original_names;
     /// Original name -> name. Only renamed columns.
@@ -61,7 +59,6 @@ public:
     std::unordered_map<String, String> getOriginalColumnsMap(const NameSet & required_columns) const;

     void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
-    void calculateAvailableJoinedColumns(bool make_nullable);
     size_t rightKeyInclusion(const String & name) const;
 };

diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp
index c7b510abcf0..5ef05569f91 100644
--- a/dbms/src/Interpreters/ExpressionActions.cpp
+++ b/dbms/src/Interpreters/ExpressionActions.cpp
@@ -278,8 +278,8 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
         case JOIN:
         {
             bool is_null_used_as_default = settings.join_use_nulls;
-            bool right_or_full_join = join_kind == ASTTableJoin::Kind::Right || join_kind == ASTTableJoin::Kind::Full;
-            bool left_or_full_join = join_kind == ASTTableJoin::Kind::Left || join_kind == ASTTableJoin::Kind::Full;
+            bool right_or_full_join = isRightOrFull(join_kind);
+            bool left_or_full_join = isLeftOrFull(join_kind);

             for (auto & col : sample_block)
             {
diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
index 2d583c3c353..79fbcf44323 100644
--- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp
+++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp
@@ -1657,7 +1657,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
         allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
         allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
         settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-        context.getTemporaryPath(), settings.max_threads);
+        context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);

     /// If there are several sources, then we perform parallel aggregation
     if (pipeline.streams.size() > 1)
@@ -1723,7 +1723,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
         allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
         allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
         settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-        context.getTemporaryPath(), settings.max_threads);
+        context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);

     auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);

@@ -1943,7 +1943,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
         false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
         SettingUInt64(0), SettingUInt64(0),
         settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-        context.getTemporaryPath(), settings.max_threads);
+        context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);

     if (modificator == Modificator::ROLLUP)
         pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
@@ -1972,7 +1972,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
         false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
         SettingUInt64(0), SettingUInt64(0),
         settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
-        context.getTemporaryPath(), settings.max_threads);
+        context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);

     auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);

@@ -2073,7 +2073,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
             pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
                 pipeline.firstStream(), order_descr, settings.max_block_size, limit,
                 settings.max_bytes_before_remerge_sort,
-                settings.max_bytes_before_external_sort, context.getTemporaryPath());
+                settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
         }
     }

@@ -2111,7 +2111,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
             return std::make_shared<MergeSortingTransform>(
                 header, order_descr, settings.max_block_size, limit,
                 settings.max_bytes_before_remerge_sort,
-                settings.max_bytes_before_external_sort, context.getTemporaryPath());
+                settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
         });
 }

diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.cpp b/dbms/src/Interpreters/SyntaxAnalyzer.cpp
index c7f89153b44..0c73beeef16 100644
--- a/dbms/src/Interpreters/SyntaxAnalyzer.cpp
+++ b/dbms/src/Interpreters/SyntaxAnalyzer.cpp
@@ -30,6 +30,7 @@
 #include
 #include
+#include <DataTypes/DataTypeNullable.h>
 #include
 #include

@@ -488,13 +489,14 @@ void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const
     }
 }

-void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness)
+void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness, ASTTableJoin::Kind & join_kind)
 {
     const ASTTablesInSelectQueryElement * node = select_query.join();
     if (!node)
         return;

     auto & table_join = const_cast<ASTTablesInSelectQueryElement *>(node)->table_join->as<ASTTableJoin &>();
+    join_kind = table_join.kind;

     if (table_join.strictness == ASTTableJoin::Strictness::Unspecified &&
         table_join.kind != ASTTableJoin::Kind::Cross)
@@ -511,7 +513,7 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul

 /// Find the columns that are obtained by JOIN.
 void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
-                          const Aliases & aliases, bool join_use_nulls)
+                          const Aliases & aliases)
 {
     const ASTTablesInSelectQueryElement * node = select_query.join();
     if (!node)
@@ -537,10 +539,6 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
         if (is_asof)
             data.asofToJoinKeys();
     }
-
-    bool make_nullable = join_use_nulls && isLeftOrFull(table_join.kind);
-
-    analyzed_join.calculateAvailableJoinedColumns(make_nullable);
 }

 void replaceJoinedTable(const ASTTablesInSelectQueryElement* join)
@@ -611,7 +609,8 @@ std::vector<const ASTFunction *> getAggregates(const ASTPtr & query)
 /// Calculate which columns are required to execute the expression.
 /// Then, delete all other columns from the list of available columns.
 /// After execution, columns will only contain the list of columns needed to read from the table.
-void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
+void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns,
+                                              bool make_joined_columns_nullable)
 {
     /// We calculate required_source_columns with source_columns modifications and swap them on exit
     required_source_columns = source_columns;
@@ -639,7 +638,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA

     /// Add columns obtained by JOIN (if needed).
     columns_added_by_join.clear();
-    for (const auto & joined_column : analyzed_join.available_joined_columns)
+    for (const auto & joined_column : analyzed_join.columns_from_joined_table)
     {
         auto & name = joined_column.name;
         if (avaliable_columns.count(name))
@@ -649,7 +648,15 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
         {
             /// Optimisation: do not add columns needed only in JOIN ON section.
             if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
-                columns_added_by_join.push_back(joined_column);
+            {
+                if (make_joined_columns_nullable)
+                {
+                    auto type = joined_column.type->canBeInsideNullable() ? makeNullable(joined_column.type) : joined_column.type;
+                    columns_added_by_join.emplace_back(NameAndTypePair(joined_column.name, std::move(type)));
+                }
+                else
+                    columns_added_by_join.push_back(joined_column);
+            }
             required.erase(name);
         }
     }
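The nullability adjustment above, isolated as a sketch under the assumption that the same makeNullable/canBeInsideNullable helpers are in scope (the patch inlines this logic rather than defining a helper):

```cpp
#include <DataTypes/DataTypeNullable.h>

namespace DB
{

/// For LEFT/FULL JOIN with join_use_nulls = 1, a joined column's type becomes
/// Nullable so that non-matched rows can be filled with NULL.
DataTypePtr adjustJoinedColumnType(const DataTypePtr & type, bool make_joined_columns_nullable)
{
    if (make_joined_columns_nullable && type->canBeInsideNullable())
        return makeNullable(type);
    return type;
}

}
```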
@@ -759,7 +766,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
     if (columns_context.has_table_join)
     {
         ss << ", joined columns:";
-        for (const auto & column : analyzed_join.available_joined_columns)
+        for (const auto & column : analyzed_join.columns_from_joined_table)
             ss << " '" << column.name << "'";
     }

@@ -865,6 +872,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
     /// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
     OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);

+    bool make_joined_columns_nullable = false;
     if (select_query)
     {
         /// GROUP BY injective function elimination.
@@ -885,12 +893,15 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
         /// Push the predicate expression down to the subqueries.
         result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();

-        setJoinStrictness(*select_query, settings.join_default_strictness);
-        collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
+        ASTTableJoin::Kind join_kind = ASTTableJoin::Kind::Comma;
+        setJoinStrictness(*select_query, settings.join_default_strictness, join_kind);
+        make_joined_columns_nullable = settings.join_use_nulls && isLeftOrFull(join_kind);
+
+        collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases);
     }

     result.aggregates = getAggregates(query);
-    result.collectUsedColumns(query, additional_source_columns);
+    result.collectUsedColumns(query, additional_source_columns, make_joined_columns_nullable);

     return std::make_shared<const SyntaxAnalyzerResult>(result);
 }
diff --git a/dbms/src/Interpreters/SyntaxAnalyzer.h b/dbms/src/Interpreters/SyntaxAnalyzer.h
index a31dfef7e82..e95d7354e8a 100644
--- a/dbms/src/Interpreters/SyntaxAnalyzer.h
+++ b/dbms/src/Interpreters/SyntaxAnalyzer.h
@@ -20,7 +20,7 @@ struct SyntaxAnalyzerResult
     NamesAndTypesList source_columns;
     /// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
     NamesAndTypesList required_source_columns;
-    /// Columns will be added to block by JOIN. It's a subset of analyzed_join.available_joined_columns
+    /// Columns will be added to block by JOIN. It's a subset of analyzed_join.columns_from_joined_table with corrected Nullability.
     NamesAndTypesList columns_added_by_join;

     Aliases aliases;
@@ -42,7 +42,7 @@ struct SyntaxAnalyzerResult
     /// Predicate optimizer overrides the sub queries
     bool rewrite_subqueries = false;

-    void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
+    void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns, bool make_joined_columns_nullable);
     Names requiredSourceColumns() const { return required_source_columns.getNames(); }
 };

diff --git a/dbms/src/Interpreters/tests/aggregate.cpp b/dbms/src/Interpreters/tests/aggregate.cpp
index 73e71d178ea..4d4d964aa9a 100644
--- a/dbms/src/Interpreters/tests/aggregate.cpp
+++ b/dbms/src/Interpreters/tests/aggregate.cpp
@@ -79,7 +79,7 @@ int main(int argc, char ** argv)

         Aggregator::Params params(
             stream->getHeader(), {0, 1}, aggregate_descriptions,
-            false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1);
+            false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);

         Aggregator aggregator(params);

diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp
index 8591f5447f7..c59fc7cc745 100644
--- a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp
+++ b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp
@@ -5,6 +5,7 @@
 #include
 #include
+#include <Poco/File.h>
 #include
 #include

@@ -236,11 +237,13 @@ MergeSortingTransform::MergeSortingTransform(
     SortDescription & description_,
     size_t max_merged_block_size_, UInt64 limit_,
     size_t max_bytes_before_remerge_,
-    size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
+    size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
+    size_t min_free_disk_space_)
     : IProcessor({header}, {header})
     , description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
     , max_bytes_before_remerge(max_bytes_before_remerge_)
     , max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
+    , min_free_disk_space(min_free_disk_space_)
 {
     auto & sample = inputs.front().getHeader();

@@ -504,6 +507,12 @@ void MergeSortingTransform::consume(Chunk chunk)
          */
         if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
         {
+#if !UNBUNDLED
+            auto free_space = Poco::File(tmp_path).freeSpace();
+            if (sum_bytes_in_blocks + min_free_disk_space > free_space)
+                throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
+#endif
+
             Poco::File(tmp_path).createDirectories();
             temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
             const std::string & path = temporary_files.back()->path();
diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.h b/dbms/src/Processors/Transforms/MergeSortingTransform.h
index 0ab517fc5d4..eec249296ef 100644
--- a/dbms/src/Processors/Transforms/MergeSortingTransform.h
+++ b/dbms/src/Processors/Transforms/MergeSortingTransform.h
@@ -14,6 +14,10 @@ namespace DB
 {

+namespace ErrorCodes
+{
+    extern const int NOT_ENOUGH_SPACE;
+}

 class MergeSorter;

 class MergeSortingTransform : public IProcessor
@@ -24,7 +28,8 @@ public:
         SortDescription & description_,
         size_t max_merged_block_size_, UInt64 limit_,
         size_t max_bytes_before_remerge_,
-        size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
+        size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
+        size_t min_free_disk_space_);

     ~MergeSortingTransform() override;

@@ -44,6 +49,7 @@ private:
     size_t max_bytes_before_remerge;
     size_t max_bytes_before_external_sort;
     const std::string tmp_path;
+    size_t min_free_disk_space;

     Logger * log = &Logger::get("MergeSortingBlockInputStream");

diff --git a/dbms/src/Processors/tests/processors_test_aggregation.cpp b/dbms/src/Processors/tests/processors_test_aggregation.cpp
index 2306de4edc0..ed868d08762 100644
--- a/dbms/src/Processors/tests/processors_test_aggregation.cpp
+++ b/dbms/src/Processors/tests/processors_test_aggregation.cpp
@@ -229,7 +229,8 @@ try
             max_bytes_before_external_group_by,
             false, /// empty_result_for_aggregation_by_empty_set
             cur_path, /// tmp_path
-            1 /// max_threads
+            1, /// max_threads
+            0
         );

         auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
@@ -301,7 +302,8 @@ try
             max_bytes_before_external_group_by,
             false, /// empty_result_for_aggregation_by_empty_set
             cur_path, /// tmp_path
-            1 /// max_threads
+            1, /// max_threads
+            0
         );

         auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
diff --git a/dbms/src/Processors/tests/processors_test_merge_sorting_transform.cpp b/dbms/src/Processors/tests/processors_test_merge_sorting_transform.cpp
index a5059011e9b..8e6b4655127 100644
--- a/dbms/src/Processors/tests/processors_test_merge_sorting_transform.cpp
+++ b/dbms/src/Processors/tests/processors_test_merge_sorting_transform.cpp
@@ -133,7 +133,7 @@ try
         SortDescription description = {{0, 1, 1}};
         auto transform = std::make_shared<MergeSortingTransform>(
             source->getPort().getHeader(), description,
-            max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".");
+            max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);

         auto sink = std::make_shared();
         connect(source->getPort(), transform->getInputs().front());
diff --git a/dbms/src/Storages/tests/gtest_transform_query_for_external_database.cpp b/dbms/src/Storages/tests/gtest_transform_query_for_external_database.cpp
index a7d79cd23d7..34f6ce64278 100644
--- a/dbms/src/Storages/tests/gtest_transform_query_for_external_database.cpp
+++ b/dbms/src/Storages/tests/gtest_transform_query_for_external_database.cpp
@@ -76,3 +76,14 @@ TEST(TransformQueryForExternalDatabase, Substring)
           "SELECT \"column\" FROM \"test\".\"table\"",
           state().context, state().columns);
 }
+
+TEST(TransformQueryForExternalDatabase, MultipleAndSubqueries)
+{
+    check("SELECT column FROM test.table WHERE 1 = 1 AND toString(column) = '42' AND column = 42 AND left(column, 10) = RIGHT(column, 10) AND column IN (1, 42) AND SUBSTRING(column FROM 1 FOR 2) = 'Hello' AND column != 4",
+          "SELECT \"column\" FROM \"test\".\"table\" WHERE 1 AND (\"column\" = 42) AND (\"column\" IN (1, 42)) AND (\"column\" != 4)",
+          state().context, state().columns);
+    check("SELECT column FROM test.table WHERE toString(column) = '42' AND left(column, 10) = RIGHT(column, 10) AND column = 42",
+          "SELECT \"column\" FROM \"test\".\"table\" WHERE (\"column\" = 42)",
+          state().context, state().columns);
+
+}
diff --git a/dbms/src/Storages/transformQueryForExternalDatabase.cpp b/dbms/src/Storages/transformQueryForExternalDatabase.cpp
index 55a0ef95200..b6e48836efa 100644
--- a/dbms/src/Storages/transformQueryForExternalDatabase.cpp
+++ b/dbms/src/Storages/transformQueryForExternalDatabase.cpp
@@ -141,19 +141,17 @@ String transformQueryForExternalDatabase(
         if (function->name == "and")
         {
             bool compatible_found = false;
-            auto new_function_and = std::make_shared<ASTFunction>();
-            auto new_function_and_arguments = std::make_shared<ASTExpressionList>();
-            new_function_and->arguments = new_function_and_arguments;
-            new_function_and->children.push_back(new_function_and_arguments);
-
+            auto new_function_and = makeASTFunction("and");
             for (const auto & elem : function->arguments->children)
             {
                 if (isCompatible(*elem))
                 {
-                    new_function_and_arguments->children.push_back(elem);
+                    new_function_and->arguments->children.push_back(elem);
                     compatible_found = true;
                 }
             }
+            if (new_function_and->arguments->children.size() == 1)
+                new_function_and->name = "";

             if (compatible_found)
                 select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(new_function_and));
diff --git a/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.reference b/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.reference
new file mode 100644
index 00000000000..573541ac970
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.reference
@@ -0,0 +1 @@
+0
diff --git a/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.sql b/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.sql
new file mode 100644
index 00000000000..55d9ff2780d
--- /dev/null
+++ b/dbms/tests/queries/0_stateless/00999_settings_no_extra_quotes.sql
@@ -0,0 +1 @@
+SELECT DISTINCT description LIKE '"%"' FROM system.settings;
diff --git a/docs/en/query_language/dicts/external_dicts_dict_layout.md b/docs/en/query_language/dicts/external_dicts_dict_layout.md
index 03279688d6c..c3096544d25 100644
--- a/docs/en/query_language/dicts/external_dicts_dict_layout.md
+++ b/docs/en/query_language/dicts/external_dicts_dict_layout.md
@@ -95,7 +95,7 @@ Configuration example:

 The dictionary is stored in memory in the form of a hash table with an ordered array of ranges and their corresponding values.

-This storage method works the same way as hashed and allows using date/time ranges in addition to the key, if they appear in the dictionary.
+This storage method works the same way as hashed and allows using date/time ranges (of any numeric type) in addition to the key.
 Example: The table contains discounts for each advertiser in the format:

@@ -111,7 +111,7 @@ Example: The table contains discounts for each advertiser in the format:
 +---------------+---------------------+-------------------+--------+
 ```

-To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md).
+To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md). These elements must contain the elements `name` and `type` (if `type` is omitted, the default type Date is used). `type` can be any numeric type (Date / DateTime / UInt64 / Int32 / others).

 Example:

@@ -122,14 +122,16 @@ Example:
 ```
     <range_min>
         <name>first</name>
+        <type>Date</type>
     </range_min>
     <range_max>
         <name>last</name>
+        <type>Date</type>
     </range_max>
     ...
 ```

-To work with these dictionaries, you need to pass an additional date argument to the `dictGetT` function:
+To work with these dictionaries, you need to pass an extra argument to the `dictGetT` function, by which the range is selected:

 ```
 dictGetT('dict_name', 'attr_name', id, date)
 ```
@@ -160,10 +162,12 @@ Configuration example:
         <id>
             <name>Abcdef</name>
         </id>
         <range_min>
-            <name>StartDate</name>
+            <name>StartTimeStamp</name>
+            <type>UInt64</type>
         </range_min>
         <range_max>
-            <name>EndDate</name>
+            <name>EndTimeStamp</name>
+            <type>UInt64</type>
         </range_max>
         <attribute>
             <name>XXXType</name>
diff --git a/docs/ru/query_language/dicts/external_dicts_dict_layout.md b/docs/ru/query_language/dicts/external_dicts_dict_layout.md
index aafcf531860..826d9b78ae9 100644
--- a/docs/ru/query_language/dicts/external_dicts_dict_layout.md
+++ b/docs/ru/query_language/dicts/external_dicts_dict_layout.md
@@ -95,7 +95,7 @@

 Словарь хранится в оперативной памяти в виде хэш-таблицы с упорядоченным массивом диапазонов и соответствующих им значений.

-Этот способ размещения работает также как и hashed и позволяет дополнительно к ключу использовать дипазоны по дате/времени, если они указаны в словаре.
+Этот способ размещения работает так же, как и hashed, и позволяет дополнительно к ключу использовать диапазоны по дате/времени (произвольному числовому типу).

 Пример: таблица содержит скидки для каждого рекламодателя в виде:

@@ -111,7 +111,7 @@
 +---------------+---------------------+-------------------+--------+
 ```

-Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`.
+Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`. В этих элементах должны присутствовать элементы `name` и `type` (если `type` не указан, используется тип по умолчанию Date). `type` может быть любым численным типом (Date / DateTime / UInt64 / Int32 / др.).

 Пример:

@@ -122,14 +122,16 @@
 ```
     <range_min>
         <name>first</name>
+        <type>Date</type>
     </range_min>
     <range_max>
         <name>last</name>
+        <type>Date</type>
     </range_max>
     ...
 ```

-Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент - дату:
+Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент, по которому подбирается диапазон:

     dictGetT('dict_name', 'attr_name', id, date)

@@ -158,10 +160,12 @@
         <id>
             <name>Abcdef</name>
         </id>
         <range_min>
-            <name>StartDate</name>
+            <name>StartTimeStamp</name>
+            <type>UInt64</type>
         </range_min>
         <range_max>
-            <name>EndDate</name>
+            <name>EndTimeStamp</name>
+            <type>UInt64</type>
         </range_max>
         <attribute>
             <name>XXXType</name>