Merge branch 'master' into improve-table-locks

This commit is contained in:
Alexey Milovidov 2019-08-28 15:41:30 +03:00
commit 625e128770
27 changed files with 503 additions and 147 deletions

View File

@ -32,6 +32,7 @@
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
#include <Common/Config/configReadClient.h>
#include <Common/StudentTTest.h>
/** A tool for evaluating ClickHouse performance.
@ -41,6 +42,8 @@
namespace DB
{
using Ports = std::vector<UInt16>;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
@ -50,17 +53,34 @@ namespace ErrorCodes
class Benchmark : public Poco::Util::Application
{
public:
Benchmark(unsigned concurrency_, double delay_,
const String & host_, UInt16 port_, bool secure_, const String & default_database_,
Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
bool cumulative_, bool secure_, const String & default_database_,
const String & user_, const String & password_, const String & stage,
bool randomize_, size_t max_iterations_, double max_time_,
const String & json_path_, const Settings & settings_)
const String & json_path_, size_t confidence_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency),
connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
confidence(confidence_), json_path(json_path_), settings(settings_),
global_context(Context::createGlobal()), pool(concurrency)
{
const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
size_t connections_cnt = std::max(ports_.size(), hosts_.size());
connections.reserve(connections_cnt);
comparison_info_total.reserve(connections_cnt);
comparison_info_per_interval.reserve(connections_cnt);
for (size_t i = 0; i < connections_cnt; ++i)
{
UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i];
std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
connections.emplace_back(std::make_unique<ConnectionPool>(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure));
comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
comparison_info_total.emplace_back(std::make_shared<Stats>());
}
global_context.makeGlobalContext();
std::cerr << std::fixed << std::setprecision(3);
@ -101,21 +121,29 @@ public:
}
private:
using Query = std::string;
using Entry = ConnectionPool::Entry;
using EntryPtr = std::shared_ptr<Entry>;
using EntryPtrs = std::vector<EntryPtr>;
unsigned concurrency;
double delay;
using Query = std::string;
using Queries = std::vector<Query>;
Queries queries;
using Queue = ConcurrentBoundedQueue<Query>;
Queue queue;
ConnectionPool connections;
using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
ConnectionPoolUniqs connections;
bool randomize;
bool cumulative;
size_t max_iterations;
double max_time;
size_t confidence;
String json_path;
Settings settings;
Context global_context;
@ -128,12 +156,12 @@ private:
struct Stats
{
Stopwatch watch;
std::atomic<size_t> queries{0};
size_t read_rows = 0;
size_t read_bytes = 0;
size_t result_rows = 0;
size_t result_bytes = 0;
double work_time = 0;
using Sampler = ReservoirSampler<double>;
Sampler sampler {1 << 16};
@ -141,6 +169,7 @@ private:
void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
{
++queries;
work_time += seconds;
read_rows += read_rows_inc;
read_bytes += read_bytes_inc;
result_rows += result_rows_inc;
@ -150,8 +179,8 @@ private:
void clear()
{
watch.restart();
queries = 0;
work_time = 0;
read_rows = 0;
read_bytes = 0;
result_rows = 0;
@ -160,15 +189,18 @@ private:
}
};
Stats info_per_interval;
Stats info_total;
using MultiStats = std::vector<std::shared_ptr<Stats>>;
MultiStats comparison_info_per_interval;
MultiStats comparison_info_total;
StudentTTest t_test;
Stopwatch total_watch;
Stopwatch delay_watch;
std::mutex mutex;
ThreadPool pool;
void readQueries()
{
ReadBufferFromFileDescriptor in(STDIN_FILENO);
@ -213,7 +245,7 @@ private:
return false;
}
if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time)
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
@ -227,8 +259,8 @@ private:
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfQueriesExecuted(info_total.queries);
report(info_per_interval);
printNumberOfQueriesExecuted(queries_executed);
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
delay_watch.restart();
}
}
@ -242,11 +274,17 @@ private:
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
pool.schedule(std::bind(&Benchmark::thread, this,
connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
}
InterruptListener interrupt_listener;
info_per_interval.watch.restart();
delay_watch.restart();
/// Push queries into queue
@ -262,20 +300,24 @@ private:
}
pool.wait();
info_total.watch.stop();
total_watch.stop();
if (!json_path.empty())
reportJSON(info_total, json_path);
reportJSON(comparison_info_total, json_path);
printNumberOfQueriesExecuted(info_total.queries);
report(info_total);
printNumberOfQueriesExecuted(queries_executed);
report(comparison_info_total);
}
void thread(ConnectionPool::Entry connection)
void thread(EntryPtrs & connection_entries)
{
Query query;
/// Randomly choosing connection index
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
try
{
/// In these threads we do not accept INT signal.
@ -296,8 +338,7 @@ private:
if (shutdown || (max_iterations && queries_executed == max_iterations))
return;
}
execute(connection, query);
execute(connection_entries, query, distribution(generator));
++queries_executed;
}
}
@ -309,20 +350,19 @@ private:
}
}
void execute(ConnectionPool::Entry & connection, Query & query)
void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index)
{
Stopwatch watch;
RemoteBlockInputStream stream(
*connection,
*(*connection_entries[connection_index]),
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
stream.readPrefix();
while (Block block = stream.read())
;
while (Block block = stream.read());
stream.readSuffix();
const BlockStreamProfileInfo & info = stream.getProfileInfo();
@ -330,33 +370,47 @@ private:
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
info_per_interval.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
info_total.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
t_test.add(connection_index, seconds);
}
void report(Stats & info)
void report(MultiStats & infos)
{
std::lock_guard lock(mutex);
/// Avoid zeros, nans or exceptions
if (0 == info.queries)
return;
std::cerr << "\n";
for (size_t i = 0; i < infos.size(); ++i)
{
const auto & info = infos[i];
double seconds = info.watch.elapsedSeconds();
/// Avoid zeros, nans or exceptions
if (0 == info->queries)
return;
std::cerr
<< "\n"
<< "QPS: " << (info.queries / seconds) << ", "
<< "RPS: " << (info.read_rows / seconds) << ", "
<< "MiB/s: " << (info.read_bytes / seconds / 1048576) << ", "
<< "result RPS: " << (info.result_rows / seconds) << ", "
<< "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "."
<< "\n";
double seconds = info->work_time / concurrency;
std::cerr
<< connections[i]->getDescription() << ", "
<< "queries " << info->queries << ", "
<< "QPS: " << (info->queries / seconds) << ", "
<< "RPS: " << (info->read_rows / seconds) << ", "
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
<< "result RPS: " << (info->result_rows / seconds) << ", "
<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "."
<< "\n";
}
std::cerr << "\n";
auto print_percentile = [&](double percent)
{
std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl;
std::cerr << percent << "%\t\t";
for (const auto & info : infos)
{
std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t";
}
std::cerr << "\n";
};
for (int percent = 0; percent <= 90; percent += 10)
@ -367,10 +421,16 @@ private:
print_percentile(99.9);
print_percentile(99.99);
info.clear();
std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n";
if (!cumulative)
{
for (auto & info : infos)
info->clear();
}
}
void reportJSON(Stats & info, const std::string & filename)
void reportJSON(MultiStats & infos, const std::string & filename)
{
WriteBufferFromFile json_out(filename);
@ -381,36 +441,41 @@ private:
json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
};
auto print_percentile = [&json_out, &info](auto percent, bool with_comma = true)
auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true)
{
json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
};
json_out << "{\n";
json_out << double_quote << "statistics" << ": {\n";
for (size_t i = 0; i < infos.size(); ++i)
{
const auto & info = infos[i];
double seconds = info.watch.elapsedSeconds();
print_key_value("QPS", info.queries / seconds);
print_key_value("RPS", info.read_rows / seconds);
print_key_value("MiBPS", info.read_bytes / seconds);
print_key_value("RPS_result", info.result_rows / seconds);
print_key_value("MiBPS_result", info.result_bytes / seconds);
print_key_value("num_queries", info.queries.load(), false);
json_out << double_quote << connections[i]->getDescription() << ": {\n";
json_out << double_quote << "statistics" << ": {\n";
json_out << "},\n";
print_key_value("QPS", info->queries / info->work_time);
print_key_value("RPS", info->read_rows / info->work_time);
print_key_value("MiBPS", info->read_bytes / info->work_time);
print_key_value("RPS_result", info->result_rows / info->work_time);
print_key_value("MiBPS_result", info->result_bytes / info->work_time);
print_key_value("num_queries", info->queries.load(), false);
json_out << double_quote << "query_time_percentiles" << ": {\n";
json_out << "},\n";
json_out << double_quote << "query_time_percentiles" << ": {\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent);
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(*info, percent);
print_percentile(95);
print_percentile(99);
print_percentile(99.9);
print_percentile(99.99, false);
print_percentile(*info, 95);
print_percentile(*info, 99);
print_percentile(*info, 99.9);
print_percentile(*info, 99.99, false);
json_out << "}\n";
json_out << "}\n";
json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
}
json_out << "}\n";
}
@ -449,13 +514,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
("host,h", value<std::string>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("host,h", value<Strings>()->multitoken(), "")
("port,p", value<Ports>()->multitoken(), "")
("cumulative", "prints cumulative data instead of data per interval")
("secure,s", "Use TLS connection")
("user", value<std::string>()->default_value("default"), "")
("password", value<std::string>()->default_value(""), "")
("database", value<std::string>()->default_value("default"), "")
("stacktrace", "print stack traces of exceptions")
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
;
Settings settings;
@ -475,12 +542,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
print_stacktrace = options.count("stacktrace");
UseSSL use_ssl;
Ports ports = options.count("port") ? options["port"].as<Ports>() : Ports({9000});
Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
Benchmark benchmark(
options["concurrency"].as<unsigned>(),
options["delay"].as<double>(),
options["host"].as<std::string>(),
options["port"].as<UInt16>(),
std::move(hosts),
std::move(ports),
options.count("cumulative"),
options.count("secure"),
options["database"].as<std::string>(),
options["user"].as<std::string>(),
@ -490,6 +560,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
options["iterations"].as<size_t>(),
options["timelimit"].as<double>(),
options["json"].as<std::string>(),
options["confidence"].as<size_t>(),
settings);
return benchmark.run();
}

View File

@ -88,6 +88,10 @@ public:
{
return host;
}
std::string getDescription() const
{
return host + ":" + toString(port);
}
protected:
/** Creates a new object to put in the pool. */

