mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-10 10:30:51 +00:00
Merge remote-tracking branch 'upstream/master' into fix25
This commit is contained in:
commit
8d884bc962
@ -32,6 +32,7 @@
|
|||||||
#include <Client/Connection.h>
|
#include <Client/Connection.h>
|
||||||
#include <Common/InterruptListener.h>
|
#include <Common/InterruptListener.h>
|
||||||
#include <Common/Config/configReadClient.h>
|
#include <Common/Config/configReadClient.h>
|
||||||
|
#include <Common/StudentTTest.h>
|
||||||
|
|
||||||
|
|
||||||
/** A tool for evaluating ClickHouse performance.
|
/** A tool for evaluating ClickHouse performance.
|
||||||
@ -41,6 +42,8 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
using Ports = std::vector<UInt16>;
|
||||||
|
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
@ -50,17 +53,34 @@ namespace ErrorCodes
|
|||||||
class Benchmark : public Poco::Util::Application
|
class Benchmark : public Poco::Util::Application
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
Benchmark(unsigned concurrency_, double delay_,
|
Benchmark(unsigned concurrency_, double delay_, Strings && hosts_, Ports && ports_,
|
||||||
const String & host_, UInt16 port_, bool secure_, const String & default_database_,
|
bool cumulative_, bool secure_, const String & default_database_,
|
||||||
const String & user_, const String & password_, const String & stage,
|
const String & user_, const String & password_, const String & stage,
|
||||||
bool randomize_, size_t max_iterations_, double max_time_,
|
bool randomize_, size_t max_iterations_, double max_time_,
|
||||||
const String & json_path_, const Settings & settings_)
|
const String & json_path_, size_t confidence_, const Settings & settings_)
|
||||||
:
|
:
|
||||||
concurrency(concurrency_), delay(delay_), queue(concurrency),
|
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
|
||||||
connections(concurrency, host_, port_, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable),
|
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
|
||||||
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
|
confidence(confidence_), json_path(json_path_), settings(settings_),
|
||||||
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
|
global_context(Context::createGlobal()), pool(concurrency)
|
||||||
{
|
{
|
||||||
|
const auto secure = secure_ ? Protocol::Secure::Enable : Protocol::Secure::Disable;
|
||||||
|
size_t connections_cnt = std::max(ports_.size(), hosts_.size());
|
||||||
|
|
||||||
|
connections.reserve(connections_cnt);
|
||||||
|
comparison_info_total.reserve(connections_cnt);
|
||||||
|
comparison_info_per_interval.reserve(connections_cnt);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < connections_cnt; ++i)
|
||||||
|
{
|
||||||
|
UInt16 cur_port = i >= ports_.size() ? 9000 : ports_[i];
|
||||||
|
std::string cur_host = i >= hosts_.size() ? "localhost" : hosts_[i];
|
||||||
|
|
||||||
|
connections.emplace_back(std::make_unique<ConnectionPool>(concurrency, cur_host, cur_port, default_database_, user_, password_, "benchmark", Protocol::Compression::Enable, secure));
|
||||||
|
comparison_info_per_interval.emplace_back(std::make_shared<Stats>());
|
||||||
|
comparison_info_total.emplace_back(std::make_shared<Stats>());
|
||||||
|
}
|
||||||
|
|
||||||
global_context.makeGlobalContext();
|
global_context.makeGlobalContext();
|
||||||
|
|
||||||
std::cerr << std::fixed << std::setprecision(3);
|
std::cerr << std::fixed << std::setprecision(3);
|
||||||
@ -101,21 +121,29 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using Query = std::string;
|
using Entry = ConnectionPool::Entry;
|
||||||
|
using EntryPtr = std::shared_ptr<Entry>;
|
||||||
|
using EntryPtrs = std::vector<EntryPtr>;
|
||||||
|
|
||||||
unsigned concurrency;
|
unsigned concurrency;
|
||||||
double delay;
|
double delay;
|
||||||
|
|
||||||
|
using Query = std::string;
|
||||||
using Queries = std::vector<Query>;
|
using Queries = std::vector<Query>;
|
||||||
Queries queries;
|
Queries queries;
|
||||||
|
|
||||||
using Queue = ConcurrentBoundedQueue<Query>;
|
using Queue = ConcurrentBoundedQueue<Query>;
|
||||||
Queue queue;
|
Queue queue;
|
||||||
|
|
||||||
ConnectionPool connections;
|
using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
|
||||||
|
using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
|
||||||
|
ConnectionPoolUniqs connections;
|
||||||
|
|
||||||
bool randomize;
|
bool randomize;
|
||||||
|
bool cumulative;
|
||||||
size_t max_iterations;
|
size_t max_iterations;
|
||||||
double max_time;
|
double max_time;
|
||||||
|
size_t confidence;
|
||||||
String json_path;
|
String json_path;
|
||||||
Settings settings;
|
Settings settings;
|
||||||
Context global_context;
|
Context global_context;
|
||||||
@ -128,12 +156,12 @@ private:
|
|||||||
|
|
||||||
struct Stats
|
struct Stats
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
|
||||||
std::atomic<size_t> queries{0};
|
std::atomic<size_t> queries{0};
|
||||||
size_t read_rows = 0;
|
size_t read_rows = 0;
|
||||||
size_t read_bytes = 0;
|
size_t read_bytes = 0;
|
||||||
size_t result_rows = 0;
|
size_t result_rows = 0;
|
||||||
size_t result_bytes = 0;
|
size_t result_bytes = 0;
|
||||||
|
double work_time = 0;
|
||||||
|
|
||||||
using Sampler = ReservoirSampler<double>;
|
using Sampler = ReservoirSampler<double>;
|
||||||
Sampler sampler {1 << 16};
|
Sampler sampler {1 << 16};
|
||||||
@ -141,6 +169,7 @@ private:
|
|||||||
void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
|
void add(double seconds, size_t read_rows_inc, size_t read_bytes_inc, size_t result_rows_inc, size_t result_bytes_inc)
|
||||||
{
|
{
|
||||||
++queries;
|
++queries;
|
||||||
|
work_time += seconds;
|
||||||
read_rows += read_rows_inc;
|
read_rows += read_rows_inc;
|
||||||
read_bytes += read_bytes_inc;
|
read_bytes += read_bytes_inc;
|
||||||
result_rows += result_rows_inc;
|
result_rows += result_rows_inc;
|
||||||
@ -150,8 +179,8 @@ private:
|
|||||||
|
|
||||||
void clear()
|
void clear()
|
||||||
{
|
{
|
||||||
watch.restart();
|
|
||||||
queries = 0;
|
queries = 0;
|
||||||
|
work_time = 0;
|
||||||
read_rows = 0;
|
read_rows = 0;
|
||||||
read_bytes = 0;
|
read_bytes = 0;
|
||||||
result_rows = 0;
|
result_rows = 0;
|
||||||
@ -160,15 +189,18 @@ private:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Stats info_per_interval;
|
using MultiStats = std::vector<std::shared_ptr<Stats>>;
|
||||||
Stats info_total;
|
MultiStats comparison_info_per_interval;
|
||||||
|
MultiStats comparison_info_total;
|
||||||
|
StudentTTest t_test;
|
||||||
|
|
||||||
|
Stopwatch total_watch;
|
||||||
Stopwatch delay_watch;
|
Stopwatch delay_watch;
|
||||||
|
|
||||||
std::mutex mutex;
|
std::mutex mutex;
|
||||||
|
|
||||||
ThreadPool pool;
|
ThreadPool pool;
|
||||||
|
|
||||||
|
|
||||||
void readQueries()
|
void readQueries()
|
||||||
{
|
{
|
||||||
ReadBufferFromFileDescriptor in(STDIN_FILENO);
|
ReadBufferFromFileDescriptor in(STDIN_FILENO);
|
||||||
@ -213,7 +245,7 @@ private:
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (max_time > 0 && info_total.watch.elapsedSeconds() >= max_time)
|
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
|
||||||
{
|
{
|
||||||
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
|
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
|
||||||
return false;
|
return false;
|
||||||
@ -227,8 +259,8 @@ private:
|
|||||||
|
|
||||||
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
|
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
|
||||||
{
|
{
|
||||||
printNumberOfQueriesExecuted(info_total.queries);
|
printNumberOfQueriesExecuted(queries_executed);
|
||||||
report(info_per_interval);
|
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
|
||||||
delay_watch.restart();
|
delay_watch.restart();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -242,11 +274,17 @@ private:
|
|||||||
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
|
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
|
||||||
|
|
||||||
for (size_t i = 0; i < concurrency; ++i)
|
for (size_t i = 0; i < concurrency; ++i)
|
||||||
pool.schedule(std::bind(&Benchmark::thread, this,
|
{
|
||||||
connections.get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
|
EntryPtrs connection_entries;
|
||||||
|
connection_entries.reserve(connections.size());
|
||||||
|
|
||||||
|
for (const auto & connection : connections)
|
||||||
|
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
|
||||||
|
|
||||||
|
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
|
||||||
|
}
|
||||||
|
|
||||||
InterruptListener interrupt_listener;
|
InterruptListener interrupt_listener;
|
||||||
info_per_interval.watch.restart();
|
|
||||||
delay_watch.restart();
|
delay_watch.restart();
|
||||||
|
|
||||||
/// Push queries into queue
|
/// Push queries into queue
|
||||||
@ -262,20 +300,24 @@ private:
|
|||||||
}
|
}
|
||||||
|
|
||||||
pool.wait();
|
pool.wait();
|
||||||
info_total.watch.stop();
|
total_watch.stop();
|
||||||
|
|
||||||
if (!json_path.empty())
|
if (!json_path.empty())
|
||||||
reportJSON(info_total, json_path);
|
reportJSON(comparison_info_total, json_path);
|
||||||
|
|
||||||
printNumberOfQueriesExecuted(info_total.queries);
|
printNumberOfQueriesExecuted(queries_executed);
|
||||||
report(info_total);
|
report(comparison_info_total);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void thread(ConnectionPool::Entry connection)
|
void thread(EntryPtrs & connection_entries)
|
||||||
{
|
{
|
||||||
Query query;
|
Query query;
|
||||||
|
|
||||||
|
/// Randomly choosing connection index
|
||||||
|
pcg64 generator(randomSeed());
|
||||||
|
std::uniform_int_distribution<size_t> distribution(0, connection_entries.size() - 1);
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
/// In these threads we do not accept INT signal.
|
/// In these threads we do not accept INT signal.
|
||||||
@ -296,8 +338,7 @@ private:
|
|||||||
if (shutdown || (max_iterations && queries_executed == max_iterations))
|
if (shutdown || (max_iterations && queries_executed == max_iterations))
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
execute(connection_entries, query, distribution(generator));
|
||||||
execute(connection, query);
|
|
||||||
++queries_executed;
|
++queries_executed;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -309,20 +350,19 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void execute(EntryPtrs & connection_entries, Query & query, size_t connection_index)
|
||||||
void execute(ConnectionPool::Entry & connection, Query & query)
|
|
||||||
{
|
{
|
||||||
Stopwatch watch;
|
Stopwatch watch;
|
||||||
RemoteBlockInputStream stream(
|
RemoteBlockInputStream stream(
|
||||||
*connection,
|
*(*connection_entries[connection_index]),
|
||||||
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
|
query, {}, global_context, &settings, nullptr, Tables(), query_processing_stage);
|
||||||
|
|
||||||
Progress progress;
|
Progress progress;
|
||||||
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
|
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
|
||||||
|
|
||||||
stream.readPrefix();
|
stream.readPrefix();
|
||||||
while (Block block = stream.read())
|
while (Block block = stream.read());
|
||||||
;
|
|
||||||
stream.readSuffix();
|
stream.readSuffix();
|
||||||
|
|
||||||
const BlockStreamProfileInfo & info = stream.getProfileInfo();
|
const BlockStreamProfileInfo & info = stream.getProfileInfo();
|
||||||
@ -330,33 +370,47 @@ private:
|
|||||||
double seconds = watch.elapsedSeconds();
|
double seconds = watch.elapsedSeconds();
|
||||||
|
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
info_per_interval.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
|
|
||||||
info_total.add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
|
comparison_info_per_interval[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
|
||||||
|
comparison_info_total[connection_index]->add(seconds, progress.read_rows, progress.read_bytes, info.rows, info.bytes);
|
||||||
|
t_test.add(connection_index, seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void report(MultiStats & infos)
|
||||||
void report(Stats & info)
|
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
|
std::cerr << "\n";
|
||||||
|
for (size_t i = 0; i < infos.size(); ++i)
|
||||||
|
{
|
||||||
|
const auto & info = infos[i];
|
||||||
|
|
||||||
/// Avoid zeros, nans or exceptions
|
/// Avoid zeros, nans or exceptions
|
||||||
if (0 == info.queries)
|
if (0 == info->queries)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
double seconds = info.watch.elapsedSeconds();
|
double seconds = info->work_time / concurrency;
|
||||||
|
|
||||||
std::cerr
|
std::cerr
|
||||||
<< "\n"
|
<< connections[i]->getDescription() << ", "
|
||||||
<< "QPS: " << (info.queries / seconds) << ", "
|
<< "queries " << info->queries << ", "
|
||||||
<< "RPS: " << (info.read_rows / seconds) << ", "
|
<< "QPS: " << (info->queries / seconds) << ", "
|
||||||
<< "MiB/s: " << (info.read_bytes / seconds / 1048576) << ", "
|
<< "RPS: " << (info->read_rows / seconds) << ", "
|
||||||
<< "result RPS: " << (info.result_rows / seconds) << ", "
|
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
|
||||||
<< "result MiB/s: " << (info.result_bytes / seconds / 1048576) << "."
|
<< "result RPS: " << (info->result_rows / seconds) << ", "
|
||||||
|
<< "result MiB/s: " << (info->result_bytes / seconds / 1048576) << "."
|
||||||
<< "\n";
|
<< "\n";
|
||||||
|
}
|
||||||
|
std::cerr << "\n";
|
||||||
|
|
||||||
auto print_percentile = [&](double percent)
|
auto print_percentile = [&](double percent)
|
||||||
{
|
{
|
||||||
std::cerr << percent << "%\t" << info.sampler.quantileInterpolated(percent / 100.0) << " sec." << std::endl;
|
std::cerr << percent << "%\t\t";
|
||||||
|
for (const auto & info : infos)
|
||||||
|
{
|
||||||
|
std::cerr << info->sampler.quantileInterpolated(percent / 100.0) << " sec." << "\t";
|
||||||
|
}
|
||||||
|
std::cerr << "\n";
|
||||||
};
|
};
|
||||||
|
|
||||||
for (int percent = 0; percent <= 90; percent += 10)
|
for (int percent = 0; percent <= 90; percent += 10)
|
||||||
@ -367,10 +421,16 @@ private:
|
|||||||
print_percentile(99.9);
|
print_percentile(99.9);
|
||||||
print_percentile(99.99);
|
print_percentile(99.99);
|
||||||
|
|
||||||
info.clear();
|
std::cerr << "\n" << t_test.compareAndReport(confidence).second << "\n";
|
||||||
|
|
||||||
|
if (!cumulative)
|
||||||
|
{
|
||||||
|
for (auto & info : infos)
|
||||||
|
info->clear();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void reportJSON(Stats & info, const std::string & filename)
|
void reportJSON(MultiStats & infos, const std::string & filename)
|
||||||
{
|
{
|
||||||
WriteBufferFromFile json_out(filename);
|
WriteBufferFromFile json_out(filename);
|
||||||
|
|
||||||
@ -381,36 +441,41 @@ private:
|
|||||||
json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
|
json_out << double_quote << key << ": " << value << (with_comma ? ",\n" : "\n");
|
||||||
};
|
};
|
||||||
|
|
||||||
auto print_percentile = [&json_out, &info](auto percent, bool with_comma = true)
|
auto print_percentile = [&json_out](Stats & info, auto percent, bool with_comma = true)
|
||||||
{
|
{
|
||||||
json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
|
json_out << "\"" << percent << "\"" << ": " << info.sampler.quantileInterpolated(percent / 100.0) << (with_comma ? ",\n" : "\n");
|
||||||
};
|
};
|
||||||
|
|
||||||
json_out << "{\n";
|
json_out << "{\n";
|
||||||
|
|
||||||
|
for (size_t i = 0; i < infos.size(); ++i)
|
||||||
|
{
|
||||||
|
const auto & info = infos[i];
|
||||||
|
|
||||||
|
json_out << double_quote << connections[i]->getDescription() << ": {\n";
|
||||||
json_out << double_quote << "statistics" << ": {\n";
|
json_out << double_quote << "statistics" << ": {\n";
|
||||||
|
|
||||||
double seconds = info.watch.elapsedSeconds();
|
print_key_value("QPS", info->queries / info->work_time);
|
||||||
print_key_value("QPS", info.queries / seconds);
|
print_key_value("RPS", info->read_rows / info->work_time);
|
||||||
print_key_value("RPS", info.read_rows / seconds);
|
print_key_value("MiBPS", info->read_bytes / info->work_time);
|
||||||
print_key_value("MiBPS", info.read_bytes / seconds);
|
print_key_value("RPS_result", info->result_rows / info->work_time);
|
||||||
print_key_value("RPS_result", info.result_rows / seconds);
|
print_key_value("MiBPS_result", info->result_bytes / info->work_time);
|
||||||
print_key_value("MiBPS_result", info.result_bytes / seconds);
|
print_key_value("num_queries", info->queries.load(), false);
|
||||||
print_key_value("num_queries", info.queries.load(), false);
|
|
||||||
|
|
||||||
json_out << "},\n";
|
json_out << "},\n";
|
||||||
|
|
||||||
json_out << double_quote << "query_time_percentiles" << ": {\n";
|
json_out << double_quote << "query_time_percentiles" << ": {\n";
|
||||||
|
|
||||||
for (int percent = 0; percent <= 90; percent += 10)
|
for (int percent = 0; percent <= 90; percent += 10)
|
||||||
print_percentile(percent);
|
print_percentile(*info, percent);
|
||||||
|
|
||||||
print_percentile(95);
|
print_percentile(*info, 95);
|
||||||
print_percentile(99);
|
print_percentile(*info, 99);
|
||||||
print_percentile(99.9);
|
print_percentile(*info, 99.9);
|
||||||
print_percentile(99.99, false);
|
print_percentile(*info, 99.99, false);
|
||||||
|
|
||||||
json_out << "}\n";
|
json_out << "}\n";
|
||||||
|
json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
|
||||||
|
}
|
||||||
|
|
||||||
json_out << "}\n";
|
json_out << "}\n";
|
||||||
}
|
}
|
||||||
@ -449,13 +514,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
|
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
|
||||||
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
|
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")
|
||||||
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
|
("json", value<std::string>()->default_value(""), "write final report to specified file in JSON format")
|
||||||
("host,h", value<std::string>()->default_value("localhost"), "")
|
("host,h", value<Strings>()->multitoken(), "")
|
||||||
("port", value<UInt16>()->default_value(9000), "")
|
("port,p", value<Ports>()->multitoken(), "")
|
||||||
|
("cumulative", "prints cumulative data instead of data per interval")
|
||||||
("secure,s", "Use TLS connection")
|
("secure,s", "Use TLS connection")
|
||||||
("user", value<std::string>()->default_value("default"), "")
|
("user", value<std::string>()->default_value("default"), "")
|
||||||
("password", value<std::string>()->default_value(""), "")
|
("password", value<std::string>()->default_value(""), "")
|
||||||
("database", value<std::string>()->default_value("default"), "")
|
("database", value<std::string>()->default_value("default"), "")
|
||||||
("stacktrace", "print stack traces of exceptions")
|
("stacktrace", "print stack traces of exceptions")
|
||||||
|
("confidence", value<size_t>()->default_value(5), "set the level of confidence for T-test [0=80%, 1=90%, 2=95%, 3=98%, 4=99%, 5=99.5%(default)")
|
||||||
;
|
;
|
||||||
|
|
||||||
Settings settings;
|
Settings settings;
|
||||||
@ -475,12 +542,15 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
print_stacktrace = options.count("stacktrace");
|
print_stacktrace = options.count("stacktrace");
|
||||||
|
|
||||||
UseSSL use_ssl;
|
UseSSL use_ssl;
|
||||||
|
Ports ports = options.count("port") ? options["port"].as<Ports>() : Ports({9000});
|
||||||
|
Strings hosts = options.count("host") ? options["host"].as<Strings>() : Strings({"localhost"});
|
||||||
|
|
||||||
Benchmark benchmark(
|
Benchmark benchmark(
|
||||||
options["concurrency"].as<unsigned>(),
|
options["concurrency"].as<unsigned>(),
|
||||||
options["delay"].as<double>(),
|
options["delay"].as<double>(),
|
||||||
options["host"].as<std::string>(),
|
std::move(hosts),
|
||||||
options["port"].as<UInt16>(),
|
std::move(ports),
|
||||||
|
options.count("cumulative"),
|
||||||
options.count("secure"),
|
options.count("secure"),
|
||||||
options["database"].as<std::string>(),
|
options["database"].as<std::string>(),
|
||||||
options["user"].as<std::string>(),
|
options["user"].as<std::string>(),
|
||||||
@ -490,6 +560,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
|
|||||||
options["iterations"].as<size_t>(),
|
options["iterations"].as<size_t>(),
|
||||||
options["timelimit"].as<double>(),
|
options["timelimit"].as<double>(),
|
||||||
options["json"].as<std::string>(),
|
options["json"].as<std::string>(),
|
||||||
|
options["confidence"].as<size_t>(),
|
||||||
settings);
|
settings);
|
||||||
return benchmark.run();
|
return benchmark.run();
|
||||||
}
|
}
|
||||||
|
@ -88,6 +88,10 @@ public:
|
|||||||
{
|
{
|
||||||
return host;
|
return host;
|
||||||
}
|
}
|
||||||
|
std::string getDescription() const
|
||||||
|
{
|
||||||
|
return host + ":" + toString(port);
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
/** Creates a new object to put in the pool. */
|
/** Creates a new object to put in the pool. */
|
||||||
|
169
dbms/src/Common/StudentTTest.cpp
Normal file
169
dbms/src/Common/StudentTTest.cpp
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
#include "StudentTTest.h"
|
||||||
|
|
||||||
|
#include <cmath>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <sstream>
|
||||||
|
#include <stdexcept>
|
||||||
|
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
/// The first row corresponds to the case of infinitely large samples.
|
||||||
|
const double students_table[101][6] =
|
||||||
|
{
|
||||||
|
{ 1.282, 1.645, 1.960, 2.326, 2.576, 3.090 },
|
||||||
|
{ 3.078, 6.314, 12.706, 31.821, 63.657, 318.313 },
|
||||||
|
{ 1.886, 2.920, 4.303, 6.965, 9.925, 22.327 },
|
||||||
|
{ 1.638, 2.353, 3.182, 4.541, 5.841, 10.215 },
|
||||||
|
{ 1.533, 2.132, 2.776, 3.747, 4.604, 7.173 },
|
||||||
|
{ 1.476, 2.015, 2.571, 3.365, 4.032, 5.893 },
|
||||||
|
{ 1.440, 1.943, 2.447, 3.143, 3.707, 5.208 },
|
||||||
|
{ 1.415, 1.895, 2.365, 2.998, 3.499, 4.782 },
|
||||||
|
{ 1.397, 1.860, 2.306, 2.896, 3.355, 4.499 },
|
||||||
|
{ 1.383, 1.833, 2.262, 2.821, 3.250, 4.296 },
|
||||||
|
{ 1.372, 1.812, 2.228, 2.764, 3.169, 4.143 },
|
||||||
|
{ 1.363, 1.796, 2.201, 2.718, 3.106, 4.024 },
|
||||||
|
{ 1.356, 1.782, 2.179, 2.681, 3.055, 3.929 },
|
||||||
|
{ 1.350, 1.771, 2.160, 2.650, 3.012, 3.852 },
|
||||||
|
{ 1.345, 1.761, 2.145, 2.624, 2.977, 3.787 },
|
||||||
|
{ 1.341, 1.753, 2.131, 2.602, 2.947, 3.733 },
|
||||||
|
{ 1.337, 1.746, 2.120, 2.583, 2.921, 3.686 },
|
||||||
|
{ 1.333, 1.740, 2.110, 2.567, 2.898, 3.646 },
|
||||||
|
{ 1.330, 1.734, 2.101, 2.552, 2.878, 3.610 },
|
||||||
|
{ 1.328, 1.729, 2.093, 2.539, 2.861, 3.579 },
|
||||||
|
{ 1.325, 1.725, 2.086, 2.528, 2.845, 3.552 },
|
||||||
|
{ 1.323, 1.721, 2.080, 2.518, 2.831, 3.527 },
|
||||||
|
{ 1.321, 1.717, 2.074, 2.508, 2.819, 3.505 },
|
||||||
|
{ 1.319, 1.714, 2.069, 2.500, 2.807, 3.485 },
|
||||||
|
{ 1.318, 1.711, 2.064, 2.492, 2.797, 3.467 },
|
||||||
|
{ 1.316, 1.708, 2.060, 2.485, 2.787, 3.450 },
|
||||||
|
{ 1.315, 1.706, 2.056, 2.479, 2.779, 3.435 },
|
||||||
|
{ 1.314, 1.703, 2.052, 2.473, 2.771, 3.421 },
|
||||||
|
{ 1.313, 1.701, 2.048, 2.467, 2.763, 3.408 },
|
||||||
|
{ 1.311, 1.699, 2.045, 2.462, 2.756, 3.396 },
|
||||||
|
{ 1.310, 1.697, 2.042, 2.457, 2.750, 3.385 },
|
||||||
|
{ 1.309, 1.696, 2.040, 2.453, 2.744, 3.375 },
|
||||||
|
{ 1.309, 1.694, 2.037, 2.449, 2.738, 3.365 },
|
||||||
|
{ 1.308, 1.692, 2.035, 2.445, 2.733, 3.356 },
|
||||||
|
{ 1.307, 1.691, 2.032, 2.441, 2.728, 3.348 },
|
||||||
|
{ 1.306, 1.690, 2.030, 2.438, 2.724, 3.340 },
|
||||||
|
{ 1.306, 1.688, 2.028, 2.434, 2.719, 3.333 },
|
||||||
|
{ 1.305, 1.687, 2.026, 2.431, 2.715, 3.326 },
|
||||||
|
{ 1.304, 1.686, 2.024, 2.429, 2.712, 3.319 },
|
||||||
|
{ 1.304, 1.685, 2.023, 2.426, 2.708, 3.313 },
|
||||||
|
{ 1.303, 1.684, 2.021, 2.423, 2.704, 3.307 },
|
||||||
|
{ 1.303, 1.683, 2.020, 2.421, 2.701, 3.301 },
|
||||||
|
{ 1.302, 1.682, 2.018, 2.418, 2.698, 3.296 },
|
||||||
|
{ 1.302, 1.681, 2.017, 2.416, 2.695, 3.291 },
|
||||||
|
{ 1.301, 1.680, 2.015, 2.414, 2.692, 3.286 },
|
||||||
|
{ 1.301, 1.679, 2.014, 2.412, 2.690, 3.281 },
|
||||||
|
{ 1.300, 1.679, 2.013, 2.410, 2.687, 3.277 },
|
||||||
|
{ 1.300, 1.678, 2.012, 2.408, 2.685, 3.273 },
|
||||||
|
{ 1.299, 1.677, 2.011, 2.407, 2.682, 3.269 },
|
||||||
|
{ 1.299, 1.677, 2.010, 2.405, 2.680, 3.265 },
|
||||||
|
{ 1.299, 1.676, 2.009, 2.403, 2.678, 3.261 },
|
||||||
|
{ 1.298, 1.675, 2.008, 2.402, 2.676, 3.258 },
|
||||||
|
{ 1.298, 1.675, 2.007, 2.400, 2.674, 3.255 },
|
||||||
|
{ 1.298, 1.674, 2.006, 2.399, 2.672, 3.251 },
|
||||||
|
{ 1.297, 1.674, 2.005, 2.397, 2.670, 3.248 },
|
||||||
|
{ 1.297, 1.673, 2.004, 2.396, 2.668, 3.245 },
|
||||||
|
{ 1.297, 1.673, 2.003, 2.395, 2.667, 3.242 },
|
||||||
|
{ 1.297, 1.672, 2.002, 2.394, 2.665, 3.239 },
|
||||||
|
{ 1.296, 1.672, 2.002, 2.392, 2.663, 3.237 },
|
||||||
|
{ 1.296, 1.671, 2.001, 2.391, 2.662, 3.234 },
|
||||||
|
{ 1.296, 1.671, 2.000, 2.390, 2.660, 3.232 },
|
||||||
|
{ 1.296, 1.670, 2.000, 2.389, 2.659, 3.229 },
|
||||||
|
{ 1.295, 1.670, 1.999, 2.388, 2.657, 3.227 },
|
||||||
|
{ 1.295, 1.669, 1.998, 2.387, 2.656, 3.225 },
|
||||||
|
{ 1.295, 1.669, 1.998, 2.386, 2.655, 3.223 },
|
||||||
|
{ 1.295, 1.669, 1.997, 2.385, 2.654, 3.220 },
|
||||||
|
{ 1.295, 1.668, 1.997, 2.384, 2.652, 3.218 },
|
||||||
|
{ 1.294, 1.668, 1.996, 2.383, 2.651, 3.216 },
|
||||||
|
{ 1.294, 1.668, 1.995, 2.382, 2.650, 3.214 },
|
||||||
|
{ 1.294, 1.667, 1.995, 2.382, 2.649, 3.213 },
|
||||||
|
{ 1.294, 1.667, 1.994, 2.381, 2.648, 3.211 },
|
||||||
|
{ 1.294, 1.667, 1.994, 2.380, 2.647, 3.209 },
|
||||||
|
{ 1.293, 1.666, 1.993, 2.379, 2.646, 3.207 },
|
||||||
|
{ 1.293, 1.666, 1.993, 2.379, 2.645, 3.206 },
|
||||||
|
{ 1.293, 1.666, 1.993, 2.378, 2.644, 3.204 },
|
||||||
|
{ 1.293, 1.665, 1.992, 2.377, 2.643, 3.202 },
|
||||||
|
{ 1.293, 1.665, 1.992, 2.376, 2.642, 3.201 },
|
||||||
|
{ 1.293, 1.665, 1.991, 2.376, 2.641, 3.199 },
|
||||||
|
{ 1.292, 1.665, 1.991, 2.375, 2.640, 3.198 },
|
||||||
|
{ 1.292, 1.664, 1.990, 2.374, 2.640, 3.197 },
|
||||||
|
{ 1.292, 1.664, 1.990, 2.374, 2.639, 3.195 },
|
||||||
|
{ 1.292, 1.664, 1.990, 2.373, 2.638, 3.194 },
|
||||||
|
{ 1.292, 1.664, 1.989, 2.373, 2.637, 3.193 },
|
||||||
|
{ 1.292, 1.663, 1.989, 2.372, 2.636, 3.191 },
|
||||||
|
{ 1.292, 1.663, 1.989, 2.372, 2.636, 3.190 },
|
||||||
|
{ 1.292, 1.663, 1.988, 2.371, 2.635, 3.189 },
|
||||||
|
{ 1.291, 1.663, 1.988, 2.370, 2.634, 3.188 },
|
||||||
|
{ 1.291, 1.663, 1.988, 2.370, 2.634, 3.187 },
|
||||||
|
{ 1.291, 1.662, 1.987, 2.369, 2.633, 3.185 },
|
||||||
|
{ 1.291, 1.662, 1.987, 2.369, 2.632, 3.184 },
|
||||||
|
{ 1.291, 1.662, 1.987, 2.368, 2.632, 3.183 },
|
||||||
|
{ 1.291, 1.662, 1.986, 2.368, 2.631, 3.182 },
|
||||||
|
{ 1.291, 1.662, 1.986, 2.368, 2.630, 3.181 },
|
||||||
|
{ 1.291, 1.661, 1.986, 2.367, 2.630, 3.180 },
|
||||||
|
{ 1.291, 1.661, 1.986, 2.367, 2.629, 3.179 },
|
||||||
|
{ 1.291, 1.661, 1.985, 2.366, 2.629, 3.178 },
|
||||||
|
{ 1.290, 1.661, 1.985, 2.366, 2.628, 3.177 },
|
||||||
|
{ 1.290, 1.661, 1.985, 2.365, 2.627, 3.176 },
|
||||||
|
{ 1.290, 1.661, 1.984, 2.365, 2.627, 3.175 },
|
||||||
|
{ 1.290, 1.660, 1.984, 2.365, 2.626, 3.175 },
|
||||||
|
{ 1.290, 1.660, 1.984, 2.364, 2.626, 3.174 },
|
||||||
|
};
|
||||||
|
|
||||||
|
const double confidence_level[6] = { 80, 90, 95, 98, 99, 99.5 };
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void StudentTTest::clear()
|
||||||
|
{
|
||||||
|
data[0].clear();
|
||||||
|
data[1].clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void StudentTTest::add(size_t distribution, double value)
|
||||||
|
{
|
||||||
|
if (distribution > 1)
|
||||||
|
throw std::logic_error("Distribution number for Student's T-Test must be eigther 0 or 1");
|
||||||
|
data[distribution].add(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Confidence_level_index can be set in range [0, 5]. Corresponding values can be found above.
|
||||||
|
std::pair<bool, std::string> StudentTTest::compareAndReport(size_t confidence_level_index) const
|
||||||
|
{
|
||||||
|
if (confidence_level_index > 5)
|
||||||
|
confidence_level_index = 5;
|
||||||
|
|
||||||
|
if (data[0].size == 0 || data[1].size == 0)
|
||||||
|
return {true, ""};
|
||||||
|
|
||||||
|
size_t degrees_of_freedom = (data[0].size - 1) + (data[1].size - 1);
|
||||||
|
|
||||||
|
double table_value = students_table[degrees_of_freedom > 100 ? 0 : degrees_of_freedom][confidence_level_index];
|
||||||
|
|
||||||
|
double pooled_standard_deviation = sqrt(((data[0].size - 1) * data[0].var() + (data[1].size - 1) * data[1].var()) / degrees_of_freedom);
|
||||||
|
|
||||||
|
double t_statistic = pooled_standard_deviation * sqrt(1.0 / data[0].size + 1.0 / data[1].size);
|
||||||
|
|
||||||
|
double mean_difference = fabs(data[0].avg() - data[1].avg());
|
||||||
|
|
||||||
|
double mean_confidence_interval = table_value * t_statistic;
|
||||||
|
|
||||||
|
std::stringstream ss;
|
||||||
|
if (mean_difference > mean_confidence_interval && (mean_difference - mean_confidence_interval > 0.0001)) /// difference must be more than 0.0001, to take into account connection latency.
|
||||||
|
{
|
||||||
|
ss << "Difference at " << confidence_level[confidence_level_index] << "% confidence : ";
|
||||||
|
ss << std::fixed << std::setprecision(8) << "mean difference is " << mean_difference << ", but confidence interval is " << mean_confidence_interval;
|
||||||
|
return {false, ss.str()};
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ss << "No difference proven at " << confidence_level[confidence_level_index] << "% confidence";
|
||||||
|
return {true, ss.str()};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
59
dbms/src/Common/StudentTTest.h
Normal file
59
dbms/src/Common/StudentTTest.h
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <array>
|
||||||
|
#include <string>
|
||||||
|
#include <map>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* About:
|
||||||
|
* This is an implementation of the independent two-sample t-test.
|
||||||
|
* Read about it on https://en.wikipedia.org/wiki/Student%27s_t-test (Equal or unequal sample sizes, equal variance)
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* It is used to assume, with some level of confidence, that two distributions don't differ.
|
||||||
|
* Values are added with t_test.add(0/1, value) and afterwards compared and reported with compareAndReport().
|
||||||
|
*/
|
||||||
|
class StudentTTest
{
private:
    /// Running sums of one sample. They are sufficient to derive the mean and
    /// the unbiased variance without storing the individual observations.
    struct DistributionData
    {
        size_t size = 0;
        double sum = 0;
        double squares_sum = 0;

        void add(double value)
        {
            ++size;
            sum += value;
            squares_sum += value * value;
        }

        /// Arithmetic mean of the sample; 0 for an empty sample instead of 0 / 0.
        double avg() const
        {
            if (size == 0)
                return 0;

            return sum / size;
        }

        /// Unbiased sample variance; 0 when there are fewer than two values,
        /// because the (size - 1) denominator would be zero (or wrap around for an empty sample).
        double var() const
        {
            if (size < 2)
                return 0;

            const double variance = (squares_sum - (sum * sum / size)) / static_cast<double>(size - 1);

            /// Rounding in the subtraction can produce a tiny negative number; a variance is never negative.
            return variance > 0 ? variance : 0;
        }

        void clear()
        {
            size = 0;
            sum = 0;
            squares_sum = 0;
        }
    };

    /// The two samples being compared, addressed by index 0 and 1.
    std::array<DistributionData, 2> data {};

public:
    /// Resets both samples.
    void clear();

    /// Adds `value` to the sample with index `distribution` (0 or 1).
    void add(size_t distribution, double value);

    /// Confidence_level_index can be set in range [0, 5]:
    /// 0 = 80%, 1 = 90%, 2 = 95%, 3 = 98%, 4 = 99%, 5 = 99.5% (default).
    /// Returns {true, report} when no difference is proven and {false, report} otherwise.
    /// TODO: no separation of concepts in code (comparison and report formatting are mixed).
    std::pair<bool, std::string> compareAndReport(size_t confidence_level_index = 5) const;
};
|
@ -347,6 +347,7 @@ struct Settings : public SettingsCollection<Settings>
|
|||||||
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
|
M(SettingSeconds, live_view_heartbeat_interval, DEFAULT_LIVE_VIEW_HEARTBEAT_INTERVAL_SEC, "The heartbeat interval in seconds to indicate live query is alive.") \
|
||||||
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
|
M(SettingSeconds, temporary_live_view_timeout, DEFAULT_TEMPORARY_LIVE_VIEW_TIMEOUT_SEC, "Timeout after which temporary live view is deleted.") \
|
||||||
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
|
M(SettingUInt64, max_live_view_insert_blocks_before_refresh, 64, "Limit maximum number of inserted blocks after which mergeable blocks are dropped and query is re-executed.") \
|
||||||
|
M(SettingUInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.") \
|
||||||
\
|
\
|
||||||
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
|
||||||
\
|
\
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#include <DataStreams/copyData.h>
|
#include <DataStreams/copyData.h>
|
||||||
#include <DataStreams/processConstants.h>
|
#include <DataStreams/processConstants.h>
|
||||||
#include <Common/formatReadable.h>
|
#include <Common/formatReadable.h>
|
||||||
|
#include <common/config_common.h>
|
||||||
#include <IO/WriteBufferFromFile.h>
|
#include <IO/WriteBufferFromFile.h>
|
||||||
#include <Compression/CompressedWriteBuffer.h>
|
#include <Compression/CompressedWriteBuffer.h>
|
||||||
#include <Interpreters/sortBlock.h>
|
#include <Interpreters/sortBlock.h>
|
||||||
@ -21,10 +22,11 @@ namespace DB
|
|||||||
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
|
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
|
||||||
const BlockInputStreamPtr & input, SortDescription & description_,
|
const BlockInputStreamPtr & input, SortDescription & description_,
|
||||||
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
|
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
|
||||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
|
||||||
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
|
||||||
max_bytes_before_remerge(max_bytes_before_remerge_),
|
max_bytes_before_remerge(max_bytes_before_remerge_),
|
||||||
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
|
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
|
||||||
|
min_free_disk_space(min_free_disk_space_)
|
||||||
{
|
{
|
||||||
children.push_back(input);
|
children.push_back(input);
|
||||||
header = children.at(0)->getHeader();
|
header = children.at(0)->getHeader();
|
||||||
@ -77,6 +79,12 @@ Block MergeSortingBlockInputStream::readImpl()
|
|||||||
*/
|
*/
|
||||||
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
|
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
|
||||||
{
|
{
|
||||||
|
#if !UNBUNDLED
|
||||||
|
auto free_space = Poco::File(tmp_path).freeSpace();
|
||||||
|
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
|
||||||
|
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
|
||||||
|
#endif
|
||||||
|
|
||||||
Poco::File(tmp_path).createDirectories();
|
Poco::File(tmp_path).createDirectories();
|
||||||
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
|
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
|
||||||
const std::string & path = temporary_files.back()->path();
|
const std::string & path = temporary_files.back()->path();
|
||||||
|
@ -18,6 +18,10 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int NOT_ENOUGH_SPACE;
|
||||||
|
}
|
||||||
/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks.
|
/** Merges stream of sorted each-separately blocks to sorted as-a-whole stream of blocks.
|
||||||
* If data to sort is too much, could use external sorting, with temporary files.
|
* If data to sort is too much, could use external sorting, with temporary files.
|
||||||
*/
|
*/
|
||||||
@ -73,7 +77,8 @@ public:
|
|||||||
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
|
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
|
||||||
size_t max_merged_block_size_, UInt64 limit_,
|
size_t max_merged_block_size_, UInt64 limit_,
|
||||||
size_t max_bytes_before_remerge_,
|
size_t max_bytes_before_remerge_,
|
||||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
|
||||||
|
size_t min_free_disk_space_);
|
||||||
|
|
||||||
String getName() const override { return "MergeSorting"; }
|
String getName() const override { return "MergeSorting"; }
|
||||||
|
|
||||||
@ -93,6 +98,7 @@ private:
|
|||||||
size_t max_bytes_before_remerge;
|
size_t max_bytes_before_remerge;
|
||||||
size_t max_bytes_before_external_sort;
|
size_t max_bytes_before_external_sort;
|
||||||
const std::string tmp_path;
|
const std::string tmp_path;
|
||||||
|
size_t min_free_disk_space;
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
||||||
|
|
||||||
|
@ -68,23 +68,20 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl)
|
|||||||
}
|
}
|
||||||
|
|
||||||
Block TTLBlockInputStream::readImpl()
|
Block TTLBlockInputStream::readImpl()
|
||||||
{
|
|
||||||
Block block = children.at(0)->read();
|
|
||||||
if (!block)
|
|
||||||
return block;
|
|
||||||
|
|
||||||
if (storage.hasTableTTL())
|
|
||||||
{
|
{
|
||||||
/// Skip all data if table ttl is expired for part
|
/// Skip all data if table ttl is expired for part
|
||||||
if (isTTLExpired(old_ttl_infos.table_ttl.max))
|
if (storage.hasTableTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
|
||||||
{
|
{
|
||||||
rows_removed = data_part->rows_count;
|
rows_removed = data_part->rows_count;
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (force || isTTLExpired(old_ttl_infos.table_ttl.min))
|
Block block = children.at(0)->read();
|
||||||
|
if (!block)
|
||||||
|
return block;
|
||||||
|
|
||||||
|
if (storage.hasTableTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
|
||||||
removeRowsWithExpiredTableTTL(block);
|
removeRowsWithExpiredTableTTL(block);
|
||||||
}
|
|
||||||
|
|
||||||
removeValuesWithExpiredColumnTTL(block);
|
removeValuesWithExpiredColumnTTL(block);
|
||||||
|
|
||||||
@ -94,9 +91,9 @@ Block TTLBlockInputStream::readImpl()
|
|||||||
void TTLBlockInputStream::readSuffixImpl()
|
void TTLBlockInputStream::readSuffixImpl()
|
||||||
{
|
{
|
||||||
for (const auto & elem : new_ttl_infos.columns_ttl)
|
for (const auto & elem : new_ttl_infos.columns_ttl)
|
||||||
new_ttl_infos.updatePartMinTTL(elem.second.min);
|
new_ttl_infos.updatePartMinMaxTTL(elem.second.min, elem.second.max);
|
||||||
|
|
||||||
new_ttl_infos.updatePartMinTTL(new_ttl_infos.table_ttl.min);
|
new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max);
|
||||||
|
|
||||||
data_part->ttl_infos = std::move(new_ttl_infos);
|
data_part->ttl_infos = std::move(new_ttl_infos);
|
||||||
data_part->empty_columns = std::move(empty_columns);
|
data_part->empty_columns = std::move(empty_columns);
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <Common/assert_cast.h>
|
#include <Common/assert_cast.h>
|
||||||
#include <common/demangle.h>
|
#include <common/demangle.h>
|
||||||
|
#include <common/config_common.h>
|
||||||
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
@ -639,6 +640,12 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
|
|||||||
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
|
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
|
||||||
&& worth_convert_to_two_level)
|
&& worth_convert_to_two_level)
|
||||||
{
|
{
|
||||||
|
#if !UNBUNDLED
|
||||||
|
auto free_space = Poco::File(params.tmp_path).freeSpace();
|
||||||
|
if (current_memory_usage + params.min_free_disk_space > free_space)
|
||||||
|
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
|
||||||
|
#endif
|
||||||
|
|
||||||
writeToTemporaryFile(result);
|
writeToTemporaryFile(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,6 +39,7 @@ namespace DB
|
|||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
{
|
{
|
||||||
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
|
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
|
||||||
|
extern const int NOT_ENOUGH_SPACE;
|
||||||
}
|
}
|
||||||
|
|
||||||
class IBlockOutputStream;
|
class IBlockOutputStream;
|
||||||
@ -796,6 +797,7 @@ public:
|
|||||||
/// Settings is used to determine cache size. No threads are created.
|
/// Settings is used to determine cache size. No threads are created.
|
||||||
size_t max_threads;
|
size_t max_threads;
|
||||||
|
|
||||||
|
const size_t min_free_disk_space;
|
||||||
Params(
|
Params(
|
||||||
const Block & src_header_,
|
const Block & src_header_,
|
||||||
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
|
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
|
||||||
@ -803,21 +805,23 @@ public:
|
|||||||
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
|
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
|
||||||
size_t max_bytes_before_external_group_by_,
|
size_t max_bytes_before_external_group_by_,
|
||||||
bool empty_result_for_aggregation_by_empty_set_,
|
bool empty_result_for_aggregation_by_empty_set_,
|
||||||
const std::string & tmp_path_, size_t max_threads_)
|
const std::string & tmp_path_, size_t max_threads_,
|
||||||
|
size_t min_free_disk_space_)
|
||||||
: src_header(src_header_),
|
: src_header(src_header_),
|
||||||
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
|
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
|
||||||
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
|
||||||
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
|
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
|
||||||
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
|
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
|
||||||
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
|
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
|
||||||
tmp_path(tmp_path_), max_threads(max_threads_)
|
tmp_path(tmp_path_), max_threads(max_threads_),
|
||||||
|
min_free_disk_space(min_free_disk_space_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Only parameters that matter during merge.
|
/// Only parameters that matter during merge.
|
||||||
Params(const Block & intermediate_header_,
|
Params(const Block & intermediate_header_,
|
||||||
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
|
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
|
||||||
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_)
|
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
|
||||||
{
|
{
|
||||||
intermediate_header = intermediate_header_;
|
intermediate_header = intermediate_header_;
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@
|
|||||||
#include <Parsers/ASTSelectQuery.h>
|
#include <Parsers/ASTSelectQuery.h>
|
||||||
|
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -102,22 +101,6 @@ std::unordered_map<String, String> AnalyzedJoin::getOriginalColumnsMap(const Nam
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
void AnalyzedJoin::calculateAvailableJoinedColumns(bool make_nullable)
|
|
||||||
{
|
|
||||||
if (!make_nullable)
|
|
||||||
{
|
|
||||||
available_joined_columns = columns_from_joined_table;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto & column : columns_from_joined_table)
|
|
||||||
{
|
|
||||||
auto type = column.type->canBeInsideNullable() ? makeNullable(column.type) : column.type;
|
|
||||||
available_joined_columns.emplace_back(NameAndTypePair(column.name, std::move(type)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
|
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context)
|
||||||
{
|
{
|
||||||
NamesAndTypesList names_and_type_list;
|
NamesAndTypesList names_and_type_list;
|
||||||
|
@ -42,8 +42,6 @@ private:
|
|||||||
|
|
||||||
/// All columns which can be read from joined table. Duplicating names are qualified.
|
/// All columns which can be read from joined table. Duplicating names are qualified.
|
||||||
NamesAndTypesList columns_from_joined_table;
|
NamesAndTypesList columns_from_joined_table;
|
||||||
/// Columns from joined table which may be added to block. It's columns_from_joined_table with possibly modified types.
|
|
||||||
NamesAndTypesList available_joined_columns;
|
|
||||||
/// Name -> original name. Names are the same as in columns_from_joined_table list.
|
/// Name -> original name. Names are the same as in columns_from_joined_table list.
|
||||||
std::unordered_map<String, String> original_names;
|
std::unordered_map<String, String> original_names;
|
||||||
/// Original name -> name. Only ranamed columns.
|
/// Original name -> name. Only ranamed columns.
|
||||||
@ -61,7 +59,6 @@ public:
|
|||||||
std::unordered_map<String, String> getOriginalColumnsMap(const NameSet & required_columns) const;
|
std::unordered_map<String, String> getOriginalColumnsMap(const NameSet & required_columns) const;
|
||||||
|
|
||||||
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
|
void deduplicateAndQualifyColumnNames(const NameSet & left_table_columns, const String & right_table_prefix);
|
||||||
void calculateAvailableJoinedColumns(bool make_nullable);
|
|
||||||
size_t rightKeyInclusion(const String & name) const;
|
size_t rightKeyInclusion(const String & name) const;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -278,8 +278,8 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings,
|
|||||||
case JOIN:
|
case JOIN:
|
||||||
{
|
{
|
||||||
bool is_null_used_as_default = settings.join_use_nulls;
|
bool is_null_used_as_default = settings.join_use_nulls;
|
||||||
bool right_or_full_join = join_kind == ASTTableJoin::Kind::Right || join_kind == ASTTableJoin::Kind::Full;
|
bool right_or_full_join = isRightOrFull(join_kind);
|
||||||
bool left_or_full_join = join_kind == ASTTableJoin::Kind::Left || join_kind == ASTTableJoin::Kind::Full;
|
bool left_or_full_join = isLeftOrFull(join_kind);
|
||||||
|
|
||||||
for (auto & col : sample_block)
|
for (auto & col : sample_block)
|
||||||
{
|
{
|
||||||
|
@ -1657,7 +1657,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
|
|||||||
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
|
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
|
||||||
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
|
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
|
||||||
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
||||||
context.getTemporaryPath(), settings.max_threads);
|
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
|
||||||
|
|
||||||
/// If there are several sources, then we perform parallel aggregation
|
/// If there are several sources, then we perform parallel aggregation
|
||||||
if (pipeline.streams.size() > 1)
|
if (pipeline.streams.size() > 1)
|
||||||
@ -1723,7 +1723,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
|
|||||||
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
|
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
|
||||||
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
|
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
|
||||||
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
||||||
context.getTemporaryPath(), settings.max_threads);
|
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
|
||||||
|
|
||||||
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
|
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
|
||||||
|
|
||||||
@ -1943,7 +1943,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
|
|||||||
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
|
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
|
||||||
SettingUInt64(0), SettingUInt64(0),
|
SettingUInt64(0), SettingUInt64(0),
|
||||||
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
||||||
context.getTemporaryPath(), settings.max_threads);
|
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
|
||||||
|
|
||||||
if (modificator == Modificator::ROLLUP)
|
if (modificator == Modificator::ROLLUP)
|
||||||
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
|
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
|
||||||
@ -1972,7 +1972,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
|
|||||||
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
|
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
|
||||||
SettingUInt64(0), SettingUInt64(0),
|
SettingUInt64(0), SettingUInt64(0),
|
||||||
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
|
||||||
context.getTemporaryPath(), settings.max_threads);
|
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
|
||||||
|
|
||||||
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
|
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
|
||||||
|
|
||||||
@ -2073,7 +2073,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
|
|||||||
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
|
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
|
||||||
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
|
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
|
||||||
settings.max_bytes_before_remerge_sort,
|
settings.max_bytes_before_remerge_sort,
|
||||||
settings.max_bytes_before_external_sort, context.getTemporaryPath());
|
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2111,7 +2111,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
|
|||||||
return std::make_shared<MergeSortingTransform>(
|
return std::make_shared<MergeSortingTransform>(
|
||||||
header, order_descr, settings.max_block_size, limit,
|
header, order_descr, settings.max_block_size, limit,
|
||||||
settings.max_bytes_before_remerge_sort,
|
settings.max_bytes_before_remerge_sort,
|
||||||
settings.max_bytes_before_external_sort, context.getTemporaryPath());
|
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@
|
|||||||
#include <Parsers/queryToString.h>
|
#include <Parsers/queryToString.h>
|
||||||
|
|
||||||
#include <DataTypes/NestedUtils.h>
|
#include <DataTypes/NestedUtils.h>
|
||||||
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
|
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <Storages/IStorage.h>
|
#include <Storages/IStorage.h>
|
||||||
@ -488,13 +489,14 @@ void getArrayJoinedColumns(ASTPtr & query, SyntaxAnalyzerResult & result, const
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness)
|
void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_default_strictness, ASTTableJoin::Kind & join_kind)
|
||||||
{
|
{
|
||||||
const ASTTablesInSelectQueryElement * node = select_query.join();
|
const ASTTablesInSelectQueryElement * node = select_query.join();
|
||||||
if (!node)
|
if (!node)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
auto & table_join = const_cast<ASTTablesInSelectQueryElement *>(node)->table_join->as<ASTTableJoin &>();
|
auto & table_join = const_cast<ASTTablesInSelectQueryElement *>(node)->table_join->as<ASTTableJoin &>();
|
||||||
|
join_kind = table_join.kind;
|
||||||
|
|
||||||
if (table_join.strictness == ASTTableJoin::Strictness::Unspecified &&
|
if (table_join.strictness == ASTTableJoin::Strictness::Unspecified &&
|
||||||
table_join.kind != ASTTableJoin::Kind::Cross)
|
table_join.kind != ASTTableJoin::Kind::Cross)
|
||||||
@ -511,7 +513,7 @@ void setJoinStrictness(ASTSelectQuery & select_query, JoinStrictness join_defaul
|
|||||||
|
|
||||||
/// Find the columns that are obtained by JOIN.
|
/// Find the columns that are obtained by JOIN.
|
||||||
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
|
void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & select_query, const NameSet & source_columns,
|
||||||
const Aliases & aliases, bool join_use_nulls)
|
const Aliases & aliases)
|
||||||
{
|
{
|
||||||
const ASTTablesInSelectQueryElement * node = select_query.join();
|
const ASTTablesInSelectQueryElement * node = select_query.join();
|
||||||
if (!node)
|
if (!node)
|
||||||
@ -537,10 +539,6 @@ void collectJoinedColumns(AnalyzedJoin & analyzed_join, const ASTSelectQuery & s
|
|||||||
if (is_asof)
|
if (is_asof)
|
||||||
data.asofToJoinKeys();
|
data.asofToJoinKeys();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool make_nullable = join_use_nulls && isLeftOrFull(table_join.kind);
|
|
||||||
|
|
||||||
analyzed_join.calculateAvailableJoinedColumns(make_nullable);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void replaceJoinedTable(const ASTTablesInSelectQueryElement* join)
|
void replaceJoinedTable(const ASTTablesInSelectQueryElement* join)
|
||||||
@ -611,7 +609,8 @@ std::vector<const ASTFunction *> getAggregates(const ASTPtr & query)
|
|||||||
/// Calculate which columns are required to execute the expression.
|
/// Calculate which columns are required to execute the expression.
|
||||||
/// Then, delete all other columns from the list of available columns.
|
/// Then, delete all other columns from the list of available columns.
|
||||||
/// After execution, columns will only contain the list of columns needed to read from the table.
|
/// After execution, columns will only contain the list of columns needed to read from the table.
|
||||||
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns)
|
void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns,
|
||||||
|
bool make_joined_columns_nullable)
|
||||||
{
|
{
|
||||||
/// We caclulate required_source_columns with source_columns modifications and swap them on exit
|
/// We caclulate required_source_columns with source_columns modifications and swap them on exit
|
||||||
required_source_columns = source_columns;
|
required_source_columns = source_columns;
|
||||||
@ -639,7 +638,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
|
|||||||
|
|
||||||
/// Add columns obtained by JOIN (if needed).
|
/// Add columns obtained by JOIN (if needed).
|
||||||
columns_added_by_join.clear();
|
columns_added_by_join.clear();
|
||||||
for (const auto & joined_column : analyzed_join.available_joined_columns)
|
for (const auto & joined_column : analyzed_join.columns_from_joined_table)
|
||||||
{
|
{
|
||||||
auto & name = joined_column.name;
|
auto & name = joined_column.name;
|
||||||
if (avaliable_columns.count(name))
|
if (avaliable_columns.count(name))
|
||||||
@ -649,7 +648,15 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
|
|||||||
{
|
{
|
||||||
/// Optimisation: do not add columns needed only in JOIN ON section.
|
/// Optimisation: do not add columns needed only in JOIN ON section.
|
||||||
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
|
if (columns_context.nameInclusion(name) > analyzed_join.rightKeyInclusion(name))
|
||||||
|
{
|
||||||
|
if (make_joined_columns_nullable)
|
||||||
|
{
|
||||||
|
auto type = joined_column.type->canBeInsideNullable() ? makeNullable(joined_column.type) : joined_column.type;
|
||||||
|
columns_added_by_join.emplace_back(NameAndTypePair(joined_column.name, std::move(type)));
|
||||||
|
}
|
||||||
|
else
|
||||||
columns_added_by_join.push_back(joined_column);
|
columns_added_by_join.push_back(joined_column);
|
||||||
|
}
|
||||||
required.erase(name);
|
required.erase(name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -759,7 +766,7 @@ void SyntaxAnalyzerResult::collectUsedColumns(const ASTPtr & query, const NamesA
|
|||||||
if (columns_context.has_table_join)
|
if (columns_context.has_table_join)
|
||||||
{
|
{
|
||||||
ss << ", joined columns:";
|
ss << ", joined columns:";
|
||||||
for (const auto & column : analyzed_join.available_joined_columns)
|
for (const auto & column : analyzed_join.columns_from_joined_table)
|
||||||
ss << " '" << column.name << "'";
|
ss << " '" << column.name << "'";
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -865,6 +872,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
|
|||||||
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
|
/// Optimize if with constant condition after constants was substituted instead of scalar subqueries.
|
||||||
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
|
OptimizeIfWithConstantConditionVisitor(result.aliases).visit(query);
|
||||||
|
|
||||||
|
bool make_joined_columns_nullable = false;
|
||||||
if (select_query)
|
if (select_query)
|
||||||
{
|
{
|
||||||
/// GROUP BY injective function elimination.
|
/// GROUP BY injective function elimination.
|
||||||
@ -885,12 +893,15 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
|
|||||||
/// Push the predicate expression down to the subqueries.
|
/// Push the predicate expression down to the subqueries.
|
||||||
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
|
result.rewrite_subqueries = PredicateExpressionsOptimizer(select_query, settings, context).optimize();
|
||||||
|
|
||||||
setJoinStrictness(*select_query, settings.join_default_strictness);
|
ASTTableJoin::Kind join_kind = ASTTableJoin::Kind::Comma;
|
||||||
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases, settings.join_use_nulls);
|
setJoinStrictness(*select_query, settings.join_default_strictness, join_kind);
|
||||||
|
make_joined_columns_nullable = settings.join_use_nulls && isLeftOrFull(join_kind);
|
||||||
|
|
||||||
|
collectJoinedColumns(result.analyzed_join, *select_query, source_columns_set, result.aliases);
|
||||||
}
|
}
|
||||||
|
|
||||||
result.aggregates = getAggregates(query);
|
result.aggregates = getAggregates(query);
|
||||||
result.collectUsedColumns(query, additional_source_columns);
|
result.collectUsedColumns(query, additional_source_columns, make_joined_columns_nullable);
|
||||||
return std::make_shared<const SyntaxAnalyzerResult>(result);
|
return std::make_shared<const SyntaxAnalyzerResult>(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ struct SyntaxAnalyzerResult
|
|||||||
NamesAndTypesList source_columns;
|
NamesAndTypesList source_columns;
|
||||||
/// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
|
/// Set of columns that are enough to read from the table to evaluate the expression. It does not include joined columns.
|
||||||
NamesAndTypesList required_source_columns;
|
NamesAndTypesList required_source_columns;
|
||||||
/// Columns will be added to block by JOIN. It's a subset of analyzed_join.available_joined_columns
|
/// Columns will be added to block by JOIN. It's a subset of analyzed_join.columns_from_joined_table with corrected Nullability
|
||||||
NamesAndTypesList columns_added_by_join;
|
NamesAndTypesList columns_added_by_join;
|
||||||
|
|
||||||
Aliases aliases;
|
Aliases aliases;
|
||||||
@ -42,7 +42,7 @@ struct SyntaxAnalyzerResult
|
|||||||
/// Predicate optimizer overrides the sub queries
|
/// Predicate optimizer overrides the sub queries
|
||||||
bool rewrite_subqueries = false;
|
bool rewrite_subqueries = false;
|
||||||
|
|
||||||
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns);
|
void collectUsedColumns(const ASTPtr & query, const NamesAndTypesList & additional_source_columns, bool make_joined_columns_nullable);
|
||||||
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
|
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ int main(int argc, char ** argv)
|
|||||||
|
|
||||||
Aggregator::Params params(
|
Aggregator::Params params(
|
||||||
stream->getHeader(), {0, 1}, aggregate_descriptions,
|
stream->getHeader(), {0, 1}, aggregate_descriptions,
|
||||||
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1);
|
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
|
||||||
|
|
||||||
Aggregator aggregator(params);
|
Aggregator aggregator(params);
|
||||||
|
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
|
|
||||||
#include <Common/formatReadable.h>
|
#include <Common/formatReadable.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
|
#include <common/config_common.h>
|
||||||
|
|
||||||
#include <IO/WriteBufferFromFile.h>
|
#include <IO/WriteBufferFromFile.h>
|
||||||
#include <Compression/CompressedWriteBuffer.h>
|
#include <Compression/CompressedWriteBuffer.h>
|
||||||
@ -236,11 +237,13 @@ MergeSortingTransform::MergeSortingTransform(
|
|||||||
SortDescription & description_,
|
SortDescription & description_,
|
||||||
size_t max_merged_block_size_, UInt64 limit_,
|
size_t max_merged_block_size_, UInt64 limit_,
|
||||||
size_t max_bytes_before_remerge_,
|
size_t max_bytes_before_remerge_,
|
||||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
|
||||||
|
size_t min_free_disk_space_)
|
||||||
: IProcessor({header}, {header})
|
: IProcessor({header}, {header})
|
||||||
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
|
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
|
||||||
, max_bytes_before_remerge(max_bytes_before_remerge_)
|
, max_bytes_before_remerge(max_bytes_before_remerge_)
|
||||||
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
|
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
|
||||||
|
, min_free_disk_space(min_free_disk_space_)
|
||||||
{
|
{
|
||||||
auto & sample = inputs.front().getHeader();
|
auto & sample = inputs.front().getHeader();
|
||||||
|
|
||||||
@ -504,6 +507,12 @@ void MergeSortingTransform::consume(Chunk chunk)
|
|||||||
*/
|
*/
|
||||||
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
|
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
|
||||||
{
|
{
|
||||||
|
#if !UNBUNDLED
|
||||||
|
auto free_space = Poco::File(tmp_path).freeSpace();
|
||||||
|
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
|
||||||
|
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
|
||||||
|
#endif
|
||||||
|
|
||||||
Poco::File(tmp_path).createDirectories();
|
Poco::File(tmp_path).createDirectories();
|
||||||
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
|
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
|
||||||
const std::string & path = temporary_files.back()->path();
|
const std::string & path = temporary_files.back()->path();
|
||||||
|
@ -14,6 +14,10 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int NOT_ENOUGH_SPACE;
|
||||||
|
}
|
||||||
class MergeSorter;
|
class MergeSorter;
|
||||||
|
|
||||||
class MergeSortingTransform : public IProcessor
|
class MergeSortingTransform : public IProcessor
|
||||||
@ -24,7 +28,8 @@ public:
|
|||||||
SortDescription & description_,
|
SortDescription & description_,
|
||||||
size_t max_merged_block_size_, UInt64 limit_,
|
size_t max_merged_block_size_, UInt64 limit_,
|
||||||
size_t max_bytes_before_remerge_,
|
size_t max_bytes_before_remerge_,
|
||||||
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
|
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
|
||||||
|
size_t min_free_disk_space_);
|
||||||
|
|
||||||
~MergeSortingTransform() override;
|
~MergeSortingTransform() override;
|
||||||
|
|
||||||
@ -44,6 +49,7 @@ private:
|
|||||||
size_t max_bytes_before_remerge;
|
size_t max_bytes_before_remerge;
|
||||||
size_t max_bytes_before_external_sort;
|
size_t max_bytes_before_external_sort;
|
||||||
const std::string tmp_path;
|
const std::string tmp_path;
|
||||||
|
size_t min_free_disk_space;
|
||||||
|
|
||||||
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
Logger * log = &Logger::get("MergeSortingBlockInputStream");
|
||||||
|
|
||||||
|
@ -229,7 +229,8 @@ try
|
|||||||
max_bytes_before_external_group_by,
|
max_bytes_before_external_group_by,
|
||||||
false, /// empty_result_for_aggregation_by_empty_set
|
false, /// empty_result_for_aggregation_by_empty_set
|
||||||
cur_path, /// tmp_path
|
cur_path, /// tmp_path
|
||||||
1 /// max_threads
|
1, /// max_threads
|
||||||
|
0
|
||||||
);
|
);
|
||||||
|
|
||||||
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
|
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
|
||||||
@ -301,7 +302,8 @@ try
|
|||||||
max_bytes_before_external_group_by,
|
max_bytes_before_external_group_by,
|
||||||
false, /// empty_result_for_aggregation_by_empty_set
|
false, /// empty_result_for_aggregation_by_empty_set
|
||||||
cur_path, /// tmp_path
|
cur_path, /// tmp_path
|
||||||
1 /// max_threads
|
1, /// max_threads
|
||||||
|
0
|
||||||
);
|
);
|
||||||
|
|
||||||
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
|
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
|
||||||
|
@ -133,7 +133,7 @@ try
|
|||||||
SortDescription description = {{0, 1, 1}};
|
SortDescription description = {{0, 1, 1}};
|
||||||
auto transform = std::make_shared<MergeSortingTransform>(
|
auto transform = std::make_shared<MergeSortingTransform>(
|
||||||
source->getPort().getHeader(), description,
|
source->getPort().getHeader(), description,
|
||||||
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".");
|
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);
|
||||||
auto sink = std::make_shared<CheckSortedSink>();
|
auto sink = std::make_shared<CheckSortedSink>();
|
||||||
|
|
||||||
connect(source->getPort(), transform->getInputs().front());
|
connect(source->getPort(), transform->getInputs().front());
|
||||||
|
@ -40,8 +40,11 @@ public:
|
|||||||
/// Opaque pointer to avoid dependencies (it is not possible to do forward declaration of typedef).
|
/// Opaque pointer to avoid dependencies (it is not possible to do forward declaration of typedef).
|
||||||
const void * data;
|
const void * data;
|
||||||
|
|
||||||
/// Minimal time, when we need to delete some data from this part
|
/// Minimal time, when we need to delete some data from this part.
|
||||||
time_t min_ttl;
|
time_t min_ttl;
|
||||||
|
|
||||||
|
/// Maximum time, when we will need to drop this part altogether because all rows in it are expired.
|
||||||
|
time_t max_ttl;
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Parts belong to partitions. Only parts within the same partition can be merged.
|
/// Parts belong to partitions. Only parts within the same partition can be merged.
|
||||||
|
@ -211,8 +211,11 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
|
|||||||
part_info.level = part->info.level;
|
part_info.level = part->info.level;
|
||||||
part_info.data = &part;
|
part_info.data = &part;
|
||||||
part_info.min_ttl = part->ttl_infos.part_min_ttl;
|
part_info.min_ttl = part->ttl_infos.part_min_ttl;
|
||||||
|
part_info.max_ttl = part->ttl_infos.part_max_ttl;
|
||||||
|
|
||||||
if (part_info.min_ttl && part_info.min_ttl <= current_time)
|
time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl;
|
||||||
|
|
||||||
|
if (ttl && ttl <= current_time)
|
||||||
has_part_with_expired_ttl = true;
|
has_part_with_expired_ttl = true;
|
||||||
|
|
||||||
partitions.back().emplace_back(part_info);
|
partitions.back().emplace_back(part_info);
|
||||||
@ -239,7 +242,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
|
|||||||
/// NOTE Could allow selection of different merge strategy.
|
/// NOTE Could allow selection of different merge strategy.
|
||||||
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
|
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
|
||||||
{
|
{
|
||||||
merge_selector = std::make_unique<TTLMergeSelector>(current_time);
|
merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts);
|
||||||
last_merge_with_ttl = current_time;
|
last_merge_with_ttl = current_time;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -12,11 +12,11 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
|
|||||||
for (const auto & [name, ttl_info] : other_infos.columns_ttl)
|
for (const auto & [name, ttl_info] : other_infos.columns_ttl)
|
||||||
{
|
{
|
||||||
columns_ttl[name].update(ttl_info);
|
columns_ttl[name].update(ttl_info);
|
||||||
updatePartMinTTL(ttl_info.min);
|
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
|
||||||
}
|
}
|
||||||
|
|
||||||
table_ttl.update(other_infos.table_ttl);
|
table_ttl.update(other_infos.table_ttl);
|
||||||
updatePartMinTTL(table_ttl.min);
|
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
||||||
@ -37,7 +37,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
|||||||
String name = col["name"].getString();
|
String name = col["name"].getString();
|
||||||
columns_ttl.emplace(name, ttl_info);
|
columns_ttl.emplace(name, ttl_info);
|
||||||
|
|
||||||
updatePartMinTTL(ttl_info.min);
|
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (json.has("table"))
|
if (json.has("table"))
|
||||||
@ -46,7 +46,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
|
|||||||
table_ttl.min = table["min"].getUInt();
|
table_ttl.min = table["min"].getUInt();
|
||||||
table_ttl.max = table["max"].getUInt();
|
table_ttl.max = table["max"].getUInt();
|
||||||
|
|
||||||
updatePartMinTTL(table_ttl.min);
|
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,15 +36,19 @@ struct MergeTreeDataPartTTLInfos
|
|||||||
std::unordered_map<String, MergeTreeDataPartTTLInfo> columns_ttl;
|
std::unordered_map<String, MergeTreeDataPartTTLInfo> columns_ttl;
|
||||||
MergeTreeDataPartTTLInfo table_ttl;
|
MergeTreeDataPartTTLInfo table_ttl;
|
||||||
time_t part_min_ttl = 0;
|
time_t part_min_ttl = 0;
|
||||||
|
time_t part_max_ttl = 0;
|
||||||
|
|
||||||
void read(ReadBuffer & in);
|
void read(ReadBuffer & in);
|
||||||
void write(WriteBuffer & out) const;
|
void write(WriteBuffer & out) const;
|
||||||
void update(const MergeTreeDataPartTTLInfos & other_infos);
|
void update(const MergeTreeDataPartTTLInfos & other_infos);
|
||||||
|
|
||||||
void updatePartMinTTL(time_t time)
|
void updatePartMinMaxTTL(time_t time_min, time_t time_max)
|
||||||
{
|
{
|
||||||
if (time && (!part_min_ttl || time < part_min_ttl))
|
if (time_min && (!part_min_ttl || time_min < part_min_ttl))
|
||||||
part_min_ttl = time;
|
part_min_ttl = time_min;
|
||||||
|
|
||||||
|
if (time_max && (!part_max_ttl || time_max > part_max_ttl))
|
||||||
|
part_max_ttl = time_max;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -99,7 +99,7 @@ void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTL
|
|||||||
else
|
else
|
||||||
throw Exception("Unexpected type of result ttl column", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Unexpected type of result ttl column", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
ttl_infos.updatePartMinTTL(ttl_info.min);
|
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -82,6 +82,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
|
|||||||
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
|
M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \
|
||||||
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \
|
||||||
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
|
M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \
|
||||||
|
M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \
|
||||||
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
|
M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)") \
|
||||||
M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \
|
M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \
|
||||||
M(SettingMaxThreads, max_part_loading_threads, 0, "The number of theads to load data parts at startup.") \
|
M(SettingMaxThreads, max_part_loading_threads, 0, "The number of theads to load data parts at startup.") \
|
||||||
|
@ -20,9 +20,11 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
|
|||||||
{
|
{
|
||||||
for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it)
|
for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it)
|
||||||
{
|
{
|
||||||
if (it->min_ttl && (partition_to_merge_index == -1 || it->min_ttl < partition_to_merge_min_ttl))
|
time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl;
|
||||||
|
|
||||||
|
if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl))
|
||||||
{
|
{
|
||||||
partition_to_merge_min_ttl = it->min_ttl;
|
partition_to_merge_min_ttl = ttl;
|
||||||
partition_to_merge_index = i;
|
partition_to_merge_index = i;
|
||||||
best_begin = it;
|
best_begin = it;
|
||||||
}
|
}
|
||||||
@ -38,7 +40,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
|
|||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
if (!best_begin->min_ttl || best_begin->min_ttl > current_time
|
time_t ttl = only_drop_parts ? best_begin->max_ttl : best_begin->min_ttl;
|
||||||
|
|
||||||
|
if (!ttl || ttl > current_time
|
||||||
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
|
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
|
||||||
{
|
{
|
||||||
++best_begin;
|
++best_begin;
|
||||||
@ -54,7 +58,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
|
|||||||
|
|
||||||
while (best_end != best_partition.end())
|
while (best_end != best_partition.end())
|
||||||
{
|
{
|
||||||
if (!best_end->min_ttl || best_end->min_ttl > current_time
|
time_t ttl = only_drop_parts ? best_end->max_ttl : best_end->min_ttl;
|
||||||
|
|
||||||
|
if (!ttl || ttl > current_time
|
||||||
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
|
|| (max_total_size_to_merge && total_size > max_total_size_to_merge))
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -14,13 +14,14 @@ namespace DB
|
|||||||
class TTLMergeSelector : public IMergeSelector
|
class TTLMergeSelector : public IMergeSelector
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
explicit TTLMergeSelector(time_t current_time_) : current_time(current_time_) {}
|
explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {}
|
||||||
|
|
||||||
PartsInPartition select(
|
PartsInPartition select(
|
||||||
const Partitions & partitions,
|
const Partitions & partitions,
|
||||||
const size_t max_total_size_to_merge) override;
|
const size_t max_total_size_to_merge) override;
|
||||||
private:
|
private:
|
||||||
time_t current_time;
|
time_t current_time;
|
||||||
|
bool only_drop_parts;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -4979,8 +4979,12 @@ void StorageReplicatedMergeTree::replacePartitionFrom(const StoragePtr & source_
|
|||||||
|
|
||||||
/// If necessary, wait until the operation is performed on all replicas.
|
/// If necessary, wait until the operation is performed on all replicas.
|
||||||
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
|
if (context.getSettingsRef().replication_alter_partitions_sync > 1)
|
||||||
|
{
|
||||||
|
lock2.release();
|
||||||
|
lock1.release();
|
||||||
waitForAllReplicasToProcessLogEntry(entry);
|
waitForAllReplicasToProcessLogEntry(entry);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void StorageReplicatedMergeTree::getCommitPartOps(
|
void StorageReplicatedMergeTree::getCommitPartOps(
|
||||||
Coordination::Requests & ops,
|
Coordination::Requests & ops,
|
||||||
|
@ -26,6 +26,11 @@ private:
|
|||||||
|
|
||||||
struct TableStructureReadLockHolder
|
struct TableStructureReadLockHolder
|
||||||
{
|
{
|
||||||
|
void release()
|
||||||
|
{
|
||||||
|
*this = TableStructureReadLockHolder();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class IStorage;
|
friend class IStorage;
|
||||||
|
|
||||||
|
@ -76,3 +76,14 @@ TEST(TransformQueryForExternalDatabase, Substring)
|
|||||||
"SELECT \"column\" FROM \"test\".\"table\"",
|
"SELECT \"column\" FROM \"test\".\"table\"",
|
||||||
state().context, state().columns);
|
state().context, state().columns);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(TransformQueryForExternalDatabase, MultipleAndSubqueries)
|
||||||
|
{
|
||||||
|
check("SELECT column FROM test.table WHERE 1 = 1 AND toString(column) = '42' AND column = 42 AND left(column, 10) = RIGHT(column, 10) AND column IN (1, 42) AND SUBSTRING(column FROM 1 FOR 2) = 'Hello' AND column != 4",
|
||||||
|
"SELECT \"column\" FROM \"test\".\"table\" WHERE 1 AND (\"column\" = 42) AND (\"column\" IN (1, 42)) AND (\"column\" != 4)",
|
||||||
|
state().context, state().columns);
|
||||||
|
check("SELECT column FROM test.table WHERE toString(column) = '42' AND left(column, 10) = RIGHT(column, 10) AND column = 42",
|
||||||
|
"SELECT \"column\" FROM \"test\".\"table\" WHERE (\"column\" = 42)",
|
||||||
|
state().context, state().columns);
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -141,19 +141,17 @@ String transformQueryForExternalDatabase(
|
|||||||
if (function->name == "and")
|
if (function->name == "and")
|
||||||
{
|
{
|
||||||
bool compatible_found = false;
|
bool compatible_found = false;
|
||||||
auto new_function_and = std::make_shared<ASTFunction>();
|
auto new_function_and = makeASTFunction("and");
|
||||||
auto new_function_and_arguments = std::make_shared<ASTExpressionList>();
|
|
||||||
new_function_and->arguments = new_function_and_arguments;
|
|
||||||
new_function_and->children.push_back(new_function_and_arguments);
|
|
||||||
|
|
||||||
for (const auto & elem : function->arguments->children)
|
for (const auto & elem : function->arguments->children)
|
||||||
{
|
{
|
||||||
if (isCompatible(*elem))
|
if (isCompatible(*elem))
|
||||||
{
|
{
|
||||||
new_function_and_arguments->children.push_back(elem);
|
new_function_and->arguments->children.push_back(elem);
|
||||||
compatible_found = true;
|
compatible_found = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (new_function_and->arguments->children.size() == 1)
|
||||||
|
new_function_and->name = "";
|
||||||
|
|
||||||
if (compatible_found)
|
if (compatible_found)
|
||||||
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(new_function_and));
|
select->setExpression(ASTSelectQuery::Expression::WHERE, std::move(new_function_and));
|
||||||
|
@ -1,5 +1,11 @@
|
|||||||
#!/usr/bin/env bash
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# Because REPLACE PARTITION does not force immediate removal of replaced data parts from the local filesystem
|
||||||
|
# (it tries to do it as quickly as possible, but it is still performed asynchronously in a separate thread)
|
||||||
|
# and when we do DETACH TABLE / ATTACH TABLE or SYSTEM RESTART REPLICA, these files may be discovered
|
||||||
|
# and discarded after restart with Warning/Error messages in log. This is Ok.
|
||||||
|
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
|
||||||
|
|
||||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||||
. $CURDIR/../shell_config.sh
|
. $CURDIR/../shell_config.sh
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ Configuration example:
|
|||||||
|
|
||||||
The dictionary is stored in memory in the form of a hash table with an ordered array of ranges and their corresponding values.
|
The dictionary is stored in memory in the form of a hash table with an ordered array of ranges and their corresponding values.
|
||||||
|
|
||||||
This storage method works the same way as hashed and allows using date/time ranges in addition to the key, if they appear in the dictionary.
|
This storage method works the same way as hashed and allows using date/time (arbitrary numeric type) ranges in addition to the key.
|
||||||
|
|
||||||
Example: The table contains discounts for each advertiser in the format:
|
Example: The table contains discounts for each advertiser in the format:
|
||||||
|
|
||||||
@ -111,7 +111,7 @@ Example: The table contains discounts for each advertiser in the format:
|
|||||||
+---------------+---------------------+-------------------+--------+
|
+---------------+---------------------+-------------------+--------+
|
||||||
```
|
```
|
||||||
|
|
||||||
To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md).
|
To use a sample for date ranges, define the `range_min` and `range_max` elements in the [structure](external_dicts_dict_structure.md). These elements must contain the elements `name` and `type` (if `type` is not specified, the default type will be used - Date). `type` can be any numeric type (Date / DateTime / UInt64 / Int32 / others).
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
|
||||||
@ -122,14 +122,16 @@ Example:
|
|||||||
</id>
|
</id>
|
||||||
<range_min>
|
<range_min>
|
||||||
<name>first</name>
|
<name>first</name>
|
||||||
|
<type>Date</type>
|
||||||
</range_min>
|
</range_min>
|
||||||
<range_max>
|
<range_max>
|
||||||
<name>last</name>
|
<name>last</name>
|
||||||
|
<type>Date</type>
|
||||||
</range_max>
|
</range_max>
|
||||||
...
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
To work with these dictionaries, you need to pass an additional date argument to the `dictGetT` function:
|
To work with these dictionaries, you need to pass an additional argument to the `dictGetT` function, for which a range is selected:
|
||||||
|
|
||||||
```
|
```
|
||||||
dictGetT('dict_name', 'attr_name', id, date)
|
dictGetT('dict_name', 'attr_name', id, date)
|
||||||
@ -160,10 +162,12 @@ Configuration example:
|
|||||||
<name>Abcdef</name>
|
<name>Abcdef</name>
|
||||||
</id>
|
</id>
|
||||||
<range_min>
|
<range_min>
|
||||||
<name>StartDate</name>
|
<name>StartTimeStamp</name>
|
||||||
|
<type>UInt64</type>
|
||||||
</range_min>
|
</range_min>
|
||||||
<range_max>
|
<range_max>
|
||||||
<name>EndDate</name>
|
<name>EndTimeStamp</name>
|
||||||
|
<type>UInt64</type>
|
||||||
</range_max>
|
</range_max>
|
||||||
<attribute>
|
<attribute>
|
||||||
<name>XXXType</name>
|
<name>XXXType</name>
|
||||||
|
@ -95,7 +95,7 @@
|
|||||||
|
|
||||||
Словарь хранится в оперативной памяти в виде хэш-таблицы с упорядоченным массивом диапазонов и соответствующих им значений.
|
Словарь хранится в оперативной памяти в виде хэш-таблицы с упорядоченным массивом диапазонов и соответствующих им значений.
|
||||||
|
|
||||||
Этот способ размещения работает также как и hashed и позволяет дополнительно к ключу использовать дипазоны по дате/времени, если они указаны в словаре.
|
Этот способ размещения работает так же, как и hashed, и позволяет дополнительно к ключу использовать диапазоны по дате/времени (произвольному числовому типу).
|
||||||
|
|
||||||
Пример: таблица содержит скидки для каждого рекламодателя в виде:
|
Пример: таблица содержит скидки для каждого рекламодателя в виде:
|
||||||
|
|
||||||
@ -111,7 +111,7 @@
|
|||||||
+---------------+---------------------+-------------------+--------+
|
+---------------+---------------------+-------------------+--------+
|
||||||
```
|
```
|
||||||
|
|
||||||
Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`.
|
Чтобы использовать выборку по диапазонам дат, необходимо в [structure](external_dicts_dict_structure.md) определить элементы `range_min`, `range_max`. В этих элементах должны присутствовать элементы `name` и `type` (если `type` не указан, будет использован тип по умолчанию -- Date). `type` может быть любым численным типом (Date/DateTime/UInt64/Int32/др.).
|
||||||
|
|
||||||
Пример:
|
Пример:
|
||||||
|
|
||||||
@ -122,14 +122,16 @@
|
|||||||
</id>
|
</id>
|
||||||
<range_min>
|
<range_min>
|
||||||
<name>first</name>
|
<name>first</name>
|
||||||
|
<type>Date</type>
|
||||||
</range_min>
|
</range_min>
|
||||||
<range_max>
|
<range_max>
|
||||||
<name>last</name>
|
<name>last</name>
|
||||||
|
<type>Date</type>
|
||||||
</range_max>
|
</range_max>
|
||||||
...
|
...
|
||||||
```
|
```
|
||||||
|
|
||||||
Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент - дату: :
|
Для работы с такими словарями в функцию `dictGetT` необходимо передавать дополнительный аргумент, для которого подбирается диапазон:
|
||||||
|
|
||||||
dictGetT('dict_name', 'attr_name', id, date)
|
dictGetT('dict_name', 'attr_name', id, date)
|
||||||
|
|
||||||
@ -158,10 +160,12 @@
|
|||||||
<name>Abcdef</name>
|
<name>Abcdef</name>
|
||||||
</id>
|
</id>
|
||||||
<range_min>
|
<range_min>
|
||||||
<name>StartDate</name>
|
<name>StartTimeStamp</name>
|
||||||
|
<type>UInt64</type>
|
||||||
</range_min>
|
</range_min>
|
||||||
<range_max>
|
<range_max>
|
||||||
<name>EndDate</name>
|
<name>EndTimeStamp</name>
|
||||||
|
<type>UInt64</type>
|
||||||
</range_max>
|
</range_max>
|
||||||
<attribute>
|
<attribute>
|
||||||
<name>XXXType</name>
|
<name>XXXType</name>
|
||||||
|
Loading…
Reference in New Issue
Block a user