mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-03 13:02:00 +00:00
Separate metrics output for queries
This commit is contained in:
parent
09ca5e0032
commit
70fb951831
@ -191,10 +191,16 @@ std::ostream & operator<<(std::ostream & stream, const JSONString & jsonObj)
|
||||
return stream;
|
||||
}
|
||||
|
||||
enum PriorityType { min, max };
|
||||
|
||||
struct CriterionWithPriority {
|
||||
std::string priority = "";
|
||||
size_t value = 0;
|
||||
bool fulfilled = false;
|
||||
PriorityType priority;
|
||||
size_t value;
|
||||
bool fulfilled;
|
||||
|
||||
CriterionWithPriority()
|
||||
: value(0), fulfilled(false) {}
|
||||
CriterionWithPriority(const CriterionWithPriority &) = default;
|
||||
};
|
||||
|
||||
/// Termination criterions. The running test will be terminated in either of two conditions:
|
||||
@ -206,7 +212,7 @@ private:
|
||||
using AbstractConfiguration = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
|
||||
using Keys = std::vector<std::string>;
|
||||
|
||||
void initializeStruct(const std::string priority,
|
||||
void initializeStruct(const std::string & priority,
|
||||
const AbstractConfiguration & stopCriterionsView)
|
||||
{
|
||||
Keys keys;
|
||||
@ -215,25 +221,25 @@ private:
|
||||
for (const std::string & key : keys) {
|
||||
if (key == "timeout_ms") {
|
||||
timeout_ms.value = stopCriterionsView->getUInt64(priority + ".timeout_ms");
|
||||
timeout_ms.priority = priority;
|
||||
timeout_ms.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "rows_read") {
|
||||
rows_read.value = stopCriterionsView->getUInt64(priority + ".rows_read");
|
||||
rows_read.priority = priority;
|
||||
rows_read.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "bytes_read_uncompressed") {
|
||||
bytes_read_uncompressed.value = stopCriterionsView->getUInt64(priority + ".bytes_read_uncompressed");
|
||||
bytes_read_uncompressed.priority = priority;
|
||||
bytes_read_uncompressed.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "iterations") {
|
||||
iterations.value = stopCriterionsView->getUInt64(priority + ".iterations");
|
||||
iterations.priority = priority;
|
||||
iterations.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "min_time_not_changing_for_ms") {
|
||||
min_time_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".min_time_not_changing_for_ms");
|
||||
min_time_not_changing_for_ms.priority = priority;
|
||||
min_time_not_changing_for_ms.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "max_speed_not_changing_for_ms") {
|
||||
max_speed_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".max_speed_not_changing_for_ms");
|
||||
max_speed_not_changing_for_ms.priority = priority;
|
||||
max_speed_not_changing_for_ms.priority = (priority == "min" ? min : max);
|
||||
} else if (key == "average_speed_not_changing_for_ms") {
|
||||
average_speed_not_changing_for_ms.value = stopCriterionsView->getUInt64(priority + ".average_speed_not_changing_for_ms");
|
||||
average_speed_not_changing_for_ms.priority = priority;
|
||||
average_speed_not_changing_for_ms.priority = (priority == "min" ? min : max);
|
||||
} else {
|
||||
throw Poco::Exception("Met unkown stop criterion: " + key, 1);
|
||||
}
|
||||
@ -248,6 +254,21 @@ public:
|
||||
: number_of_initialized_min(0), number_of_initialized_max(0),
|
||||
fulfilled_criterions_min(0), fulfilled_criterions_max(0) {}
|
||||
|
||||
StopCriterions(const StopCriterions &anotherCriterions)
|
||||
:
|
||||
timeout_ms(anotherCriterions.timeout_ms),
|
||||
rows_read(anotherCriterions.rows_read),
|
||||
bytes_read_uncompressed(anotherCriterions.bytes_read_uncompressed),
|
||||
iterations(anotherCriterions.iterations),
|
||||
min_time_not_changing_for_ms(anotherCriterions.min_time_not_changing_for_ms),
|
||||
max_speed_not_changing_for_ms(anotherCriterions.max_speed_not_changing_for_ms),
|
||||
average_speed_not_changing_for_ms(anotherCriterions.average_speed_not_changing_for_ms),
|
||||
|
||||
number_of_initialized_min(anotherCriterions.number_of_initialized_min.load()),
|
||||
number_of_initialized_max(anotherCriterions.number_of_initialized_max.load()),
|
||||
fulfilled_criterions_min(anotherCriterions.fulfilled_criterions_min.load()),
|
||||
fulfilled_criterions_max(anotherCriterions.fulfilled_criterions_max.load()) {}
|
||||
|
||||
void loadFromConfig(const AbstractConfiguration & stopCriterionsView)
|
||||
{
|
||||
if (stopCriterionsView->has("min")) {
|
||||
@ -273,13 +294,13 @@ public:
|
||||
fulfilled_criterions_max = 0;
|
||||
}
|
||||
|
||||
struct CriterionWithPriority timeout_ms;
|
||||
struct CriterionWithPriority rows_read;
|
||||
struct CriterionWithPriority bytes_read_uncompressed;
|
||||
struct CriterionWithPriority iterations;
|
||||
struct CriterionWithPriority min_time_not_changing_for_ms;
|
||||
struct CriterionWithPriority max_speed_not_changing_for_ms;
|
||||
struct CriterionWithPriority average_speed_not_changing_for_ms;
|
||||
CriterionWithPriority timeout_ms;
|
||||
CriterionWithPriority rows_read;
|
||||
CriterionWithPriority bytes_read_uncompressed;
|
||||
CriterionWithPriority iterations;
|
||||
CriterionWithPriority min_time_not_changing_for_ms;
|
||||
CriterionWithPriority max_speed_not_changing_for_ms;
|
||||
CriterionWithPriority average_speed_not_changing_for_ms;
|
||||
|
||||
/// Hereafter 'min' and 'max', in context of critetions, mean a level of importance
|
||||
/// Number of initialized properties met in configuration
|
||||
@ -315,11 +336,11 @@ struct Stats
|
||||
|
||||
double avg_rows_speed_value = 0;
|
||||
double avg_rows_speed_first = 0;
|
||||
double avg_rows_speed_precision = 0.001;
|
||||
static double avg_rows_speed_precision;
|
||||
|
||||
double avg_bytes_speed_value = 0;
|
||||
double avg_bytes_speed_first = 0;
|
||||
double avg_bytes_speed_precision = 0.001;
|
||||
static double avg_bytes_speed_precision;
|
||||
|
||||
size_t number_of_rows_speed_info_batches = 0;
|
||||
size_t number_of_bytes_speed_info_batches = 0;
|
||||
@ -470,6 +491,9 @@ struct Stats
|
||||
}
|
||||
};
|
||||
|
||||
double Stats::avg_rows_speed_precision = 0.001;
|
||||
double Stats::avg_bytes_speed_precision = 0.001;
|
||||
|
||||
class PerformanceTest
|
||||
{
|
||||
public:
|
||||
@ -494,7 +518,7 @@ public:
|
||||
testsConfigurations(input_files.size())
|
||||
{
|
||||
if (input_files.size() < 1) {
|
||||
throw Poco::Exception("No tests were specified", 1);
|
||||
throw Poco::Exception("No tests were specified", 0);
|
||||
}
|
||||
|
||||
std::cerr << std::fixed << std::setprecision(3);
|
||||
@ -507,10 +531,11 @@ private:
|
||||
std::string testName;
|
||||
|
||||
using Query = std::string;
|
||||
using Queries = std::vector<std::string>;
|
||||
using Queries = std::vector<Query>;
|
||||
using QueriesWithIndexes = std::vector<std::pair<Query, size_t>>;
|
||||
Queries queries;
|
||||
|
||||
using Queue = ConcurrentBoundedQueue<Query>;
|
||||
using Queue = ConcurrentBoundedQueue<std::pair<Query, size_t>>;
|
||||
Queue queue;
|
||||
|
||||
using Keys = std::vector<std::string>;
|
||||
@ -521,7 +546,7 @@ private:
|
||||
|
||||
InterruptListener interrupt_listener;
|
||||
bool gotSIGINT = false;
|
||||
std::vector<RemoteBlockInputStream*> streams;
|
||||
std::vector<std::shared_ptr<RemoteBlockInputStream>> streams;
|
||||
|
||||
double average_speed_precision = 0.001;
|
||||
|
||||
@ -536,14 +561,15 @@ private:
|
||||
using StringKeyValue = std::map<std::string, std::string>;
|
||||
std::vector<StringKeyValue> substitutionsMaps;
|
||||
|
||||
struct StopCriterions stopCriterions;
|
||||
std::vector<StopCriterions> stopCriterions;
|
||||
|
||||
#define incFulfilledCriterions(CRITERION) \
|
||||
if (! stopCriterions.CRITERION.fulfilled) {\
|
||||
stopCriterions.CRITERION.priority == "min" \
|
||||
? ++stopCriterions.fulfilled_criterions_min \
|
||||
: ++stopCriterions.fulfilled_criterions_max; \
|
||||
stopCriterions.CRITERION.fulfilled = true; \
|
||||
// TODO: create enum class instead of string
|
||||
#define incFulfilledCriterions(index, CRITERION) \
|
||||
if (! stopCriterions[index].CRITERION.fulfilled) {\
|
||||
stopCriterions[index].CRITERION.priority == min \
|
||||
? ++stopCriterions[index].fulfilled_criterions_min \
|
||||
: ++stopCriterions[index].fulfilled_criterions_max; \
|
||||
stopCriterions[index].CRITERION.fulfilled = true; \
|
||||
}
|
||||
|
||||
enum ExecutionType { loop, once };
|
||||
@ -604,12 +630,12 @@ private:
|
||||
|
||||
if (std::find(configSettings.begin(), configSettings.end(),
|
||||
"average_rows_speed_precision") != configSettings.end()) {
|
||||
statistics.back().avg_rows_speed_precision = testConfig->getDouble("settings.average_rows_speed_precision");
|
||||
Stats::avg_rows_speed_precision = testConfig->getDouble("settings.average_rows_speed_precision");
|
||||
}
|
||||
|
||||
if (std::find(configSettings.begin(), configSettings.end(),
|
||||
"average_bytes_speed_precision") != configSettings.end()) {
|
||||
statistics.back().avg_bytes_speed_precision = testConfig->getDouble("settings.average_bytes_speed_precision");
|
||||
Stats::avg_bytes_speed_precision = testConfig->getDouble("settings.average_bytes_speed_precision");
|
||||
}
|
||||
}
|
||||
|
||||
@ -644,7 +670,7 @@ private:
|
||||
|
||||
if (! testConfig->has("type")) {
|
||||
throw Poco::Exception("Missing type property in config: " +
|
||||
testName);
|
||||
testName, 1);
|
||||
}
|
||||
|
||||
std::string configExecType = testConfig->getString("type");
|
||||
@ -656,43 +682,51 @@ private:
|
||||
throw Poco::Exception("Unknown type " + configExecType + " in :" +
|
||||
testName, 1);
|
||||
|
||||
if (testConfig->has("stop")) {
|
||||
AbstractConfig stopCriterionsView(testConfig
|
||||
->createView("stop"));
|
||||
stopCriterions.loadFromConfig(stopCriterionsView);
|
||||
} else {
|
||||
throw Poco::Exception("No termination conditions were found", 1);
|
||||
}
|
||||
|
||||
if (testConfig->has("timesToRun")) {
|
||||
timesToRun = testConfig->getUInt("timesToRun");
|
||||
}
|
||||
|
||||
for (size_t numberOfLaunch = 0; numberOfLaunch < timesToRun; ++numberOfLaunch) {
|
||||
stopCriterions.reset();
|
||||
statistics.emplace_back();
|
||||
stopCriterions.resize(timesToRun * queries.size());
|
||||
|
||||
if (execType == loop) {
|
||||
runLoopQuery(queries[0]);
|
||||
} else {
|
||||
runQueries(queries);
|
||||
if (testConfig->has("stop")) {
|
||||
AbstractConfig stopCriterionsView(testConfig
|
||||
->createView("stop"));
|
||||
for (StopCriterions & stopCriterion : stopCriterions) {
|
||||
stopCriterion.loadFromConfig(stopCriterionsView);
|
||||
}
|
||||
|
||||
statistics.back().setTotalTime();
|
||||
} else {
|
||||
throw Poco::Exception("No termination conditions were found in config", 1);
|
||||
}
|
||||
|
||||
AbstractConfig metricsView(testConfig->createView("metric"));
|
||||
|
||||
Keys metrics;
|
||||
metricsView->keys(metrics);
|
||||
if (metrics.size() > 1) {
|
||||
throw Poco::Exception("More than 1 main metric is not allowed");
|
||||
throw Poco::Exception("More than 1 main metric is not allowed", 1);
|
||||
}
|
||||
|
||||
if (metrics.size() == 1) {
|
||||
checkMetricInput(metrics[0]);
|
||||
}
|
||||
|
||||
statistics.resize(timesToRun * queries.size());
|
||||
for (size_t numberOfLaunch = 0; numberOfLaunch < timesToRun; ++numberOfLaunch) {
|
||||
QueriesWithIndexes queriesWithIndexes;
|
||||
|
||||
for (size_t queryIndex = 0; queryIndex < queries.size(); ++queryIndex) {
|
||||
size_t statisticIndex = numberOfLaunch * queries.size() + queryIndex;
|
||||
stopCriterions[statisticIndex].reset();
|
||||
|
||||
queriesWithIndexes.push_back({queries[queryIndex], statisticIndex});
|
||||
}
|
||||
|
||||
runQueries(queriesWithIndexes);
|
||||
}
|
||||
|
||||
if (metrics.size() == 1) {
|
||||
minOutput(metrics[0]);
|
||||
} else {
|
||||
}
|
||||
else {
|
||||
constructTotalInfo();
|
||||
}
|
||||
}
|
||||
@ -711,66 +745,18 @@ private:
|
||||
if (execType == loop) {
|
||||
if (std::find(infiniteMetrics.begin(), infiniteMetrics.end(), main_metric) != infiniteMetrics.end()) {
|
||||
throw Poco::Exception("Wrong type of main metric for loop "
|
||||
"execution type");
|
||||
"execution type", 1);
|
||||
}
|
||||
} else {
|
||||
if (std::find(loopMetrics.begin(), loopMetrics.end(), main_metric) != loopMetrics.end()) {
|
||||
throw Poco::Exception("Wrong type of main metric for "
|
||||
"inifinite execution type");
|
||||
"inifinite execution type", 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void runLoopQuery(const Query & query)
|
||||
void runQueries(const QueriesWithIndexes & queriesWithIndexes)
|
||||
{
|
||||
statistics.back().clear();
|
||||
|
||||
size_t max_iterations = stopCriterions.iterations.value;
|
||||
size_t i = -1;
|
||||
|
||||
while (! gotSIGINT) {
|
||||
++i;
|
||||
|
||||
pool.schedule(std::bind(
|
||||
&PerformanceTest::thread,
|
||||
this,
|
||||
connections.IConnectionPool::get()
|
||||
));
|
||||
|
||||
queue.push(query);
|
||||
queue.push(""); /// asking thread to stop
|
||||
pool.wait();
|
||||
|
||||
/// check stop criterions
|
||||
if (max_iterations && i >= max_iterations) {
|
||||
incFulfilledCriterions(iterations);
|
||||
}
|
||||
|
||||
if (stopCriterions.number_of_initialized_min &&
|
||||
(stopCriterions.fulfilled_criterions_min >=
|
||||
stopCriterions.number_of_initialized_min)) {
|
||||
/// All 'min' criterions are fulfilled
|
||||
// TODO:
|
||||
std::cout << "All 'min' criterions are fulfilled" << std::endl;
|
||||
break;
|
||||
}
|
||||
|
||||
if (stopCriterions.number_of_initialized_max &&
|
||||
stopCriterions.fulfilled_criterions_max) {
|
||||
/// Some 'max' criterions are fulfilled
|
||||
|
||||
// TODO:
|
||||
std::cout << stopCriterions.fulfilled_criterions_max
|
||||
<< "'max' criterions are fulfilled" << std::endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void runQueries(const Queries & queries)
|
||||
{
|
||||
statistics.back().clear();
|
||||
|
||||
for (size_t i = 0; i < concurrency; ++i) {
|
||||
pool.schedule(std::bind(
|
||||
&PerformanceTest::thread,
|
||||
@ -779,13 +765,16 @@ private:
|
||||
));
|
||||
}
|
||||
|
||||
for (const Query & query : queries) {
|
||||
queue.push(query);
|
||||
for (const std::pair<Query, const size_t> & queryAndIndex : queriesWithIndexes) {
|
||||
Query query = queryAndIndex.first;
|
||||
const size_t statisticIndex = queryAndIndex.second;
|
||||
|
||||
queue.push({query, statisticIndex});
|
||||
}
|
||||
|
||||
for (size_t i = 0; i != concurrency; ++i) {
|
||||
for (size_t i = 0; i < concurrency; ++i) {
|
||||
/// Genlty asking threads to stop
|
||||
queue.push("");
|
||||
queue.push({"", std::numeric_limits<size_t>::max()});
|
||||
}
|
||||
|
||||
pool.wait();
|
||||
@ -794,24 +783,60 @@ private:
|
||||
void thread(ConnectionPool::Entry & connection)
|
||||
{
|
||||
Query query;
|
||||
size_t statisticIndex;
|
||||
|
||||
std::pair<Query, size_t> queryAndIndex;
|
||||
|
||||
while (true) {
|
||||
queue.pop(query);
|
||||
queue.pop(queryAndIndex);
|
||||
query = queryAndIndex.first;
|
||||
statisticIndex = queryAndIndex.second;
|
||||
|
||||
/// Empty query means end of execution
|
||||
if (query.empty())
|
||||
break;
|
||||
|
||||
execute(connection, query);
|
||||
size_t max_iterations = stopCriterions[statisticIndex].iterations.value;
|
||||
size_t iteration = 0;
|
||||
|
||||
statistics[statisticIndex].clear();
|
||||
execute(connection, query, statisticIndex);
|
||||
|
||||
if (execType == loop) {
|
||||
while (! gotSIGINT) {
|
||||
++iteration;
|
||||
|
||||
/// check stop criterions
|
||||
if (max_iterations && iteration >= max_iterations) {
|
||||
incFulfilledCriterions(statisticIndex, iterations);
|
||||
}
|
||||
|
||||
if (stopCriterions[statisticIndex].number_of_initialized_min &&
|
||||
(stopCriterions[statisticIndex].fulfilled_criterions_min >=
|
||||
stopCriterions[statisticIndex].number_of_initialized_min)) {
|
||||
/// All 'min' criterions are fulfilled
|
||||
break;
|
||||
}
|
||||
|
||||
if (stopCriterions[statisticIndex].number_of_initialized_max &&
|
||||
stopCriterions[statisticIndex].fulfilled_criterions_max) {
|
||||
/// Some 'max' criterions are fulfilled
|
||||
break;
|
||||
}
|
||||
|
||||
execute(connection, query, statisticIndex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void execute(ConnectionPool::Entry & connection, const Query & query)
|
||||
void execute(ConnectionPool::Entry & connection, const Query & query,
|
||||
const size_t statisticIndex)
|
||||
{
|
||||
InterruptListener thread_interrupt_listener;
|
||||
statistics.back().watch_per_query.restart();
|
||||
statistics[statisticIndex].watch_per_query.restart();
|
||||
|
||||
RemoteBlockInputStream * stream = new RemoteBlockInputStream(
|
||||
std::shared_ptr<RemoteBlockInputStream> stream = std::make_shared<RemoteBlockInputStream>(
|
||||
connection, query, &settings, nullptr, Tables()/*, query_processing_stage*/
|
||||
);
|
||||
|
||||
@ -820,15 +845,18 @@ private:
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
streams.push_back(stream);
|
||||
stream_index = streams.size() - 1;
|
||||
// TODO: remove them later?
|
||||
}
|
||||
|
||||
Progress progress;
|
||||
stream->setProgressCallback(
|
||||
[&progress, &stream, &thread_interrupt_listener, this]
|
||||
[&progress, &stream, &thread_interrupt_listener, statisticIndex, this]
|
||||
(const Progress & value) {
|
||||
progress.incrementPiecewiseAtomically(value);
|
||||
|
||||
this->checkFulfilledCriterionsAndUpdate(progress, stream, thread_interrupt_listener);
|
||||
this->checkFulfilledCriterionsAndUpdate(
|
||||
progress, stream, thread_interrupt_listener, statisticIndex
|
||||
);
|
||||
});
|
||||
|
||||
stream->readPrefix();
|
||||
@ -837,104 +865,94 @@ private:
|
||||
stream->readSuffix();
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
streams[stream_index].reset();
|
||||
|
||||
streams.erase(streams.begin() + stream_index);
|
||||
delete stream;
|
||||
|
||||
statistics.back().updateQueryInfo();
|
||||
|
||||
// const BlockStreamProfileInfo & info = stream->getProfileInfo();
|
||||
// double seconds = watch.elapsedSeconds();
|
||||
// std::lock_guard<std::mutex> lock(mutex);
|
||||
// info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
|
||||
// statistics.back().add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
|
||||
statistics[statisticIndex].updateQueryInfo();
|
||||
statistics[statisticIndex].setTotalTime();
|
||||
}
|
||||
|
||||
void checkFulfilledCriterionsAndUpdate(const Progress & progress,
|
||||
RemoteBlockInputStream * stream,
|
||||
InterruptListener & thread_interrupt_listener)
|
||||
const std::shared_ptr<RemoteBlockInputStream> & stream,
|
||||
InterruptListener & thread_interrupt_listener,
|
||||
const size_t statisticIndex)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
statistics.back().add(progress.rows, progress.bytes);
|
||||
statistics[statisticIndex].add(progress.rows, progress.bytes);
|
||||
|
||||
size_t max_rows_to_read = stopCriterions.rows_read.value;
|
||||
if (max_rows_to_read && statistics.back().rows_read >= max_rows_to_read) {
|
||||
incFulfilledCriterions(rows_read);
|
||||
size_t max_rows_to_read = stopCriterions[statisticIndex].rows_read.value;
|
||||
if (max_rows_to_read && statistics[statisticIndex].rows_read >= max_rows_to_read) {
|
||||
incFulfilledCriterions(statisticIndex, rows_read);
|
||||
}
|
||||
|
||||
size_t max_bytes_to_read = stopCriterions.bytes_read_uncompressed.value;
|
||||
if (max_bytes_to_read && statistics.back().bytes_read >= max_bytes_to_read) {
|
||||
incFulfilledCriterions(bytes_read_uncompressed);
|
||||
size_t max_bytes_to_read = stopCriterions[statisticIndex].bytes_read_uncompressed.value;
|
||||
if (max_bytes_to_read && statistics[statisticIndex].bytes_read >= max_bytes_to_read) {
|
||||
incFulfilledCriterions(statisticIndex, bytes_read_uncompressed);
|
||||
}
|
||||
|
||||
if (UInt64 max_timeout_ms = stopCriterions.timeout_ms.value) {
|
||||
if (UInt64 max_timeout_ms = stopCriterions[statisticIndex].timeout_ms.value) {
|
||||
/// cast nanoseconds to ms
|
||||
if ((statistics.back().watch.elapsed() / (1000 * 1000)) > max_timeout_ms) {
|
||||
incFulfilledCriterions(timeout_ms);
|
||||
if ((statistics[statisticIndex].watch.elapsed() / (1000 * 1000)) > max_timeout_ms) {
|
||||
incFulfilledCriterions(statisticIndex, timeout_ms);
|
||||
}
|
||||
}
|
||||
|
||||
size_t min_time_not_changing_for_ms = stopCriterions
|
||||
size_t min_time_not_changing_for_ms = stopCriterions[statisticIndex]
|
||||
.min_time_not_changing_for_ms.value;
|
||||
if (min_time_not_changing_for_ms) {
|
||||
size_t min_time_did_not_change_for = statistics.back()
|
||||
size_t min_time_did_not_change_for = statistics[statisticIndex]
|
||||
.min_time_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
|
||||
if (min_time_did_not_change_for >= min_time_not_changing_for_ms) {
|
||||
incFulfilledCriterions(min_time_not_changing_for_ms);
|
||||
incFulfilledCriterions(statisticIndex, min_time_not_changing_for_ms);
|
||||
}
|
||||
}
|
||||
|
||||
size_t max_speed_not_changing_for_ms = stopCriterions
|
||||
size_t max_speed_not_changing_for_ms = stopCriterions[statisticIndex]
|
||||
.max_speed_not_changing_for_ms
|
||||
.value;
|
||||
if (max_speed_not_changing_for_ms) {
|
||||
UInt64 speed_not_changing_time = statistics.back()
|
||||
UInt64 speed_not_changing_time = statistics[statisticIndex]
|
||||
.max_rows_speed_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
if (speed_not_changing_time >= max_speed_not_changing_for_ms) {
|
||||
incFulfilledCriterions(max_speed_not_changing_for_ms);
|
||||
incFulfilledCriterions(statisticIndex, max_speed_not_changing_for_ms);
|
||||
}
|
||||
}
|
||||
|
||||
size_t average_speed_not_changing_for_ms = stopCriterions
|
||||
size_t average_speed_not_changing_for_ms = stopCriterions[statisticIndex]
|
||||
.average_speed_not_changing_for_ms
|
||||
.value;
|
||||
if (average_speed_not_changing_for_ms) {
|
||||
UInt64 speed_not_changing_time = statistics.back()
|
||||
UInt64 speed_not_changing_time = statistics[statisticIndex]
|
||||
.avg_rows_speed_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
if (speed_not_changing_time >= average_speed_not_changing_for_ms) {
|
||||
incFulfilledCriterions(average_speed_not_changing_for_ms);
|
||||
incFulfilledCriterions(statisticIndex, average_speed_not_changing_for_ms);
|
||||
}
|
||||
}
|
||||
|
||||
if (stopCriterions.number_of_initialized_min &&
|
||||
(stopCriterions.fulfilled_criterions_min >=
|
||||
stopCriterions.number_of_initialized_min)) {
|
||||
if (stopCriterions[statisticIndex].number_of_initialized_min &&
|
||||
(stopCriterions[statisticIndex].fulfilled_criterions_min >=
|
||||
stopCriterions[statisticIndex].number_of_initialized_min)) {
|
||||
/// All 'min' criterions are fulfilled
|
||||
// TODO:
|
||||
std::cout << "All 'min' criterions are fulfilled" << std::endl;
|
||||
stream->cancel();
|
||||
}
|
||||
|
||||
if (stopCriterions.number_of_initialized_max &&
|
||||
stopCriterions.fulfilled_criterions_max) {
|
||||
if (stopCriterions[statisticIndex].number_of_initialized_max &&
|
||||
stopCriterions[statisticIndex].fulfilled_criterions_max) {
|
||||
/// Some 'max' criterions are fulfilled
|
||||
|
||||
// TODO:
|
||||
std::cout << stopCriterions.fulfilled_criterions_max
|
||||
<< "'max' criterions are fulfilled" << std::endl;
|
||||
stream->cancel();
|
||||
}
|
||||
|
||||
if (thread_interrupt_listener.check()) { /// SIGINT
|
||||
gotSIGINT = true;
|
||||
|
||||
for (RemoteBlockInputStream * stream : streams) {
|
||||
stream->cancel();
|
||||
for (const std::shared_ptr<RemoteBlockInputStream> & stream : streams) {
|
||||
if (stream) {
|
||||
stream->cancel();
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "got SIGNINT; stopping streams" << std::endl;
|
||||
@ -1038,7 +1056,7 @@ public:
|
||||
void constructTotalInfo()
|
||||
{
|
||||
JSONString jsonOutput;
|
||||
std::string hostname = "";
|
||||
std::string hostname;
|
||||
|
||||
char hostname_buffer[256];
|
||||
if (gethostname(hostname_buffer, 256) == 0) {
|
||||
@ -1046,11 +1064,11 @@ public:
|
||||
}
|
||||
|
||||
jsonOutput["hostname"].set(hostname);
|
||||
jsonOutput["Number of CPUs: "].set(sysconf(_SC_NPROCESSORS_ONLN));
|
||||
jsonOutput["Number of CPUs"].set(sysconf(_SC_NPROCESSORS_ONLN));
|
||||
jsonOutput["test_name"].set(testName);
|
||||
|
||||
if (substitutions.size()) {
|
||||
JSONString jsonParameters;
|
||||
JSONString jsonParameters(2); /// here, 2 is the size of \t padding
|
||||
|
||||
for (auto it = substitutions.begin(); it != substitutions.end(); ++it) {
|
||||
std::string parameter = it->first;
|
||||
@ -1071,10 +1089,22 @@ public:
|
||||
jsonOutput["parameters"].set(jsonParameters);
|
||||
}
|
||||
|
||||
std::vector<JSONString> runInfos(timesToRun);
|
||||
for (size_t numberOfLaunch = 0; numberOfLaunch < timesToRun; ++numberOfLaunch) {
|
||||
std::vector<JSONString> runInfos(statistics.size());
|
||||
for (size_t numberOfLaunch = 0; numberOfLaunch < statistics.size(); ++numberOfLaunch) {
|
||||
JSONString runJSON;
|
||||
|
||||
size_t queryIndex = numberOfLaunch % queries.size();
|
||||
if (substitutionsMaps.size()) {
|
||||
JSONString parameters(4);
|
||||
|
||||
for (auto it = substitutionsMaps[queryIndex].begin();
|
||||
it != substitutionsMaps[queryIndex].end(); ++it) {
|
||||
parameters[it->first].set(it->second);
|
||||
}
|
||||
|
||||
runJSON["parameters"].set(parameters);
|
||||
}
|
||||
|
||||
if (execType == loop) {
|
||||
runJSON["min_time"].set(std::to_string(statistics[numberOfLaunch].min_time / 1000)
|
||||
+ "." + std::to_string(statistics[numberOfLaunch].min_time % 1000) + "s");
|
||||
@ -1111,13 +1141,23 @@ public:
|
||||
|
||||
void minOutput(const std::string & main_metric)
|
||||
{
|
||||
// TODO: remove
|
||||
std::cout << "test" << std::endl;
|
||||
|
||||
for (size_t numberOfLaunch = 0; numberOfLaunch < timesToRun; ++numberOfLaunch) {
|
||||
std::cout << "run " << numberOfLaunch + 1 << ": ";
|
||||
std::cout << main_metric << " = " << statistics[numberOfLaunch].getStatisticByName(main_metric);
|
||||
std::cout << std::endl;
|
||||
for (size_t queryIndex = 0; queryIndex < queries.size(); ++queryIndex) {
|
||||
std::cout << testName << ", ";
|
||||
|
||||
if (substitutionsMaps.size()) {
|
||||
for (auto it = substitutionsMaps[queryIndex].begin();
|
||||
it != substitutionsMaps[queryIndex].end(); ++it) {
|
||||
std::cout << it->first << " = " << it->second << ", ";
|
||||
}
|
||||
}
|
||||
|
||||
std::cout << "run " << numberOfLaunch + 1 << ": ";
|
||||
std::cout << main_metric << " = ";
|
||||
std::cout << statistics[numberOfLaunch * queries.size() + queryIndex]
|
||||
.getStatisticByName(main_metric);
|
||||
std::cout << std::endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
Loading…
Reference in New Issue
Block a user