View File

@ -0,0 +1,169 @@
#include "StudentTTest.h"
#include <cmath>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <stdexcept>
namespace
{
/// First row corresponds to infinity size of distributions case
const double students_table[101][6] =
{
{ 1.282, 1.645, 1.960, 2.326, 2.576, 3.090 },
{ 3.078, 6.314, 12.706, 31.821, 63.657, 318.313 },
{ 1.886, 2.920, 4.303, 6.965, 9.925, 22.327 },
{ 1.638, 2.353, 3.182, 4.541, 5.841, 10.215 },
{ 1.533, 2.132, 2.776, 3.747, 4.604, 7.173 },
{ 1.476, 2.015, 2.571, 3.365, 4.032, 5.893 },
{ 1.440, 1.943, 2.447, 3.143, 3.707, 5.208 },
{ 1.415, 1.895, 2.365, 2.998, 3.499, 4.782 },
{ 1.397, 1.860, 2.306, 2.896, 3.355, 4.499 },
{ 1.383, 1.833, 2.262, 2.821, 3.250, 4.296 },
{ 1.372, 1.812, 2.228, 2.764, 3.169, 4.143 },
{ 1.363, 1.796, 2.201, 2.718, 3.106, 4.024 },
{ 1.356, 1.782, 2.179, 2.681, 3.055, 3.929 },
{ 1.350, 1.771, 2.160, 2.650, 3.012, 3.852 },
{ 1.345, 1.761, 2.145, 2.624, 2.977, 3.787 },
{ 1.341, 1.753, 2.131, 2.602, 2.947, 3.733 },
{ 1.337, 1.746, 2.120, 2.583, 2.921, 3.686 },
{ 1.333, 1.740, 2.110, 2.567, 2.898, 3.646 },
{ 1.330, 1.734, 2.101, 2.552, 2.878, 3.610 },
{ 1.328, 1.729, 2.093, 2.539, 2.861, 3.579 },
{ 1.325, 1.725, 2.086, 2.528, 2.845, 3.552 },
{ 1.323, 1.721, 2.080, 2.518, 2.831, 3.527 },
{ 1.321, 1.717, 2.074, 2.508, 2.819, 3.505 },
{ 1.319, 1.714, 2.069, 2.500, 2.807, 3.485 },
{ 1.318, 1.711, 2.064, 2.492, 2.797, 3.467 },
{ 1.316, 1.708, 2.060, 2.485, 2.787, 3.450 },
{ 1.315, 1.706, 2.056, 2.479, 2.779, 3.435 },
{ 1.314, 1.703, 2.052, 2.473, 2.771, 3.421 },
{ 1.313, 1.701, 2.048, 2.467, 2.763, 3.408 },
{ 1.311, 1.699, 2.045, 2.462, 2.756, 3.396 },
{ 1.310, 1.697, 2.042, 2.457, 2.750, 3.385 },
{ 1.309, 1.696, 2.040, 2.453, 2.744, 3.375 },
{ 1.309, 1.694, 2.037, 2.449, 2.738, 3.365 },
{ 1.308, 1.692, 2.035, 2.445, 2.733, 3.356 },
{ 1.307, 1.691, 2.032, 2.441, 2.728, 3.348 },
{ 1.306, 1.690, 2.030, 2.438, 2.724, 3.340 },
{ 1.306, 1.688, 2.028, 2.434, 2.719, 3.333 },
{ 1.305, 1.687, 2.026, 2.431, 2.715, 3.326 },
{ 1.304, 1.686, 2.024, 2.429, 2.712, 3.319 },
{ 1.304, 1.685, 2.023, 2.426, 2.708, 3.313 },
{ 1.303, 1.684, 2.021, 2.423, 2.704, 3.307 },
{ 1.303, 1.683, 2.020, 2.421, 2.701, 3.301 },
{ 1.302, 1.682, 2.018, 2.418, 2.698, 3.296 },
{ 1.302, 1.681, 2.017, 2.416, 2.695, 3.291 },
{ 1.301, 1.680, 2.015, 2.414, 2.692, 3.286 },
{ 1.301, 1.679, 2.014, 2.412, 2.690, 3.281 },
{ 1.300, 1.679, 2.013, 2.410, 2.687, 3.277 },
{ 1.300, 1.678, 2.012, 2.408, 2.685, 3.273 },
{ 1.299, 1.677, 2.011, 2.407, 2.682, 3.269 },
{ 1.299, 1.677, 2.010, 2.405, 2.680, 3.265 },
{ 1.299, 1.676, 2.009, 2.403, 2.678, 3.261 },
{ 1.298, 1.675, 2.008, 2.402, 2.676, 3.258 },
{ 1.298, 1.675, 2.007, 2.400, 2.674, 3.255 },
{ 1.298, 1.674, 2.006, 2.399, 2.672, 3.251 },
{ 1.297, 1.674, 2.005, 2.397, 2.670, 3.248 },
{ 1.297, 1.673, 2.004, 2.396, 2.668, 3.245 },
{ 1.297, 1.673, 2.003, 2.395, 2.667, 3.242 },
{ 1.297, 1.672, 2.002, 2.394, 2.665, 3.239 },
{ 1.296, 1.672, 2.002, 2.392, 2.663, 3.237 },
{ 1.296, 1.671, 2.001, 2.391, 2.662, 3.234 },
{ 1.296, 1.671, 2.000, 2.390, 2.660, 3.232 },
{ 1.296, 1.670, 2.000, 2.389, 2.659, 3.229 },
{ 1.295, 1.670, 1.999, 2.388, 2.657, 3.227 },
{ 1.295, 1.669, 1.998, 2.387, 2.656, 3.225 },
{ 1.295, 1.669, 1.998, 2.386, 2.655, 3.223 },
{ 1.295, 1.669, 1.997, 2.385, 2.654, 3.220 },
{ 1.295, 1.668, 1.997, 2.384, 2.652, 3.218 },
{ 1.294, 1.668, 1.996, 2.383, 2.651, 3.216 },
{ 1.294, 1.668, 1.995, 2.382, 2.650, 3.214 },
{ 1.294, 1.667, 1.995, 2.382, 2.649, 3.213 },
{ 1.294, 1.667, 1.994, 2.381, 2.648, 3.211 },
{ 1.294, 1.667, 1.994, 2.380, 2.647, 3.209 },
{ 1.293, 1.666, 1.993, 2.379, 2.646, 3.207 },
{ 1.293, 1.666, 1.993, 2.379, 2.645, 3.206 },
{ 1.293, 1.666, 1.993, 2.378, 2.644, 3.204 },
{ 1.293, 1.665, 1.992, 2.377, 2.643, 3.202 },
{ 1.293, 1.665, 1.992, 2.376, 2.642, 3.201 },
{ 1.293, 1.665, 1.991, 2.376, 2.641, 3.199 },
{ 1.292, 1.665, 1.991, 2.375, 2.640, 3.198 },
{ 1.292, 1.664, 1.990, 2.374, 2.640, 3.197 },
{ 1.292, 1.664, 1.990, 2.374, 2.639, 3.195 },
{ 1.292, 1.664, 1.990, 2.373, 2.638, 3.194 },
{ 1.292, 1.664, 1.989, 2.373, 2.637, 3.193 },
{ 1.292, 1.663, 1.989, 2.372, 2.636, 3.191 },
{ 1.292, 1.663, 1.989, 2.372, 2.636, 3.190 },
{ 1.292, 1.663, 1.988, 2.371, 2.635, 3.189 },
{ 1.291, 1.663, 1.988, 2.370, 2.634, 3.188 },
{ 1.291, 1.663, 1.988, 2.370, 2.634, 3.187 },
{ 1.291, 1.662, 1.987, 2.369, 2.633, 3.185 },
{ 1.291, 1.662, 1.987, 2.369, 2.632, 3.184 },
{ 1.291, 1.662, 1.987, 2.368, 2.632, 3.183 },
{ 1.291, 1.662, 1.986, 2.368, 2.631, 3.182 },
{ 1.291, 1.662, 1.986, 2.368, 2.630, 3.181 },
{ 1.291, 1.661, 1.986, 2.367, 2.630, 3.180 },
{ 1.291, 1.661, 1.986, 2.367, 2.629, 3.179 },
{ 1.291, 1.661, 1.985, 2.366, 2.629, 3.178 },
{ 1.290, 1.661, 1.985, 2.366, 2.628, 3.177 },
{ 1.290, 1.661, 1.985, 2.365, 2.627, 3.176 },
{ 1.290, 1.661, 1.984, 2.365, 2.627, 3.175 },
{ 1.290, 1.660, 1.984, 2.365, 2.626, 3.175 },
{ 1.290, 1.660, 1.984, 2.364, 2.626, 3.174 },
};
const double confidence_level[6] = { 80, 90, 95, 98, 99, 99.5 };
}
void StudentTTest::clear()
{
data[0].clear();
data[1].clear();
}
void StudentTTest::add(size_t distribution, double value)
{
if (distribution > 1)
throw std::logic_error("Distribution number for Student's T-Test must be eigther 0 or 1");
data[distribution].add(value);
}
/// Confidence_level_index can be set in range [0, 5]. Corresponding values can be found above.
std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_level_index) const
{
if (confidence_level_index > 5)
confidence_level_index = 5;
if (data[0].size == 0 || data[1].size == 0)
return {true, ""};
size_t degrees_of_freedom = (data[0].size - 1) + (data[1].size - 1);
double table_value = students_table[degrees_of_freedom > 100 ? 0 : degrees_of_freedom][confidence_level_index];
double pooled_standard_deviation = sqrt(((data[0].size - 1) * data[0].var() + (data[1].size - 1) * data[1].var()) / degrees_of_freedom);
double t_statistic = pooled_standard_deviation * sqrt(1.0 / data[0].size + 1.0 / data[1].size);
double mean_difference = fabs(data[0].avg() - data[1].avg());
double mean_confidence_interval = table_value * t_statistic;
std::stringstream ss;
if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
{
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence : ";
ss << std::fixed << std::setprecision(8) << "mean difference is " << mean_difference << ", but confidence interval is " << mean_confidence_interval;
return {false, ss.str()};
}
else
{
ss << "No difference proven at " << confidence_level[confidence_level_index] << "% confidence";
return {true, ss.str()};
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <array>
#include <string>
#include <map>
/**
* About:
* This is implementation of Independent two-sample t-test
* Read about it on https://en.wikipedia.org/wiki/Student%27s_t-test (Equal or unequal sample sizes, equal variance)
*
* Usage:
* It's it used to assume with some level of confidence that two distributions don't differ.
* Values can be added with t_test.add(0/1, value) and after compared and reported with compareAndReport().
*/
class StudentTTest
{
private:
struct DistributionData
{
size_t size = 0;
double sum = 0;
double squares_sum = 0;
void add(double value)
{
++size;
sum += value;
squares_sum += value * value;
}
double avg() const
{
return sum / size;
}
double var() const
{
return (squares_sum - (sum * sum / size)) / static_cast<double>(size - 1);
}
void clear()
{
size = 0;
sum = 0;
squares_sum = 0;
}
};
std::array<DistributionData, 2> data {};
public:
void clear();
void add(size_t distribution, double value);
/// Confidence_level_index can be set in range [0, 5]. Corresponding values can be found above. TODO: Trash - no separation of concepts in code.
std::pair<bool, std::string> compareAndReport(size_t confidence_level_index = 5) const;
};

View File

@ -302,7 +302,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingChar, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.") \
M(SettingBool, format_csv_allow_single_quotes, 1, "If it is set to true, allow strings in single quotes.") \
M(SettingBool, format_csv_allow_double_quotes, 1, "If it is set to true, allow strings in double quotes.") \
M(SettingBool, input_format_csv_unquoted_null_literal_as_null, false, "Consider unquoted NULL literal as \N") \
M(SettingBool, input_format_csv_unquoted_null_literal_as_null, false, "Consider unquoted NULL literal as \\N") \
\
M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \
M(SettingBool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.") \
@ -347,6 +347,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -695,7 +695,7 @@ public:
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_MUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
add({[](const Derived & d) { return d.NAME.changed; }, \
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), true, \
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), true, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \
@ -703,7 +703,7 @@ public:
#define IMPLEMENT_SETTINGS_COLLECTION_ADD_IMMUTABLE_MEMBER_INFO_HELPER_(TYPE, NAME, DEFAULT, DESCRIPTION) \
add({[](const Derived & d) { return d.NAME.changed; }, \
StringRef(#NAME, strlen(#NAME)), StringRef(#DESCRIPTION, strlen(#DESCRIPTION)), false, \
StringRef(#NAME, strlen(#NAME)), StringRef(DESCRIPTION, strlen(DESCRIPTION)), false, \
&Functions::NAME##_getString, &Functions::NAME##_getField, \
&Functions::NAME##_setString, &Functions::NAME##_setField, \
&Functions::NAME##_serialize, &Functions::NAME##_deserialize, \

View File

@ -4,6 +4,7 @@
#include <DataStreams/copyData.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
@ -21,10 +22,11 @@ namespace DB
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
header = children.at(0)->getHeader();
@ -77,6 +79,12 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();

View File

@ -18,6 +18,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks.
* If data to sort is too much, could use external sorting, with temporary files.
*/
@ -73,7 +77,8 @@ public:
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
@ -93,6 +98,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@ -24,6 +24,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <common/demangle.h>
#include <common/config_common.h>
namespace ProfileEvents
@ -639,6 +640,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
#if !UNBUNDLED
auto free_space = Poco::File(params.tmp_path).freeSpace();
if (current_memory_usage + params.min_free_disk_space > free_space)
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
writeToTemporaryFile(result);
}

View File

@ -39,6 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int NOT_ENOUGH_SPACE;
}
class IBlockOutputStream;
@ -796,6 +797,7 @@ public:
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
const size_t min_free_disk_space;
Params(
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
@ -803,21 +805,23 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_, size_t max_threads_)
const std::string & tmp_path_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_), max_threads(max_threads_)
tmp_path(tmp_path_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
{
intermediate_header = intermediate_header_;
}

View File

@ -7,7 +7,6 @@
#include <Parsers/ASTSelectQuery.h>
#include <Storages/IStorage.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -102,22 +101,6 @@ std::unordered_map<String, String> AnalyzedJoin::getOriginalColumnsMap(const Nam
return out;
}
void AnalyzedJoin::calculateAvailableJoinedColumns(bool make_nullable)
{
if (!make_nullable)
{
available_joined_columns = columns_from_joined_table;
return;
}
for (auto & column : columns_from_joined_table)
{
auto type = column.type->canBeInsideNullable() ? makeNullable(column.type) : column.type;
available_joined_columns.emplace_back(NameAndTypePair(column.name, std::move(type)));
}
}
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
{
NamesAndTypesList names_and_type_list;

View File

@ -42,8 +42,6 @@ private:
/// All columns which can be read from joined table. Duplicating names are qualified.
NamesAndTypesList columns_from_joined_table;
/// Columns from joined table which may be added to block. It's columns_from_joined_table with possibly modified types.
NamesAndTypesList available_joined_columns;
/// Name -> original name. Names are the same as in columns_from_joined_table list.
std::unordered_map<String, String> original_names;
/// Original name -> name. Only ranamed columns.
@ -61,7 +59,6 @@ public:
std::unordered_map<String, String> getOriginalColumnsMap(const NameSet & required_columns) const;
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
void calculateAvailableJoinedColumns(bool make_nullable);
size_t rightKeyInclusion(const String & name) const;
};

View File

@ -278,8 +278,8 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
case JOIN:
{
bool is_null_used_as_default = settings.join_use_nulls;
bool right_or_full_join = join_kind == ASTTableJoin::Kind::Right || join_kind == ASTTableJoin::Kind::Full;
bool left_or_full_join = join_kind == ASTTableJoin::Kind::Left || join_kind == ASTTableJoin::Kind::Full;
bool right_or_full_join = isRightOrFull(join_kind);
bool left_or_full_join = isLeftOrFull(join_kind);
for (auto & col : sample_block)
{

View File

@ -1657,7 +1657,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@ -1723,7 +1723,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@ -1943,7 +1943,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
@ -1972,7 +1972,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
@ -2073,7 +2073,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
}
}
@ -2111,7 +2111,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
return std::make_shared<MergeSortingTransform>(
header, order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
});
}

