performance-test: allow same stop condition to be present in all_of and any_of sections [#CLICKHOUSE-3086]

This commit is contained in:
Alexey Zatelepin 2017-06-21 21:41:50 +03:00 committed by alexey-milovidov
parent 1783d233e2
commit 374e92f76e

View File

@ -117,125 +117,38 @@ public:
}
};
enum class PriorityType
{
AllOf,
AnyOf
};
struct CriterionWithPriority
{
PriorityType priority;
size_t value;
bool fulfilled;
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
CriterionWithPriority() : value(0), fulfilled(false)
{
}
CriterionWithPriority(const CriterionWithPriority &) = default;
};
/// Termination criterions. The running test will be terminated in either of two conditions:
/// 1. All criterions marked 'all_of' are fulfilled
/// or
/// 2. Any criterion marked 'any_of' is fulfilled
class StopCriterions
{
private:
using AbstractConfiguration = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using Keys = std::vector<String>;
void initializeStruct(const String & priority, const AbstractConfiguration & stop_criterions_view)
/// A set of supported stop conditions.
struct StopConditionsSet
{
void loadFromConfig(const ConfigurationPtr & stop_conditions_view)
{
using Keys = std::vector<String>;
Keys keys;
stop_criterions_view->keys(priority, keys);
PriorityType priority_type = (priority == "all_of" ? PriorityType::AllOf : PriorityType::AnyOf);
stop_conditions_view->keys(keys);
for (const String & key : keys)
{
if (key == "total_time_ms")
{
total_time_ms.value = stop_criterions_view->getUInt64(priority + ".total_time_ms");
total_time_ms.priority = priority_type;
}
total_time_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "rows_read")
{
rows_read.value = stop_criterions_view->getUInt64(priority + ".rows_read");
rows_read.priority = priority_type;
}
rows_read.value = stop_conditions_view->getUInt64(key);
else if (key == "bytes_read_uncompressed")
{
bytes_read_uncompressed.value = stop_criterions_view->getUInt64(priority + ".bytes_read_uncompressed");
bytes_read_uncompressed.priority = priority_type;
}
bytes_read_uncompressed.value = stop_conditions_view->getUInt64(key);
else if (key == "iterations")
{
iterations.value = stop_criterions_view->getUInt64(priority + ".iterations");
iterations.priority = priority_type;
}
iterations.value = stop_conditions_view->getUInt64(key);
else if (key == "min_time_not_changing_for_ms")
{
min_time_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".min_time_not_changing_for_ms");
min_time_not_changing_for_ms.priority = priority_type;
}
min_time_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "max_speed_not_changing_for_ms")
{
max_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".max_speed_not_changing_for_ms");
max_speed_not_changing_for_ms.priority = priority_type;
}
max_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else if (key == "average_speed_not_changing_for_ms")
{
average_speed_not_changing_for_ms.value = stop_criterions_view->getUInt64(priority + ".average_speed_not_changing_for_ms");
average_speed_not_changing_for_ms.priority = priority_type;
}
average_speed_not_changing_for_ms.value = stop_conditions_view->getUInt64(key);
else
{
throw DB::Exception("Met unkown stop criterion: " + key, 1);
}
throw DB::Exception("Met unkown stop condition: " + key, 1);
if (priority == "all_of")
{
++number_of_initialized_all_of;
};
if (priority == "any_of")
{
++number_of_initialized_any_of;
};
}
}
public:
StopCriterions() : number_of_initialized_all_of(0), number_of_initialized_any_of(0), fulfilled_criterions_all_of(0), fulfilled_criterions_any_of(0)
{
}
StopCriterions(const StopCriterions & another_criterions)
: total_time_ms(another_criterions.total_time_ms),
rows_read(another_criterions.rows_read),
bytes_read_uncompressed(another_criterions.bytes_read_uncompressed),
iterations(another_criterions.iterations),
min_time_not_changing_for_ms(another_criterions.min_time_not_changing_for_ms),
max_speed_not_changing_for_ms(another_criterions.max_speed_not_changing_for_ms),
average_speed_not_changing_for_ms(another_criterions.average_speed_not_changing_for_ms),
number_of_initialized_all_of(another_criterions.number_of_initialized_all_of),
number_of_initialized_any_of(another_criterions.number_of_initialized_any_of),
fulfilled_criterions_all_of(another_criterions.fulfilled_criterions_all_of),
fulfilled_criterions_any_of(another_criterions.fulfilled_criterions_any_of)
{
}
void loadFromConfig(const AbstractConfiguration & stop_criterions_view)
{
if (stop_criterions_view->has("all_of"))
{
initializeStruct("all_of", stop_criterions_view);
}
if (stop_criterions_view->has("any_of"))
{
initializeStruct("any_of", stop_criterions_view);
++initialized_count;
}
}
@ -249,25 +162,98 @@ public:
max_speed_not_changing_for_ms.fulfilled = false;
average_speed_not_changing_for_ms.fulfilled = false;
fulfilled_criterions_all_of = 0;
fulfilled_criterions_any_of = 0;
fulfilled_count = 0;
}
CriterionWithPriority total_time_ms;
CriterionWithPriority rows_read;
CriterionWithPriority bytes_read_uncompressed;
CriterionWithPriority iterations;
CriterionWithPriority min_time_not_changing_for_ms;
CriterionWithPriority max_speed_not_changing_for_ms;
CriterionWithPriority average_speed_not_changing_for_ms;
/// Note: only conditions with UInt64 minimal thresholds are supported.
/// I.e. condition is fulfilled when value is exceeded.
struct StopCondition
{
UInt64 value = 0;
bool fulfilled = false;
};
/// Hereafter 'all_of' and 'any_of', in context of critetions, mean a level of importance
/// Number of initialized properties met in configuration
size_t number_of_initialized_all_of;
size_t number_of_initialized_any_of;
void report(UInt64 value, StopCondition & condition)
{
if (condition.value && !condition.fulfilled && value >= condition.value)
{
condition.fulfilled = true;
++fulfilled_count;
}
}
size_t fulfilled_criterions_all_of;
size_t fulfilled_criterions_any_of;
StopCondition total_time_ms;
StopCondition rows_read;
StopCondition bytes_read_uncompressed;
StopCondition iterations;
StopCondition min_time_not_changing_for_ms;
StopCondition max_speed_not_changing_for_ms;
StopCondition average_speed_not_changing_for_ms;
size_t initialized_count = 0;
size_t fulfilled_count = 0;
};
/// Stop conditions for a test run. The running test will be terminated in either of two conditions:
/// 1. All conditions marked 'all_of' are fulfilled
/// or
/// 2. Any condition marked 'any_of' is fulfilled
class TestStopConditions
{
public:
void loadFromConfig(ConfigurationPtr & stop_conditions_config)
{
if (stop_conditions_config->has("all_of"))
{
ConfigurationPtr config_all_of(stop_conditions_config->createView("all_of"));
conditions_all_of.loadFromConfig(config_all_of);
}
if (stop_conditions_config->has("any_of"))
{
ConfigurationPtr config_any_of(stop_conditions_config->createView("any_of"));
conditions_any_of.loadFromConfig(config_any_of);
}
}
bool empty() const
{
return !conditions_all_of.initialized_count && !conditions_any_of.initialized_count;
}
#define DEFINE_REPORT_FUNC(FUNC_NAME, CONDITION) \
void FUNC_NAME(UInt64 value) \
{ \
conditions_all_of.report(value, conditions_all_of.CONDITION); \
conditions_any_of.report(value, conditions_any_of.CONDITION); \
} \
DEFINE_REPORT_FUNC(reportTotalTime, total_time_ms);
DEFINE_REPORT_FUNC(reportRowsRead, rows_read);
DEFINE_REPORT_FUNC(reportBytesReadUncompressed, bytes_read_uncompressed);
DEFINE_REPORT_FUNC(reportIterations, iterations);
DEFINE_REPORT_FUNC(reportMinTimeNotChangingFor, min_time_not_changing_for_ms);
DEFINE_REPORT_FUNC(reportMaxSpeedNotChangingFor, max_speed_not_changing_for_ms);
DEFINE_REPORT_FUNC(reportAverageSpeedNotChangingFor, average_speed_not_changing_for_ms);
#undef REPORT
bool areFulfilled() const
{
return
(conditions_all_of.initialized_count
&& conditions_all_of.fulfilled_count >= conditions_all_of.initialized_count)
|| (conditions_any_of.initialized_count && conditions_any_of.fulfilled_count);
}
void reset()
{
conditions_all_of.reset();
conditions_any_of.reset();
}
private:
StopConditionsSet conditions_all_of;
StopConditionsSet conditions_any_of;
};
struct Stats
@ -553,8 +539,7 @@ private:
InterruptListener interrupt_listener;
using XMLConfiguration = Poco::Util::XMLConfiguration;
using AbstractConfig = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
using Config = Poco::AutoPtr<XMLConfiguration>;
using XMLConfigurationPtr = Poco::AutoPtr<XMLConfiguration>;
using Paths = std::vector<String>;
using StringToVector = std::map<String, std::vector<String>>;
@ -564,13 +549,13 @@ private:
std::vector<StringKeyValue> substitutions_maps;
bool gotSIGINT;
std::vector<StopCriterions> stop_criterions_by_run;
std::vector<TestStopConditions> stop_conditions_by_run;
String main_metric;
bool lite_output;
String profiles_file;
Strings input_files;
std::vector<Config> tests_configurations;
std::vector<XMLConfigurationPtr> tests_configurations;
Strings tests_tags;
Strings skip_tags;
@ -579,14 +564,6 @@ private:
Strings tests_names_regexp;
Strings skip_names_regexp;
#define INC_FULFILLED_CRITERIONS(stop_criterions, CRITERION) \
if (!stop_criterions.CRITERION.fulfilled) \
{ \
stop_criterions.CRITERION.priority == PriorityType::AllOf ? ++stop_criterions.fulfilled_criterions_all_of \
: ++stop_criterions.fulfilled_criterions_any_of; \
stop_criterions.CRITERION.fulfilled = true; \
}
enum class ExecutionType
{
Loop,
@ -605,9 +582,12 @@ private:
std::vector<Stats> statistics_by_run;
/// Removes configurations that has a given value. If leave is true, the logic is reversed.
void removeConfigurationsIf(std::vector<Config> & configs, FilterType filter_type, const Strings & values, bool leave = false)
void removeConfigurationsIf(
std::vector<XMLConfigurationPtr> & configs,
FilterType filter_type, const Strings & values, bool leave = false)
{
auto checker = [&filter_type, &values, &leave](Config & config) {
auto checker = [&filter_type, &values, &leave](XMLConfigurationPtr & config)
{
if (values.size() == 0)
return false;
@ -650,7 +630,7 @@ private:
return remove_or_not;
};
std::vector<Config>::iterator new_end = std::remove_if(configs.begin(), configs.end(), checker);
auto new_end = std::remove_if(configs.begin(), configs.end(), checker);
configs.erase(new_end, configs.end());
}
@ -670,7 +650,7 @@ private:
}
/// Checks specified preconditions per test (process cache, table existence, etc.)
bool checkPreconditions(const Config & config)
bool checkPreconditions(const XMLConfigurationPtr & config)
{
if (!config->has("preconditions"))
return true;
@ -756,7 +736,7 @@ private:
for (size_t i = 0; i != input_files.size(); ++i)
{
const String path = input_files[i];
tests_configurations[i] = Config(new XMLConfiguration(path));
tests_configurations[i] = XMLConfigurationPtr(new XMLConfiguration(path));
}
filterConfigurations();
@ -798,9 +778,9 @@ private:
}
}
void extractSettings(const Config & config, const String & key,
const Strings & settings_list,
std::map<String, String> settings_to_apply)
void extractSettings(
const XMLConfigurationPtr & config, const String & key,
const Strings & settings_list, std::map<String, String> & settings_to_apply)
{
for (const String & setup : settings_list)
{
@ -815,7 +795,7 @@ private:
}
}
String runTest(Config & test_config)
String runTest(XMLConfigurationPtr & test_config)
{
queries.clear();
@ -834,7 +814,7 @@ private:
if (!profiles_file.empty())
{
String profile_name = test_config->getString("settings.profile");
Config profiles_config(new XMLConfiguration(profiles_file));
XMLConfigurationPtr profiles_config(new XMLConfiguration(profiles_file));
Keys profile_settings;
profiles_config->keys("profiles." + profile_name, profile_settings);
@ -932,7 +912,7 @@ private:
throw DB::Exception("Only one query is allowed when using substitutions", 1);
/// Make "subconfig" of inner xml block
AbstractConfig substitutions_view(test_config->createView("substitutions"));
ConfigurationPtr substitutions_view(test_config->createView("substitutions"));
constructSubstitutions(substitutions_view, substitutions);
queries = formatQueries(queries[0], substitutions);
@ -956,22 +936,21 @@ private:
times_to_run = test_config->getUInt("times_to_run");
}
stop_criterions_by_run.resize(times_to_run * queries.size());
TestStopConditions stop_conditions_template;
if (test_config->has("stop_conditions"))
{
AbstractConfig stop_criterions_view(test_config->createView("stop_conditions"));
for (StopCriterions & stop_criterions : stop_criterions_by_run)
{
stop_criterions.loadFromConfig(stop_criterions_view);
}
}
else
{
throw DB::Exception("No termination conditions were found in config", 1);
ConfigurationPtr stop_conditions_config(test_config->createView("stop_conditions"));
stop_conditions_template.loadFromConfig(stop_conditions_config);
}
AbstractConfig metrics_view(test_config->createView("metrics"));
if (stop_conditions_template.empty())
throw DB::Exception("No termination conditions were found in config", 1);
for (size_t i = 0; i < times_to_run * queries.size(); ++i)
stop_conditions_by_run.push_back(stop_conditions_template);
ConfigurationPtr metrics_view(test_config->createView("metrics"));
Keys metrics;
metrics_view->keys(metrics);
@ -1005,7 +984,7 @@ private:
for (size_t query_index = 0; query_index < queries.size(); ++query_index)
{
size_t statistic_index = number_of_launch * queries.size() + query_index;
stop_criterions_by_run[statistic_index].reset();
stop_conditions_by_run[statistic_index].reset();
queries_with_indexes.push_back({queries[query_index], statistic_index});
}
@ -1062,43 +1041,24 @@ private:
Query query = query_and_index.first;
const size_t run_index = query_and_index.second;
StopCriterions & stop_criterions = stop_criterions_by_run[run_index];
size_t max_iterations = stop_criterions.iterations.value;
size_t iteration = 0;
TestStopConditions & stop_conditions = stop_conditions_by_run[run_index];
Stats & statistics = statistics_by_run[run_index];
size_t iteration = 0;
statistics.clear();
execute(query, statistics, stop_criterions);
execute(query, statistics, stop_conditions);
if (exec_type == ExecutionType::Loop)
{
while (!gotSIGINT)
{
++iteration;
/// check stop criterions
if (max_iterations && iteration >= max_iterations)
{
INC_FULFILLED_CRITERIONS(stop_criterions, iterations);
}
if (stop_criterions.number_of_initialized_all_of
&& (stop_criterions.fulfilled_criterions_all_of
>= stop_criterions.number_of_initialized_all_of))
{
/// All 'all_of' criterions are fulfilled
stop_conditions.reportIterations(iteration);
if (stop_conditions.areFulfilled())
break;
}
if (stop_criterions.number_of_initialized_any_of && stop_criterions.fulfilled_criterions_any_of)
{
/// Some 'any_of' criterions are fulfilled
break;
}
execute(query, statistics, stop_criterions);
execute(query, statistics, stop_conditions);
}
}
@ -1109,7 +1069,7 @@ private:
}
}
void execute(const Query & query, Stats & statistics, StopCriterions & stop_criterions)
void execute(const Query & query, Stats & statistics, TestStopConditions & stop_conditions)
{
statistics.watch_per_query.restart();
statistics.last_query_was_cancelled = false;
@ -1118,10 +1078,10 @@ private:
Progress progress;
stream.setProgressCallback(
[&progress, &stream, &statistics, &stop_criterions, this](const Progress & value)
[&progress, &stream, &statistics, &stop_conditions, this](const Progress & value)
{
progress.incrementPiecewiseAtomically(value);
this->checkFulfilledCriterionsAndUpdate(progress, stream, statistics, stop_criterions);
this->checkFulfilledConditionsAndUpdate(progress, stream, statistics, stop_conditions);
});
stream.readPrefix();
@ -1135,77 +1095,25 @@ private:
statistics.setTotalTime();
}
void checkFulfilledCriterionsAndUpdate(
void checkFulfilledConditionsAndUpdate(
const Progress & progress,
RemoteBlockInputStream & stream,
Stats & statistics,
StopCriterions & stop_criterions)
TestStopConditions & stop_conditions)
{
statistics.add(progress.rows, progress.bytes);
size_t max_rows_to_read = stop_criterions.rows_read.value;
if (max_rows_to_read && statistics.rows_read >= max_rows_to_read)
{
INC_FULFILLED_CRITERIONS(stop_criterions, rows_read);
}
stop_conditions.reportRowsRead(statistics.rows_read);
stop_conditions.reportBytesReadUncompressed(statistics.bytes_read);
stop_conditions.reportTotalTime(statistics.watch.elapsed() / (1000 * 1000));
stop_conditions.reportMinTimeNotChangingFor(statistics.min_time_watch.elapsed() / (1000 * 1000));
stop_conditions.reportMaxSpeedNotChangingFor(
statistics.max_rows_speed_watch.elapsed() / (1000 * 1000));
stop_conditions.reportAverageSpeedNotChangingFor(
statistics.avg_rows_speed_watch.elapsed() / (1000 * 1000));
size_t max_bytes_to_read = stop_criterions.bytes_read_uncompressed.value;
if (max_bytes_to_read && statistics.bytes_read >= max_bytes_to_read)
if (stop_conditions.areFulfilled())
{
INC_FULFILLED_CRITERIONS(stop_criterions, bytes_read_uncompressed);
}
if (UInt64 max_total_time_ms = stop_criterions.total_time_ms.value)
{
/// cast nanoseconds to ms
if ((statistics.watch.elapsed() / (1000 * 1000)) > max_total_time_ms)
{
INC_FULFILLED_CRITERIONS(stop_criterions, total_time_ms);
}
}
size_t min_time_not_changing_for_ms = stop_criterions.min_time_not_changing_for_ms.value;
if (min_time_not_changing_for_ms)
{
size_t min_time_did_not_change_for = statistics.min_time_watch.elapsed() / (1000 * 1000);
if (min_time_did_not_change_for >= min_time_not_changing_for_ms)
{
INC_FULFILLED_CRITERIONS(stop_criterions, min_time_not_changing_for_ms);
}
}
size_t max_speed_not_changing_for_ms = stop_criterions.max_speed_not_changing_for_ms.value;
if (max_speed_not_changing_for_ms)
{
UInt64 speed_not_changing_time = statistics.max_rows_speed_watch.elapsed() / (1000 * 1000);
if (speed_not_changing_time >= max_speed_not_changing_for_ms)
{
INC_FULFILLED_CRITERIONS(stop_criterions, max_speed_not_changing_for_ms);
}
}
size_t average_speed_not_changing_for_ms = stop_criterions.average_speed_not_changing_for_ms.value;
if (average_speed_not_changing_for_ms)
{
UInt64 speed_not_changing_time = statistics.avg_rows_speed_watch.elapsed() / (1000 * 1000);
if (speed_not_changing_time >= average_speed_not_changing_for_ms)
{
INC_FULFILLED_CRITERIONS(stop_criterions, average_speed_not_changing_for_ms);
}
}
if (stop_criterions.number_of_initialized_all_of
&& (stop_criterions.fulfilled_criterions_all_of >= stop_criterions.number_of_initialized_all_of))
{
/// All 'all_of' criterions are fulfilled
statistics.last_query_was_cancelled = true;
stream.cancel();
}
if (stop_criterions.number_of_initialized_any_of && stop_criterions.fulfilled_criterions_any_of)
{
/// Some 'any_of' criterions are fulfilled
statistics.last_query_was_cancelled = true;
stream.cancel();
}
@ -1218,14 +1126,15 @@ private:
}
}
void constructSubstitutions(AbstractConfig & substitutions_view, StringToVector & substitutions)
void constructSubstitutions(ConfigurationPtr & substitutions_view, StringToVector & substitutions)
{
Keys xml_substitutions;
substitutions_view->keys(xml_substitutions);
for (size_t i = 0; i != xml_substitutions.size(); ++i)
{
const AbstractConfig xml_substitution(substitutions_view->createView("substitution[" + std::to_string(i) + "]"));
const ConfigurationPtr xml_substitution(
substitutions_view->createView("substitution[" + std::to_string(i) + "]"));
/// Property values for substitution will be stored in a vector
/// accessible by property name