Merge branch 'master' of github.com:yandex/ClickHouse

This commit is contained in:
Alexey Milovidov 2017-06-22 19:07:57 +03:00
commit 22e7070fb0
13 changed files with 505 additions and 347 deletions

View File

@ -62,7 +62,7 @@ std::ostream & operator<<(std::ostream & stream, const DB::Block & what)
{
stream << "Block("
<< "size = " << what.getColumns().size()
<< ")";
<< "){" << what.dumpStructure() << "}";
return stream;
}
@ -79,3 +79,15 @@ std::ostream & operator<<(std::ostream & stream, const DB::IColumn & what)
<< ")";
return stream;
}
/// Debug output for a Connection::Packet in the form
/// "Connection::Packet(type = N[, exception = E]) {<block>}".
/// Numeric packet types are described in Core/Protocol.h.
std::ostream & operator<<(std::ostream & stream, const DB::Connection::Packet & what)
{
    stream << "Connection::Packet("
        << "type = " << what.type;

    /// The exception field is optional; the leading ", " keeps it from running
    /// into the type value when it is present.
    if (what.exception)
        stream << ", exception = " << what.exception.get();

    // TODO: profile_info
    stream << ") {" << what.block << "}";
    return stream;
}

View File

@ -32,6 +32,9 @@ std::ostream & operator<<(std::ostream & stream, const DB::ColumnWithTypeAndName
namespace DB { class IColumn; }
std::ostream & operator<<(std::ostream & stream, const DB::IColumn & what);
#include <Client/Connection.h>
std::ostream & operator<<(std::ostream & stream, const DB::Connection::Packet & what);
/// some operator<< should be declared before operator<<(... std::shared_ptr<>)
#include <common/iostream_debug_helpers.h>

View File

@ -1,5 +1,6 @@
#include <Dictionaries/ComplexKeyCacheDictionary.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Common/Arena.h>
#include <Common/BitHelpers.h>
#include <Common/randomSeed.h>
#include <Common/Stopwatch.h>
@ -7,6 +8,8 @@
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <ext/range.h>
#include <ext/scope_guard.h>
#include <ext/map.h>
namespace ProfileEvents

View File

@ -3,15 +3,12 @@
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/SmallObjectPool.h>
#include <Common/HashTable/HashMap.h>
#include <Columns/ColumnString.h>
#include <Core/StringRef.h>
#include <ext/scope_guard.h>
#include <ext/bit_cast.h>
#include <ext/map.h>
#include <Poco/RWLock.h>
#include <atomic>
#include <chrono>

View File

@ -2,9 +2,6 @@
#include <iostream>
#include <limits>
#include <regex>
#if __has_include(<sys/sysinfo.h>)
#include <sys/sysinfo.h>
#endif
#include <unistd.h>
#include <boost/program_options.hpp>
@ -16,12 +13,14 @@
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <Common/getFQDNOrHostName.h>
#include <Core/Types.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <Interpreters/Settings.h>
#include <common/getMemoryAmount.h>
#include <Poco/AutoPtr.h>
@ -116,131 +115,44 @@ public:
}
};
enum class PriorityType
{
Min,
Max
};
struct CriterionWithPriority
{
PriorityType priority;
size_t value;
bool fulfilled;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
CriterionWithPriority() : value(0), fulfilled(false)
{
}
CriterionWithPriority(const CriterionWithPriority &) = default;
};
/// Termination criterions. The running test will be terminated in either of two conditions:
/// 1. All criterions marked 'min' are fulfilled
/// or
/// 2. Any criterion marked 'max' is fulfilled
class StopCriterions
{
private:
using AbstractConfiguration = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using Keys = std::vector<String>;
void initializeStruct(const String & priority, const AbstractConfiguration & stop_criterions_view)
/// A set of supported stop conditions.
struct StopConditionsSet
{
void loadFromConfig(const ConfigurationPtr & stop_conditions_view)
{
using Keys = std::vector<String>;
Keys keys;
stop_criterions_view->keys(priority, keys);
PriorityType priority_type = (priority == "min" ? PriorityType::Min : PriorityType::Max);
stop_conditions_view->keys(keys);
for (const String & key : keys)
{
if (key == "timeout_ms")
{
timeout_ms.value = stop_criterions_view->getUInt64(priority + ".timeout_ms");
timeout_ms.priority = priority_type;
}
if (key == "total_time_ms")
total_time_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "rows_read")
{
rows_read.value = stop_criterions_view->getUInt64(priority + ".rows_read");
rows_read.priority = priority_type;
}
rows_read.value = stop_conditions_view->getUInt64(key);
else if (key == "bytes_read_uncompressed")
{
bytes_read_uncompressed.value = stop_criterions_view->getUInt64(priority + ".bytes_read_uncompressed");
bytes_read_uncompressed.priority = priority_type;
}
bytes_read_uncompressed.value = stop_conditions_view->getUInt64(key);
else if (key == "iterations")
{
iterations.value = stop_criterions_view->getUInt64(priority + ".iterations");
iterations.priority = priority_type;
}
iterations.value = stop_conditions_view->getUInt64(key);
else if (key == "min_time_not_changing_for_ms")
{
min_time_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".min_time_not_changing_for_ms");
min_time_not_changing_for_ms.priority = priority_type;
}
min_time_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "max_speed_not_changing_for_ms")
{
max_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".max_speed_not_changing_for_ms");
max_speed_not_changing_for_ms.priority = priority_type;
}
max_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "average_speed_not_changing_for_ms")
{
average_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".average_speed_not_changing_for_ms");
average_speed_not_changing_for_ms.priority = priority_type;
}
average_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else
{
throw DB::Exception("Met unkown stop criterion: " + key, 1);
}
throw DB::Exception("Met unkown stop condition: " + key, 1);
if (priority == "min")
{
++number_of_initialized_min;
};
if (priority == "max")
{
++number_of_initialized_max;
};
}
}
public:
StopCriterions() : number_of_initialized_min(0), number_of_initialized_max(0), fulfilled_criterions_min(0), fulfilled_criterions_max(0)
{
}
StopCriterions(const StopCriterions & another_criterions)
: timeout_ms(another_criterions.timeout_ms),
rows_read(another_criterions.rows_read),
bytes_read_uncompressed(another_criterions.bytes_read_uncompressed),
iterations(another_criterions.iterations),
min_time_not_changing_for_ms(another_criterions.min_time_not_changing_for_ms),
max_speed_not_changing_for_ms(another_criterions.max_speed_not_changing_for_ms),
average_speed_not_changing_for_ms(another_criterions.average_speed_not_changing_for_ms),
number_of_initialized_min(another_criterions.number_of_initialized_min),
number_of_initialized_max(another_criterions.number_of_initialized_max),
fulfilled_criterions_min(another_criterions.fulfilled_criterions_min),
fulfilled_criterions_max(another_criterions.fulfilled_criterions_max)
{
}
void loadFromConfig(const AbstractConfiguration & stop_criterions_view)
{
if (stop_criterions_view->has("min"))
{
initializeStruct("min", stop_criterions_view);
}
if (stop_criterions_view->has("max"))
{
initializeStruct("max", stop_criterions_view);
++initialized_count;
}
}
void reset()
{
timeout_ms.fulfilled = false;
total_time_ms.fulfilled = false;
rows_read.fulfilled = false;
bytes_read_uncompressed.fulfilled = false;
iterations.fulfilled = false;
@ -248,25 +160,98 @@ public:
max_speed_not_changing_for_ms.fulfilled = false;
average_speed_not_changing_for_ms.fulfilled = false;
fulfilled_criterions_min = 0;
fulfilled_criterions_max = 0;
fulfilled_count = 0;
}
CriterionWithPriority timeout_ms;
CriterionWithPriority rows_read;
CriterionWithPriority bytes_read_uncompressed;
CriterionWithPriority iterations;
CriterionWithPriority min_time_not_changing_for_ms;
CriterionWithPriority max_speed_not_changing_for_ms;
CriterionWithPriority average_speed_not_changing_for_ms;
/// A single stop condition: a threshold plus a flag telling whether it has been reached.
/// Note: only conditions with UInt64 minimal thresholds are supported.
/// I.e. a condition is fulfilled once the reported value reaches or exceeds its threshold
/// (report() compares with >=).
struct StopCondition
{
/// Threshold; report() ignores conditions whose value is 0.
UInt64 value = 0;
/// Set by report() the first time the threshold is reached.
bool fulfilled = false;
};
/// Hereafter 'min' and 'max', in the context of criteria, mean a level of importance
/// Number of initialized properties met in configuration
size_t number_of_initialized_min;
size_t number_of_initialized_max;
/// Feeds a freshly measured `value` into `condition`.
/// The condition is marked fulfilled, and fulfilled_count is incremented, the first
/// time the value is greater than or equal to the threshold. Conditions with a zero
/// threshold and conditions that are already fulfilled are left untouched.
void report(UInt64 value, StopCondition & condition)
{
    if (!condition.value)
        return;

    if (condition.fulfilled)
        return;

    if (value < condition.value)
        return;

    condition.fulfilled = true;
    ++fulfilled_count;
}
size_t fulfilled_criterions_min;
size_t fulfilled_criterions_max;
StopCondition total_time_ms;
StopCondition rows_read;
StopCondition bytes_read_uncompressed;
StopCondition iterations;
StopCondition min_time_not_changing_for_ms;
StopCondition max_speed_not_changing_for_ms;
StopCondition average_speed_not_changing_for_ms;
size_t initialized_count = 0;
size_t fulfilled_count = 0;
};
/// Stop conditions for a test run. The running test will be terminated in either of two conditions:
/// 1. All conditions marked 'all_of' are fulfilled
/// or
/// 2. Any condition marked 'any_of' is fulfilled
class TestStopConditions
{
public:
void loadFromConfig(ConfigurationPtr & stop_conditions_config)
{
if (stop_conditions_config->has("all_of"))
{
ConfigurationPtr config_all_of(stop_conditions_config->createView("all_of"));
conditions_all_of.loadFromConfig(config_all_of);
}
if (stop_conditions_config->has("any_of"))
{
ConfigurationPtr config_any_of(stop_conditions_config->createView("any_of"));
conditions_any_of.loadFromConfig(config_any_of);
}
}
bool empty() const
{
return !conditions_all_of.initialized_count && !conditions_any_of.initialized_count;
}
#define DEFINE_REPORT_FUNC(FUNC_NAME, CONDITION) \
void FUNC_NAME(UInt64 value) \
{ \
conditions_all_of.report(value, conditions_all_of.CONDITION); \
conditions_any_of.report(value, conditions_any_of.CONDITION); \
} \
DEFINE_REPORT_FUNC(reportTotalTime, total_time_ms);
DEFINE_REPORT_FUNC(reportRowsRead, rows_read);
DEFINE_REPORT_FUNC(reportBytesReadUncompressed, bytes_read_uncompressed);
DEFINE_REPORT_FUNC(reportIterations, iterations);
DEFINE_REPORT_FUNC(reportMinTimeNotChangingFor, min_time_not_changing_for_ms);
DEFINE_REPORT_FUNC(reportMaxSpeedNotChangingFor, max_speed_not_changing_for_ms);
DEFINE_REPORT_FUNC(reportAverageSpeedNotChangingFor, average_speed_not_changing_for_ms);
#undef REPORT
bool areFulfilled() const
{
return
(conditions_all_of.initialized_count
&& conditions_all_of.fulfilled_count >= conditions_all_of.initialized_count)
|| (conditions_any_of.initialized_count && conditions_any_of.fulfilled_count);
}
void reset()
{
conditions_all_of.reset();
conditions_any_of.reset();
}
private:
StopConditionsSet conditions_all_of;
StopConditionsSet conditions_any_of;
};
struct Stats
@ -278,6 +263,9 @@ struct Stats
Stopwatch max_bytes_speed_watch;
Stopwatch avg_rows_speed_watch;
Stopwatch avg_bytes_speed_watch;
bool last_query_was_cancelled = false;
size_t queries;
size_t rows_read;
size_t bytes_read;
@ -453,6 +441,8 @@ struct Stats
avg_rows_speed_watch.restart();
avg_bytes_speed_watch.restart();
last_query_was_cancelled = false;
sampler.clear();
queries = 0;
@ -515,8 +505,16 @@ public:
throw DB::Exception("No tests were specified", 0);
}
std::cerr << std::fixed << std::setprecision(3);
std::cout << std::fixed << std::setprecision(3);
std::string name;
UInt64 version_major;
UInt64 version_minor;
UInt64 version_revision;
connection.getServerVersion(name, version_major, version_minor, version_revision);
std::stringstream ss;
ss << name << " v" << version_major << "." << version_minor << "." << version_revision;
server_version = ss.str();
processTestsConfigurations(input_files);
}
@ -529,6 +527,7 @@ private:
Queries queries;
Connection connection;
std::string server_version;
using Keys = std::vector<String>;
@ -538,8 +537,7 @@ private:
InterruptListener interrupt_listener;
using XMLConfiguration = Poco::Util::XMLConfiguration;
using AbstractConfig = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using Config = Poco::AutoPtr<XMLConfiguration>;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
using Paths = std::vector<String>;
using StringToVector = std::map<String, std::vector<String>>;
@ -549,13 +547,13 @@ private:
std::vector<StringKeyValue> substitutions_maps;
bool gotSIGINT;
std::vector<StopCriterions> stop_criterions;
std::vector<TestStopConditions> stop_conditions_by_run;
String main_metric;
bool lite_output;
String profiles_file;
Strings input_files;
std::vector<Config> tests_configurations;
std::vector<XMLConfigurationPtr> tests_configurations;
Strings tests_tags;
Strings skip_tags;
@ -564,14 +562,6 @@ private:
Strings tests_names_regexp;
Strings skip_names_regexp;
#define incFulfilledCriterions(index, CRITERION) \
if (!stop_criterions[index].CRITERION.fulfilled) \
{ \
stop_criterions[index].CRITERION.priority == PriorityType::Min ? ++stop_criterions[index].fulfilled_criterions_min \
: ++stop_criterions[index].fulfilled_criterions_max; \
stop_criterions[index].CRITERION.fulfilled = true; \
}
enum class ExecutionType
{
Loop,
@ -587,12 +577,15 @@ private:
};
size_t times_to_run = 1;
std::vector<Stats> statistics;
std::vector<Stats> statistics_by_run;
/// Removes configurations that have a given value. If leave is true, the logic is reversed.
void removeConfigurationsIf(std::vector<Config> & configs, FilterType filter_type, const Strings & values, bool leave = false)
void removeConfigurationsIf(
std::vector<XMLConfigurationPtr> & configs,
FilterType filter_type, const Strings & values, bool leave = false)
{
auto checker = [&filter_type, &values, &leave](Config & config) {
auto checker = [&filter_type, &values, &leave](XMLConfigurationPtr & config)
{
if (values.size() == 0)
return false;
@ -635,7 +628,7 @@ private:
return remove_or_not;
};
std::vector<Config>::iterator new_end = std::remove_if(configs.begin(), configs.end(), checker);
auto new_end = std::remove_if(configs.begin(), configs.end(), checker);
configs.erase(new_end, configs.end());
}
@ -655,7 +648,7 @@ private:
}
/// Checks specified preconditions per test (process cache, table existence, etc.)
bool checkPreconditions(const Config & config)
bool checkPreconditions(const XMLConfigurationPtr & config)
{
if (!config->has("preconditions"))
return true;
@ -666,35 +659,25 @@ private:
for (const String & precondition : preconditions)
{
if (precondition == "reset_cpu_cache")
if (system("(>&2 echo 'Flushing cache...') && (sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches') && (>&2 echo 'Flushed.')")) {
std::cerr << "Failed to flush cache" << std::endl;
if (precondition == "flush_disk_cache")
if (system("(>&2 echo 'Flushing disk cache...') && (sudo sh -c 'echo 3 > /proc/sys/vm/drop_caches') && (>&2 echo 'Flushed.')"))
{
std::cerr << "Failed to flush disk cache" << std::endl;
return false;
}
if (precondition == "ram_size")
{
#if __has_include(<sys/sysinfo.h>)
struct sysinfo *system_information = new struct sysinfo();
if (sysinfo(system_information))
size_t ram_size_needed = config->getUInt64("preconditions.ram_size");
size_t actual_ram = getMemoryAmount();
if (!actual_ram)
throw DB::Exception("ram_size precondition not available on this platform", ErrorCodes::NOT_IMPLEMENTED);
if (ram_size_needed > actual_ram)
{
std::cerr << "Failed to check system RAM size" << std::endl;
delete system_information;
std::cerr << "Not enough RAM: need = " << ram_size_needed << ", present = " << actual_ram << std::endl;
return false;
}
else
{
size_t ram_size_needed = config->getUInt64("preconditions.ram_size");
size_t actual_ram = system_information->totalram / 1024 / 1024;
if (ram_size_needed > actual_ram)
{
std::cerr << "Not enough RAM" << std::endl;
delete system_information;
return false;
}
}
#else
throw DB::Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED);
#endif
}
if (precondition == "table_exists")
@ -711,11 +694,15 @@ private:
{
Connection::Packet packet = connection.receivePacket();
if (packet.type == Protocol::Server::Data) {
if (packet.type == Protocol::Server::Data)
{
for (const ColumnWithTypeAndName & column : packet.block.getColumns())
{
if (column.name == "result" && column.column->getDataAt(0).data != nullptr) {
if (column.name == "result" && column.column->size() > 0)
{
exist = column.column->get64(0);
if (exist)
break;
}
}
}
@ -724,8 +711,9 @@ private:
break;
}
if (exist == 0) {
std::cerr << "Table " + table_to_check + " doesn't exist" << std::endl;
if (!exist)
{
std::cerr << "Table " << table_to_check << " doesn't exist" << std::endl;
return false;
}
}
@ -741,7 +729,7 @@ private:
for (size_t i = 0; i != input_files.size(); ++i)
{
const String path = input_files[i];
tests_configurations[i] = Config(new XMLConfiguration(path));
tests_configurations[i] = XMLConfigurationPtr(new XMLConfiguration(path));
}
filterConfigurations();
@ -760,7 +748,7 @@ private:
String output = runTest(test_config);
if (lite_output)
std::cout << output << std::endl;
std::cout << output;
else
outputs.push_back(output);
}
@ -783,9 +771,9 @@ private:
}
}
void extractSettings(const Config & config, const String & key,
const Strings & settings_list,
std::map<String, String> settings_to_apply)
void extractSettings(
const XMLConfigurationPtr & config, const String & key,
const Strings & settings_list, std::map<String, String> & settings_to_apply)
{
for (const String & setup : settings_list)
{
@ -800,7 +788,7 @@ private:
}
}
String runTest(Config & test_config)
String runTest(XMLConfigurationPtr & test_config)
{
queries.clear();
@ -819,7 +807,7 @@ private:
if (!profiles_file.empty())
{
String profile_name = test_config->getString("settings.profile");
Config profiles_config(new XMLConfiguration(profiles_file));
XMLConfigurationPtr profiles_config(new XMLConfiguration(profiles_file));
Keys profile_settings;
profiles_config->keys("profiles." + profile_name, profile_settings);
@ -868,7 +856,17 @@ private:
}
if (test_config->has("query"))
{
queries.push_back(test_config->getString("query"));
for (size_t i = 1; ; ++i)
{
std::string key = "query[" + std::to_string(i) + "]";
if (!test_config->has(key))
break;
queries.push_back(test_config->getString(key));
}
}
if (test_config->has("query_file"))
{
@ -907,7 +905,7 @@ private:
throw DB::Exception("Only one query is allowed when using substitutions", 1);
/// Make "subconfig" of inner xml block
AbstractConfig substitutions_view(test_config->createView("substitutions"));
ConfigurationPtr substitutions_view(test_config->createView("substitutions"));
constructSubstitutions(substitutions_view, substitutions);
queries = formatQueries(queries[0], substitutions);
@ -931,22 +929,21 @@ private:
times_to_run = test_config->getUInt("times_to_run");
}
stop_criterions.resize(times_to_run * queries.size());
if (test_config->has("stop"))
TestStopConditions stop_conditions_template;
if (test_config->has("stop_conditions"))
{
AbstractConfig stop_criterions_view(test_config->createView("stop"));
for (StopCriterions & stop_criterion : stop_criterions)
{
stop_criterion.loadFromConfig(stop_criterions_view);
}
ConfigurationPtr stop_conditions_config(test_config->createView("stop_conditions"));
stop_conditions_template.loadFromConfig(stop_conditions_config);
}
else
{
if (stop_conditions_template.empty())
throw DB::Exception("No termination conditions were found in config", 1);
}
AbstractConfig metrics_view(test_config->createView("metrics"));
for (size_t i = 0; i < times_to_run * queries.size(); ++i)
stop_conditions_by_run.push_back(stop_conditions_template);
ConfigurationPtr metrics_view(test_config->createView("metrics"));
Keys metrics;
metrics_view->keys(metrics);
@ -972,7 +969,7 @@ private:
if (metrics.size() > 0)
checkMetricsInput(metrics);
statistics.resize(times_to_run * queries.size());
statistics_by_run.resize(times_to_run * queries.size());
for (size_t number_of_launch = 0; number_of_launch < times_to_run; ++number_of_launch)
{
QueriesWithIndexes queries_with_indexes;
@ -980,7 +977,7 @@ private:
for (size_t query_index = 0; query_index < queries.size(); ++query_index)
{
size_t statistic_index = number_of_launch * queries.size() + query_index;
stop_criterions[statistic_index].reset();
stop_conditions_by_run[statistic_index].reset();
queries_with_indexes.push_back({queries[query_index], statistic_index});
}
@ -1035,159 +1032,102 @@ private:
for (const std::pair<Query, const size_t> & query_and_index : queries_with_indexes)
{
Query query = query_and_index.first;
const size_t statistic_index = query_and_index.second;
const size_t run_index = query_and_index.second;
TestStopConditions & stop_conditions = stop_conditions_by_run[run_index];
Stats & statistics = statistics_by_run[run_index];
size_t max_iterations = stop_criterions[statistic_index].iterations.value;
size_t iteration = 0;
statistics[statistic_index].clear();
execute(query, statistic_index);
statistics.clear();
execute(query, statistics, stop_conditions);
if (exec_type == ExecutionType::Loop)
{
while (!gotSIGINT)
{
++iteration;
/// check stop criterions
if (max_iterations && iteration >= max_iterations)
{
incFulfilledCriterions(statistic_index, iterations);
}
if (stop_criterions[statistic_index].number_of_initialized_min
&& (stop_criterions[statistic_index].fulfilled_criterions_min
>= stop_criterions[statistic_index].number_of_initialized_min))
{
/// All 'min' criterions are fulfilled
stop_conditions.reportIterations(iteration);
if (stop_conditions.areFulfilled())
break;
}
if (stop_criterions[statistic_index].number_of_initialized_max && stop_criterions[statistic_index].fulfilled_criterions_max)
{
/// Some 'max' criterions are fulfilled
break;
}
execute(query, statistic_index);
execute(query, statistics, stop_conditions);
}
}
if (!gotSIGINT)
{
statistics[statistic_index].ready = true;
statistics.ready = true;
}
}
}
void execute(const Query & query, const size_t statistic_index)
void execute(const Query & query, Stats & statistics, TestStopConditions & stop_conditions)
{
statistics[statistic_index].watch_per_query.restart();
statistics.watch_per_query.restart();
statistics.last_query_was_cancelled = false;
RemoteBlockInputStream stream(connection, query, &settings, global_context, nullptr, Tables() /*, query_processing_stage*/);
Progress progress;
stream.setProgressCallback([&progress, &stream, statistic_index, this](const Progress & value) {
progress.incrementPiecewiseAtomically(value);
this->checkFulfilledCriterionsAndUpdate(progress, stream, statistic_index);
});
stream.setProgressCallback(
[&progress, &stream, &statistics, &stop_conditions, this](const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
this->checkFulfilledConditionsAndUpdate(progress, stream, statistics, stop_conditions);
});
stream.readPrefix();
while (Block block = stream.read())
;
stream.readSuffix();
statistics[statistic_index].updateQueryInfo();
statistics[statistic_index].setTotalTime();
if (!statistics.last_query_was_cancelled)
statistics.updateQueryInfo();
statistics.setTotalTime();
}
void checkFulfilledCriterionsAndUpdate(const Progress & progress,
RemoteBlockInputStream & stream,
const size_t statistic_index)
void checkFulfilledConditionsAndUpdate(
const Progress & progress,
RemoteBlockInputStream & stream,
Stats & statistics,
TestStopConditions & stop_conditions)
{
statistics[statistic_index].add(progress.rows, progress.bytes);
statistics.add(progress.rows, progress.bytes);
size_t max_rows_to_read = stop_criterions[statistic_index].rows_read.value;
if (max_rows_to_read && statistics[statistic_index].rows_read >= max_rows_to_read)
stop_conditions.reportRowsRead(statistics.rows_read);
stop_conditions.reportBytesReadUncompressed(statistics.bytes_read);
stop_conditions.reportTotalTime(statistics.watch.elapsed() / (1000 * 1000));
stop_conditions.reportMinTimeNotChangingFor(statistics.min_time_watch.elapsed() / (1000 * 1000));
stop_conditions.reportMaxSpeedNotChangingFor(
statistics.max_rows_speed_watch.elapsed() / (1000 * 1000));
stop_conditions.reportAverageSpeedNotChangingFor(
statistics.avg_rows_speed_watch.elapsed() / (1000 * 1000));
if (stop_conditions.areFulfilled())
{
incFulfilledCriterions(statistic_index, rows_read);
}
size_t max_bytes_to_read = stop_criterions[statistic_index].bytes_read_uncompressed.value;
if (max_bytes_to_read && statistics[statistic_index].bytes_read >= max_bytes_to_read)
{
incFulfilledCriterions(statistic_index, bytes_read_uncompressed);
}
if (UInt64 max_timeout_ms = stop_criterions[statistic_index].timeout_ms.value)
{
/// cast nanoseconds to ms
if ((statistics[statistic_index].watch.elapsed() / (1000 * 1000)) > max_timeout_ms)
{
incFulfilledCriterions(statistic_index, timeout_ms);
}
}
size_t min_time_not_changing_for_ms = stop_criterions[statistic_index].min_time_not_changing_for_ms.value;
if (min_time_not_changing_for_ms)
{
size_t min_time_did_not_change_for = statistics[statistic_index].min_time_watch.elapsed() / (1000 * 1000);
if (min_time_did_not_change_for >= min_time_not_changing_for_ms)
{
incFulfilledCriterions(statistic_index, min_time_not_changing_for_ms);
}
}
size_t max_speed_not_changing_for_ms = stop_criterions[statistic_index].max_speed_not_changing_for_ms.value;
if (max_speed_not_changing_for_ms)
{
UInt64 speed_not_changing_time = statistics[statistic_index].max_rows_speed_watch.elapsed() / (1000 * 1000);
if (speed_not_changing_time >= max_speed_not_changing_for_ms)
{
incFulfilledCriterions(statistic_index, max_speed_not_changing_for_ms);
}
}
size_t average_speed_not_changing_for_ms = stop_criterions[statistic_index].average_speed_not_changing_for_ms.value;
if (average_speed_not_changing_for_ms)
{
UInt64 speed_not_changing_time = statistics[statistic_index].avg_rows_speed_watch.elapsed() / (1000 * 1000);
if (speed_not_changing_time >= average_speed_not_changing_for_ms)
{
incFulfilledCriterions(statistic_index, average_speed_not_changing_for_ms);
}
}
if (stop_criterions[statistic_index].number_of_initialized_min
&& (stop_criterions[statistic_index].fulfilled_criterions_min >= stop_criterions[statistic_index].number_of_initialized_min))
{
/// All 'min' criterions are fulfilled
stream.cancel();
}
if (stop_criterions[statistic_index].number_of_initialized_max && stop_criterions[statistic_index].fulfilled_criterions_max)
{
/// Some 'max' criterions are fulfilled
statistics.last_query_was_cancelled = true;
stream.cancel();
}
if (interrupt_listener.check())
{
gotSIGINT = true;
statistics.last_query_was_cancelled = true;
stream.cancel();
}
}
void constructSubstitutions(AbstractConfig & substitutions_view, StringToVector & substitutions)
void constructSubstitutions(ConfigurationPtr & substitutions_view, StringToVector & substitutions)
{
Keys xml_substitutions;
substitutions_view->keys(xml_substitutions);
for (size_t i = 0; i != xml_substitutions.size(); ++i)
{
const AbstractConfig xml_substitution(substitutions_view->createView("substitution[" + std::to_string(i) + "]"));
const ConfigurationPtr xml_substitution(
substitutions_view->createView("substitution[" + std::to_string(i) + "]"));
/// Property values for substitution will be stored in a vector
/// accessible by property name
@ -1268,16 +1208,10 @@ public:
String constructTotalInfo(Strings metrics)
{
JSONString json_output;
String hostname;
char hostname_buffer[256];
if (gethostname(hostname_buffer, 256) == 0)
{
hostname = String(hostname_buffer);
}
json_output.set("hostname", hostname);
json_output.set("hostname", getFQDNOrHostName());
json_output.set("cpu_num", sysconf(_SC_NPROCESSORS_ONLN));
json_output.set("server_version", server_version);
json_output.set("test_name", test_name);
json_output.set("main_metric", main_metric);
@ -1310,13 +1244,17 @@ public:
std::vector<JSONString> run_infos;
for (size_t query_index = 0; query_index < queries.size(); ++query_index)
{
for (size_t number_of_launch = 0; number_of_launch < statistics.size(); ++number_of_launch)
for (size_t number_of_launch = 0; number_of_launch < times_to_run; ++number_of_launch)
{
if (!statistics[number_of_launch].ready)
Stats & statistics = statistics_by_run[number_of_launch * queries.size() + query_index];
if (!statistics.ready)
continue;
JSONString runJSON;
runJSON.set("query", queries[query_index]);
if (substitutions_maps.size())
{
JSONString parameters(4);
@ -1329,11 +1267,13 @@ public:
runJSON.set("parameters", parameters.asString());
}
if (exec_type == ExecutionType::Loop)
{
/// in seconds
if (std::find(metrics.begin(), metrics.end(), "min_time") != metrics.end())
runJSON.set("min_time", statistics[number_of_launch].min_time / double(1000));
runJSON.set("min_time", statistics.min_time / double(1000));
if (std::find(metrics.begin(), metrics.end(), "quantiles") != metrics.end())
{
@ -1344,44 +1284,44 @@ public:
while (quantile_key.back() == '0')
quantile_key.pop_back();
quantiles.set(quantile_key, statistics[number_of_launch].sampler.quantileInterpolated(percent / 100.0));
quantiles.set(quantile_key, statistics.sampler.quantileInterpolated(percent / 100.0));
}
quantiles.set("0.95", statistics[number_of_launch].sampler.quantileInterpolated(95 / 100.0));
quantiles.set("0.99", statistics[number_of_launch].sampler.quantileInterpolated(99 / 100.0));
quantiles.set("0.999", statistics[number_of_launch].sampler.quantileInterpolated(99.9 / 100.0));
quantiles.set("0.9999", statistics[number_of_launch].sampler.quantileInterpolated(99.99 / 100.0));
quantiles.set("0.95", statistics.sampler.quantileInterpolated(95 / 100.0));
quantiles.set("0.99", statistics.sampler.quantileInterpolated(99 / 100.0));
quantiles.set("0.999", statistics.sampler.quantileInterpolated(99.9 / 100.0));
quantiles.set("0.9999", statistics.sampler.quantileInterpolated(99.99 / 100.0));
runJSON.set("quantiles", quantiles.asString());
}
if (std::find(metrics.begin(), metrics.end(), "total_time") != metrics.end())
runJSON.set("total_time", statistics[number_of_launch].total_time);
runJSON.set("total_time", statistics.total_time);
if (std::find(metrics.begin(), metrics.end(), "queries_per_second") != metrics.end())
runJSON.set("queries_per_second", double(statistics[number_of_launch].queries) /
statistics[number_of_launch].total_time);
runJSON.set("queries_per_second", double(statistics.queries) /
statistics.total_time);
if (std::find(metrics.begin(), metrics.end(), "rows_per_second") != metrics.end())
runJSON.set("rows_per_second", double(statistics[number_of_launch].rows_read) /
statistics[number_of_launch].total_time);
runJSON.set("rows_per_second", double(statistics.rows_read) /
statistics.total_time);
if (std::find(metrics.begin(), metrics.end(), "bytes_per_second") != metrics.end())
runJSON.set("bytes_per_second", double(statistics[number_of_launch].bytes_read) /
statistics[number_of_launch].total_time);
runJSON.set("bytes_per_second", double(statistics.bytes_read) /
statistics.total_time);
}
else
{
if (std::find(metrics.begin(), metrics.end(), "max_rows_per_second") != metrics.end())
runJSON.set("max_rows_per_second", statistics[number_of_launch].max_rows_speed);
runJSON.set("max_rows_per_second", statistics.max_rows_speed);
if (std::find(metrics.begin(), metrics.end(), "max_bytes_per_second") != metrics.end())
runJSON.set("max_bytes_per_second", statistics[number_of_launch].max_bytes_speed);
runJSON.set("max_bytes_per_second", statistics.max_bytes_speed);
if (std::find(metrics.begin(), metrics.end(), "avg_rows_per_second") != metrics.end())
runJSON.set("avg_rows_per_second", statistics[number_of_launch].avg_rows_speed_value);
runJSON.set("avg_rows_per_second", statistics.avg_rows_speed_value);
if (std::find(metrics.begin(), metrics.end(), "avg_bytes_per_second") != metrics.end())
runJSON.set("avg_bytes_per_second", statistics[number_of_launch].avg_bytes_speed_value);
runJSON.set("avg_bytes_per_second", statistics.avg_bytes_speed_value);
}
run_infos.push_back(runJSON);
@ -1401,7 +1341,10 @@ public:
{
for (size_t number_of_launch = 0; number_of_launch < times_to_run; ++number_of_launch)
{
output += test_name + ", ";
if (queries.size() > 1)
{
output += "query \"" + queries[query_index] + "\", ";
}
if (substitutions_maps.size())
{
@ -1413,7 +1356,7 @@ public:
output += "run " + std::to_string(number_of_launch + 1) + ": ";
output += main_metric + " = ";
output += statistics[number_of_launch * queries.size() + query_index].getStatisticByName(main_metric);
output += statistics_by_run[number_of_launch * queries.size() + query_index].getStatisticByName(main_metric);
output += "\n";
}
}
@ -1490,13 +1433,18 @@ int mainEntryClickhousePerformanceTest(int argc, char ** argv)
if (!options.count("input-files"))
{
std::cerr << "Trying to find tests in current folder" << std::endl;
std::cerr << "Trying to find test scenario files in the current folder...";
FS::path curr_dir(".");
getFilesFromDir(curr_dir, input_files);
if (input_files.empty())
{
std::cerr << std::endl;
throw DB::Exception("Did not find any xml files", 1);
}
else
std::cerr << " found " << input_files.size() << " files." << std::endl;
}
else
{

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
extern const int INCORRECT_FILE_NAME;
extern const int CHECKSUM_DOESNT_MATCH;
extern const int TOO_LARGE_SIZE_COMPRESSED;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
}
@ -210,9 +211,10 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
const auto code = e.code();
/// mark file as broken if necessary
if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH ||
code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED ||
code == ErrorCodes::CANNOT_READ_ALL_DATA)
if (code == ErrorCodes::CHECKSUM_DOESNT_MATCH
|| code == ErrorCodes::TOO_LARGE_SIZE_COMPRESSED
|| code == ErrorCodes::CANNOT_READ_ALL_DATA
|| code == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
{
const auto last_path_separator_pos = file_path.rfind('/');
const auto & path = file_path.substr(0, last_path_separator_pos + 1);

View File

@ -1616,8 +1616,7 @@ bool StorageReplicatedMergeTree::queueTask()
bool StorageReplicatedMergeTree::canMergeParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
MemoizedPartsThatCouldBeMerged * memo)
const MergeTreeData::DataPartPtr & right)
{
/** It can take a long time to determine whether it is possible to merge two adjacent parts.
* Two adjacent parts can be merged if all block numbers between their numbers are not used (abandoned).
@ -1638,10 +1637,6 @@ bool StorageReplicatedMergeTree::canMergeParts(
|| (left.get() != right.get() && queue.partWillBeMergedOrMergesDisabled(right->name)))
return false;
auto key = std::make_pair(left->name, right->name);
if (memo && memo->count(key))
return true;
String month_name = left->name.substr(0, 6);
auto zookeeper = getZooKeeper();
@ -1705,13 +1700,80 @@ bool StorageReplicatedMergeTree::canMergeParts(
}
}
if (memo)
memo->insert(key);
return true;
}
/** Cache for a predicate (a function that returns bool).
  * If the function returned true, the result is cached forever.
  * If the function returned false, the result is cached for a period that grows exponentially
  *  with every consecutive false (from Expiration::min_delay up to Expiration::max_delay);
  *  after that period the function is called again.
  * Not thread safe.
  */
template <typename Key>
struct CachedMergingPredicate
{
    using clock = std::chrono::steady_clock;

    /// Expiration state of one cached `false` result.
    struct Expiration
    {
        static constexpr clock::duration min_delay = std::chrono::seconds(1);
        static constexpr clock::duration max_delay = std::chrono::seconds(600);
        static constexpr double exponent_base = 2;

        clock::time_point expire_time;
        clock::duration delay = clock::duration::zero();

        /// Register one more `false` result obtained at `now`: grow the delay and recompute the expiration time.
        void next(clock::time_point now)
        {
            if (delay == clock::duration::zero())
                delay = min_delay;
            else
            {
                /// Multiply in floating point and convert back explicitly.
                /// `delay *= exponent_base` would silently truncate the base to the integer tick type.
                delay = std::chrono::duration_cast<clock::duration>(delay * exponent_base);
                if (delay > max_delay)
                    delay = max_delay;
            }

            expire_time = now + delay;
        }

        /// The cached `false` may be used only while `now` has not passed `expire_time`.
        bool expired(clock::time_point now) const
        {
            return now > expire_time;
        }
    };

    std::set<Key> true_keys;
    std::map<Key, Expiration> false_keys;

    /** Returns the (possibly cached) value of function(args...).
      * now         - current time, used to check and to schedule expiration of cached `false` results;
      * function    - the predicate itself, called only on a cache miss or after expiration;
      * args_to_key - builds the cache key from the same arguments;
      * args        - arguments of the predicate.
      */
    template <typename Function, typename ArgsToKey, typename... Args>
    bool get(clock::time_point now, Function && function, ArgsToKey && args_to_key, Args &&... args)
    {
        /// The arguments are passed as lvalues here: they are forwarded to `function` below
        ///  and must not be moved-from before that.
        Key key{args_to_key(args...)};

        if (true_keys.count(key))
            return true;

        auto it = false_keys.find(key);
        if (false_keys.end() != it && !it->second.expired(now))
            return false;

        bool value = function(std::forward<Args>(args)...);

        if (value)
        {
            /// The key is `true` forever from now on; its expiration record will never be read again.
            false_keys.erase(key);
            true_keys.insert(std::move(key));
        }
        else
            false_keys[key].next(now);

        return value;
    }
};

/// Out-of-class definitions of the static members (required before C++17 when they are odr-used).
/// `typename` is needed because the type is named through a dependent scope.
template <typename Key> constexpr typename CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::min_delay;
template <typename Key> constexpr typename CachedMergingPredicate<Key>::clock::duration CachedMergingPredicate<Key>::Expiration::max_delay;
template <typename Key> constexpr double CachedMergingPredicate<Key>::Expiration::exponent_base;
void StorageReplicatedMergeTree::mergeSelectingThread()
{
setThreadName("ReplMTMergeSel");
@ -1720,12 +1782,25 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
bool deduplicate = false; /// TODO: read deduplicate option from table config
bool need_pull = true;
MemoizedPartsThatCouldBeMerged memoized_parts_that_could_be_merged;
auto can_merge = [&memoized_parts_that_could_be_merged, this]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right) -> bool
auto uncached_merging_predicate = [this](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return canMergeParts(left, right, &memoized_parts_that_could_be_merged);
return canMergeParts(left, right);
};
auto merging_predicate_args_to_key = [](const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return std::make_pair(left->name, right->name);
};
CachedMergingPredicate<std::pair<std::string, std::string>> cached_merging_predicate;
/// Will be updated below.
std::chrono::steady_clock::time_point now;
auto can_merge = [&]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return cached_merging_predicate.get(now, uncached_merging_predicate, merging_predicate_args_to_key, left, right);
};
while (!shutdown_called && is_leader_node)
@ -1763,6 +1838,8 @@ void StorageReplicatedMergeTree::mergeSelectingThread()
size_t max_parts_size_for_merge = merger.getMaxPartsSizeForMerge(data.settings.max_replicated_merges_in_queue, merges_queued);
now = std::chrono::steady_clock::now();
if (max_parts_size_for_merge > 0
&& merger.selectPartsToMerge(
parts, merged_name, false,
@ -2347,7 +2424,7 @@ bool StorageReplicatedMergeTree::optimize(const String & partition, bool final,
auto can_merge = [this]
(const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right)
{
return canMergeParts(left, right, nullptr);
return canMergeParts(left, right);
};
pullLogsToQueue();

View File

@ -418,12 +418,10 @@ private:
*/
void mergeSelectingThread();
using MemoizedPartsThatCouldBeMerged = std::set<std::pair<std::string, std::string>>;
/// Is it possible to merge parts in the specified range?
bool canMergeParts(
const MergeTreeData::DataPartPtr & left,
const MergeTreeData::DataPartPtr & right,
MemoizedPartsThatCouldBeMerged * memo);
const MergeTreeData::DataPartPtr & right);
/** Write the selected parts to merge into the log,
* Call when merge_selecting_mutex is locked.

View File

@ -26,6 +26,7 @@ add_library (common
src/DateLUTImpl.cpp
src/exp10.cpp
src/JSON.cpp
src/getMemoryAmount.cpp
include/common/ApplicationServerExt.h
include/common/Types.h
@ -41,6 +42,7 @@ add_library (common
include/common/singleton.h
include/common/strong_typedef.h
include/common/JSON.h
include/common/getMemoryAmount.h
include/ext/bit_cast.h
include/ext/collection_cast.h

View File

@ -0,0 +1,9 @@
#pragma once
#include <cstdint>
/**
* Returns the size of physical memory (RAM) in bytes.
* Returns 0 on unsupported platform
* (the implementation also returns 0 when the sysctl() query it uses on some platforms fails).
*/
uint64_t getMemoryAmount();

View File

@ -160,4 +160,13 @@ std::ostream & operator<<(std::ostream & stream, const std::experimental::option
}
#include <exception>
/// Debug output of any std::exception: writes the text returned by what() wrapped as `exception{...}`.
/// NOTE(review): this is a non-inline, non-template definition; if this file is a header that is included
/// from several translation units it presumably needs `inline` - confirm.
std::ostream & operator<<(std::ostream & stream, const std::exception & what)
{
    return stream << "exception{" << what.what() << "}";
}
// TODO: add more types

View File

@ -1,6 +1,7 @@
#pragma once
#include <string>
#include <string.h>
#include <algorithm>
#include <type_traits>

View File

@ -0,0 +1,97 @@
#include "common/getMemoryAmount.h"
// http://nadeausoftware.com/articles/2012/09/c_c_tip_how_get_physical_memory_size_system
/*
* Author: David Robert Nadeau
* Site: http://NadeauSoftware.com/
* License: Creative Commons Attribution 3.0 Unported License
* http://creativecommons.org/licenses/by/3.0/deed.en_US
*/
#if defined(WIN32) || defined(_WIN32)
#include <Windows.h>
#else
#include <unistd.h>
#include <sys/types.h>
#include <sys/param.h>
#if defined(BSD)
#include <sys/sysctl.h>
#endif
#endif
/**
 * Returns the size of physical memory (RAM) in bytes.
 * Returns 0 on unsupported platform or when the size cannot be obtained from the system.
 */
uint64_t getMemoryAmount()
{
#if defined(_WIN32) && (defined(__CYGWIN__) || defined(__CYGWIN32__))
    /* Cygwin under Windows. ------------------------------------ */
    /* New 64-bit MEMORYSTATUSEX isn't available. Use old 32-bit */
    MEMORYSTATUS status;
    status.dwLength = sizeof(status);
    GlobalMemoryStatus(&status);
    return status.dwTotalPhys;

#elif defined(WIN32) || defined(_WIN32)
    /* Windows. ------------------------------------------------- */
    /* Use new 64-bit MEMORYSTATUSEX, not old 32-bit MEMORYSTATUS */
    MEMORYSTATUSEX status;
    status.dwLength = sizeof(status);
    if (!GlobalMemoryStatusEx(&status))
        return 0; /* Failed? */
    return status.ullTotalPhys;

#else
    /* UNIX variants. ------------------------------------------- */
    /* Prefer sysctl() over sysconf() except sysctl() HW_REALMEM and HW_PHYSMEM */

#if defined(CTL_HW) && (defined(HW_MEMSIZE) || defined(HW_PHYSMEM64))
    int mib[2];
    mib[0] = CTL_HW;
#if defined(HW_MEMSIZE)
    mib[1] = HW_MEMSIZE; /* OSX. --------------------- */
#elif defined(HW_PHYSMEM64)
    mib[1] = HW_PHYSMEM64; /* NetBSD, OpenBSD. --------- */
#endif
    uint64_t size = 0; /* 64-bit */
    size_t len = sizeof(size);
    if (sysctl(mib, 2, &size, &len, nullptr, 0) == 0)
        return size;
    return 0; /* Failed? */

#elif defined(_SC_AIX_REALMEM)
    /* AIX. ----------------------------------------------------- */
    /* sysconf() returns -1 on error: do not turn that into a huge unsigned value. */
    const long kilobytes = sysconf(_SC_AIX_REALMEM);
    if (kilobytes < 0)
        return 0;
    return static_cast<uint64_t>(kilobytes) * 1024;

#elif defined(_SC_PHYS_PAGES) && defined(_SC_PAGESIZE)
    /* FreeBSD, Linux, OpenBSD, and Solaris. -------------------- */
    /* sysconf() returns -1 on error: do not turn that into a huge unsigned value. */
    const long pages = sysconf(_SC_PHYS_PAGES);
    const long page_size = sysconf(_SC_PAGESIZE);
    if (pages < 0 || page_size < 0)
        return 0;
    return static_cast<uint64_t>(pages) * static_cast<uint64_t>(page_size);

#elif defined(_SC_PHYS_PAGES) && defined(_SC_PAGE_SIZE)
    /* Legacy. -------------------------------------------------- */
    const long pages = sysconf(_SC_PHYS_PAGES);
    const long page_size = sysconf(_SC_PAGE_SIZE);
    if (pages < 0 || page_size < 0)
        return 0;
    return static_cast<uint64_t>(pages) * static_cast<uint64_t>(page_size);

#elif defined(CTL_HW) && (defined(HW_PHYSMEM) || defined(HW_REALMEM))
    /* DragonFly BSD, FreeBSD, NetBSD, OpenBSD, and OSX. -------- */
    int mib[2];
    mib[0] = CTL_HW;
#if defined(HW_REALMEM)
    mib[1] = HW_REALMEM; /* FreeBSD. ----------------- */
#elif defined(HW_PHYSMEM)
    mib[1] = HW_PHYSMEM; /* Others. ------------------ */
#endif
    unsigned int size = 0; /* 32-bit */
    size_t len = sizeof(size);
    if (sysctl(mib, 2, &size, &len, nullptr, 0) == 0)
        return size;
    return 0; /* Failed? */

#else
    /* Unknown UNIX variant: no known way to query the memory size. */
    return 0;
#endif /* sysctl and sysconf variants */

#endif
}