View File

@ -30,6 +30,7 @@
#include <Parsers/queryToString.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteHelpers.h>
#include <Storages/IStorage.h>
@ -488,13 +489,14 @@ void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const
}
}
void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness)
void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness, ASTTableJoin::Kind & join_kind)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node)
return;
auto & table_join = const_cast<ASTTablesInSelectQueryElement *>(node)->table_join->as<ASTTableJoin &>();
join_kind = table_join.kind;
if (table_join.strictness == ASTTableJoin::Strictness::Unspecified &&
table_join.kind != ASTTableJoin::Kind::Cross)
@ -511,7 +513,7 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
/// Find the columns that are obtained by JOIN.
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
const Aliases & aliases, bool join_use_nulls)
const Aliases & aliases)
{
const ASTTablesInSelectQueryElement * node = select_query.join();
if (!node)
@ -537,10 +539,6 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
if (is_asof)
data.asofToJoinKeys();
}
bool make_nullable = join_use_nulls && isLeftOrFull(table_join.kind);
analyzed_join.calculateAvailableJoinedColumns(make_nullable);
}
void replaceJoinedTable(const ASTTablesInSelectQueryElement* join)
@ -611,7 +609,8 @@ std::vector<const ASTFunction *> getAggregates(const ASTPtr & query)
/// Calculate which columns are required to execute the expression.
/// Then, delete all other columns from the list of available columns.
/// After execution, columns will only contain the list of columns needed to read from the table.
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns,
bool make_joined_columns_nullable)
{
/// We caclulate required_source_columns with source_columns modifications and swap them on exit
required_source_columns = source_columns;
@ -639,7 +638,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
/// Add columns obtained by JOIN (if needed).
columns_added_by_join.clear();
for (const auto & joined_column : analyzed_join.available_joined_columns)
for (const auto & joined_column : analyzed_join.columns_from_joined_table)
{
auto & name = joined_column.name;
if (avaliable_columns.count(name))
@ -649,7 +648,15 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
{
/// Optimisation: do not add columns needed only in JOIN ON section.
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
columns_added_by_join.push_back(joined_column);
{
if (make_joined_columns_nullable)
{
auto type = joined_column.type->canBeInsideNullable() ? makeNullable(joined_column.type) : joined_column.type;
columns_added_by_join.emplace_back(NameAndTypePair(joined_column.name, std::move(type)));
}
else
columns_added_by_join.push_back(joined_column);
}
required.erase(name);
}
}
@ -759,7 +766,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzed_join.available_joined_columns)
for (const auto & column : analyzed_join.columns_from_joined_table)
ss << " '" << column.name << "'";
}
@ -865,6 +872,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
bool make_joined_columns_nullable = false;
if (select_query)
{
/// GROUP BY injective function elimination.
@ -885,12 +893,15 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
/// Push the predicate expression down to the subqueries.
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
setJoinStrictness(*select_query, settings.join_default_strictness);
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
ASTTableJoin::Kind join_kind = ASTTableJoin::Kind::Comma;
setJoinStrictness(*select_query, settings.join_default_strictness, join_kind);
make_joined_columns_nullable = settings.join_use_nulls && isLeftOrFull(join_kind);
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases);
}
result.aggregates = getAggregates(query);
result.collectUsedColumns(query, additional_source_columns);
result.collectUsedColumns(query, additional_source_columns, make_joined_columns_nullable);
return std::make_shared<const SyntaxAnalyzerResult>(result);
}

