Dont print corrupted statistics

This commit is contained in:
ivanzhukov 2017-04-07 22:49:22 +03:00
parent dca10863c7
commit 26606a1556

View File

@ -347,6 +347,8 @@ struct Stats
size_t number_of_rows_speed_info_batches = 0;
size_t number_of_bytes_speed_info_batches = 0;
bool ready = false; // check if a thread completed its work
std::string getStatisticByName(const std::string & statisticName) {
if (statisticName == "min_time") {
return std::to_string(min_time) + "ms";
@ -517,7 +519,8 @@ public:
concurrency(concurrency_), queue(concurrency_),
connections(concurrency, host_, port_, default_database_, user_, password_),
pool(concurrency),
testsConfigurations(input_files.size())
testsConfigurations(input_files.size()),
gotSIGINT(false)
{
if (input_files.size() < 1) {
throw Poco::Exception("No tests were specified", 0);
@ -547,7 +550,6 @@ private:
Settings settings;
InterruptListener interrupt_listener;
bool gotSIGINT = false;
std::vector<std::shared_ptr<RemoteBlockInputStream>> streams;
double average_speed_precision = 0.001;
@ -563,6 +565,7 @@ private:
using StringKeyValue = std::map<std::string, std::string>;
std::vector<StringKeyValue> substitutionsMaps;
std::atomic<bool> gotSIGINT;
std::vector<StopCriterions> stopCriterions;
// TODO: create enum class instead of string
@ -722,6 +725,14 @@ private:
queriesWithIndexes.push_back({queries[queryIndex], statisticIndex});
}
if (interrupt_listener.check()) {
gotSIGINT = true;
}
if (gotSIGINT) {
break;
}
runQueries(queriesWithIndexes);
}
@ -784,6 +795,8 @@ private:
void thread(ConnectionPool::Entry & connection)
{
InterruptListener thread_interrupt_listener;
Query query;
size_t statisticIndex;
@ -802,7 +815,7 @@ private:
size_t iteration = 0;
statistics[statisticIndex].clear();
execute(connection, query, statisticIndex);
execute(connection, thread_interrupt_listener, query, statisticIndex);
if (execType == loop) {
while (! gotSIGINT) {
@ -826,16 +839,20 @@ private:
break;
}
execute(connection, query, statisticIndex);
execute(connection, thread_interrupt_listener, query, statisticIndex);
}
}
if (! gotSIGINT) {
statistics[statisticIndex].ready = true;
}
}
}
void execute(ConnectionPool::Entry & connection, const Query & query,
const size_t statisticIndex)
void execute(ConnectionPool::Entry & connection,
InterruptListener & thread_interrupt_listener,
const Query & query, const size_t statisticIndex)
{
InterruptListener thread_interrupt_listener;
statistics[statisticIndex].watch_per_query.restart();
std::shared_ptr<RemoteBlockInputStream> stream = std::make_shared<RemoteBlockInputStream>(
@ -847,7 +864,6 @@ private:
std::lock_guard<std::mutex> lock(mutex);
streams.push_back(stream);
stream_index = streams.size() - 1;
// TODO: remove them later?
}
Progress progress;
@ -956,8 +972,6 @@ private:
stream->cancel();
}
}
std::cout << "got SIGNINT; stopping streams" << std::endl;
}
}
@ -1091,8 +1105,11 @@ public:
jsonOutput["parameters"].set(jsonParameters);
}
std::vector<JSONString> runInfos(statistics.size());
std::vector<JSONString> runInfos;
for (size_t numberOfLaunch = 0; numberOfLaunch < statistics.size(); ++numberOfLaunch) {
if (!statistics[numberOfLaunch].ready)
continue;
JSONString runJSON;
size_t queryIndex = numberOfLaunch % queries.size();
@ -1133,7 +1150,7 @@ public:
runJSON["avg_bytes_per_second"].set(statistics[numberOfLaunch].avg_bytes_speed_value);
}
runInfos[numberOfLaunch] = runJSON;
runInfos.push_back(runJSON);
}
jsonOutput["runs"].set(runInfos);