View File

@ -20,7 +20,7 @@ struct SyntaxAnalyzerResult
NamesAndTypesList source_columns;
/// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
NamesAndTypesList required_source_columns;
/// Columns will be added to block by JOIN. It's a subset of analyzed_join.available_joined_columns
/// Columns will be added to block by JOIN. It's a subset of analyzed_join.columns_from_joined_table with corrected Nullability
NamesAndTypesList columns_added_by_join;
Aliases aliases;
@ -42,7 +42,7 @@ struct SyntaxAnalyzerResult
/// Predicate optimizer overrides the sub queries
bool rewrite_subqueries = false;
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns, bool make_joined_columns_nullable);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
};

View File

@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1);
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
Aggregator aggregator(params);

View File

@ -5,6 +5,7 @@
#include <Common/formatReadable.h>
#include <Common/ProfileEvents.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
@ -236,11 +237,13 @@ MergeSortingTransform::MergeSortingTransform(
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_)
: IProcessor({header}, {header})
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
, min_free_disk_space(min_free_disk_space_)
{
auto & sample = inputs.front().getHeader();
@ -504,6 +507,12 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();

View File

@ -14,6 +14,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
class MergeSorter;
class MergeSortingTransform : public IProcessor
@ -24,7 +28,8 @@ public:
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
~MergeSortingTransform() override;
@ -44,6 +49,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@ -229,7 +229,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
@ -301,7 +302,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);

View File

@ -133,7 +133,7 @@ try
SortDescription description = {{0, 1, 1}};
auto transform = std::make_shared<MergeSortingTransform>(
source->getPort().getHeader(), description,
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".");
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);
auto sink = std::make_shared<CheckSortedSink>();
connect(source->getPort(), transform->getInputs().front());

View File

@ -76,3 +76,14 @@ TEST(TransformQueryForExternalDatabase, Substring)
"SELECT \"column\" FROM \"test\".\"table\"",
state().context, state().columns);
}
TEST(TransformQueryForExternalDatabase, MultipleAndSubqueries)
{
check("SELECT column FROM test.table WHERE 1 = 1 AND toString(column) = '42' AND column = 42 AND left(column, 10) = RIGHT(column, 10) AND column IN (1, 42) AND SUBSTRING(column FROM 1 FOR 2) = 'Hello' AND column != 4",
"SELECT \"column\" FROM \"test\".\"table\" WHERE 1 AND (\"column\" = 42) AND (\"column\" IN (1, 42)) AND (\"column\" != 4)",
state().context, state().columns);
check("SELECT column FROM test.table WHERE toString(column) = '42' AND left(column, 10) = RIGHT(column, 10) AND column = 42",
"SELECT \"column\" FROM \"test\".\"table\" WHERE (\"column\" = 42)",
state().context, state().columns);
}

View File

@ -141,19 +141,17 @@ String transformQueryForExternalDatabase(
if (function->name == "and")
{
bool compatible_found = false;
auto new_function_and = std::make_shared<ASTFunction>();
auto new_function_and_arguments = std::make_shared<ASTExpressionList>();
new_function_and->arguments = new_function_and_arguments;
new_function_and->children.push_back(new_function_and_arguments);
auto new_function_and = makeASTFunction("and");
for (const auto & elem : function->arguments->children)
{
if (isCompatible(*elem))
{
new_function_and_arguments->children.push_back(elem);
new_function_and->arguments->children.push_back(elem);
compatible_found = true;
}
}
if (new_function_and->arguments->children.size() == 1)
new_function_and->name = "";
if (compatible_found)
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(new_function_and));

View File

@ -0,0 +1 @@
SELECT DISTINCT description LIKE '"%"' FROM system.settings;

View File

@ -95,7 +95,7 @@ Configuration example:
The dictionary is stored in memory in the form of a hash table with an ordered array of ranges and their corresponding values.
This storage method works the same way as hashed and allows using date/time ranges in addition to the key, if they appear in the dictionary.
This storage method works the same way as hashed and allows using date/time (arbitrary numeric type) ranges in addition to the key.
Example: The table contains discounts for each advertiser in the format:
@ -111,7 +111,7 @@ Example: The table contains discounts for each advertiser in the format:
+---------------+---------------------+-------------------+--------+
```
To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md).
To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md). These elements must contain elements `name` and` type` (if `type` is not specified, the default type will be used - Date). `type` can be any numeric type (Date / DateTime / UInt64 / Int32 / others).
Example:
@ -122,14 +122,16 @@ Example:
</id>
<range_min>
<name>first</name>
<type>Date</type>
</range_min>
<range_max>
<name>last</name>
<type>Date</type>
</range_max>
...
```
To work with these dictionaries, you need to pass an additional date argument to the `dictGetT` function:
To work with these dictionaries, you need to pass an additional argument to the `dictGetT` function, for which a range is selected:
```
dictGetT('dict_name', 'attr_name', id, date)
@ -160,10 +162,12 @@ Configuration example:
<name>Abcdef</name>
</id>
<range_min>
<name>StartDate</name>
<name>StartTimeStamp</name>
<type>UInt64</type>
</range_min>
<range_max>
<name>EndDate</name>
<name>EndTimeStamp</name>
<type>UInt64</type>
</range_max>
<attribute>
<name>XXXType</name>

View File

@ -95,7 +95,7 @@
Словарь хранится в оперативной памяти в виде хэш-таблицы с упорядоченным массивом диапазонов и соответствующих им значений.
Этот способ размещения работает также как и hashed и позволяет дополнительно к ключу использовать дипазоны по дате/времени, если они указаны в словаре.
Этот способ размещения работает также как и hashed и позволяет дополнительно к ключу использовать дипазоны по дате/времени (произвольному числовому типу).
Пример: таблица содержит скидки для каждого рекламодателя в виде:
@ -111,7 +111,7 @@
+---------------+---------------------+-------------------+--------+
```
Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`.
Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`. В этих элементах должны присутствовать элементы `name` и `type` (если `type` не указан, будет использован тип по умолчанию -- Date). `type` может быть любым численным типом (Date/DateTime/UInt64/Int32/др.).
Пример:
@ -122,14 +122,16 @@
</id>
<range_min>
<name>first</name>
<type>Date</type>
</range_min>
<range_max>
<name>last</name>
<type>Date</type>
</range_max>
...
```
Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент - дату: :
Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент, для которого подбирается диапазон:
dictGetT('dict_name', 'attr_name', id, date)
@ -158,10 +160,12 @@
<name>Abcdef</name>
</id>
<range_min>
<name>StartDate</name>
<name>StartTimeStamp</name>
<type>UInt64</type>
</range_min>
<range_max>
<name>EndDate</name>
<name>EndTimeStamp</name>
<type>UInt64</type>
</range_max>
<attribute>
<name>XXXType</name>