Use new connectionPool inerface

This commit is contained in:
ivanzhukov 2017-05-05 15:41:18 +03:00
parent 73fa7905f0
commit f4c6219244

View File

@ -6,7 +6,7 @@
#include <sys/stat.h>
#include <AggregateFunctions/ReservoirSampler.h>
#include <Client/ConnectionPool.h>
#include <Client/Connection.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
@ -38,6 +38,8 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
const std::string four_spaces = " ";
bool isNumber(const std::string & str)
{
if (str.empty())
@ -137,7 +139,7 @@ public:
{
for (size_t i = 0; i < _padding + 1; ++i)
{
content[current_key] += "\t";
content[current_key] += four_spaces;
}
content[current_key] += runInfos[i].constructOutput(_padding + 2);
@ -151,7 +153,7 @@ public:
for (size_t i = 0; i < _padding; ++i)
{
content[current_key] += "\t";
content[current_key] += four_spaces;
}
content[current_key] += ']';
current_key = "";
@ -187,7 +189,7 @@ public:
output += "\n";
for (size_t i = 0; i < padding; ++i)
{
output += "\t";
output += four_spaces;
}
std::string key = '\"' + it->first + '\"';
@ -199,7 +201,7 @@ public:
output += "\n";
for (size_t i = 0; i < padding - 1; ++i)
{
output += "\t";
output += four_spaces;
}
output += "}";
return output;
@ -315,10 +317,10 @@ public:
max_speed_not_changing_for_ms(anotherCriterions.max_speed_not_changing_for_ms),
average_speed_not_changing_for_ms(anotherCriterions.average_speed_not_changing_for_ms),
number_of_initialized_min(anotherCriterions.number_of_initialized_min.load()),
number_of_initialized_max(anotherCriterions.number_of_initialized_max.load()),
fulfilled_criterions_min(anotherCriterions.fulfilled_criterions_min.load()),
fulfilled_criterions_max(anotherCriterions.fulfilled_criterions_max.load())
number_of_initialized_min(anotherCriterions.number_of_initialized_min),
number_of_initialized_max(anotherCriterions.number_of_initialized_max),
fulfilled_criterions_min(anotherCriterions.fulfilled_criterions_min),
fulfilled_criterions_max(anotherCriterions.fulfilled_criterions_max)
{
}
@ -359,11 +361,11 @@ public:
/// Hereafter 'min' and 'max', in context of critetions, mean a level of importance
/// Number of initialized properties met in configuration
std::atomic<size_t> number_of_initialized_min;
std::atomic<size_t> number_of_initialized_max;
size_t number_of_initialized_min;
size_t number_of_initialized_max;
std::atomic<size_t> fulfilled_criterions_min;
std::atomic<size_t> fulfilled_criterions_max;
size_t fulfilled_criterions_min;
size_t fulfilled_criterions_max;
};
struct Stats
@ -400,7 +402,7 @@ struct Stats
size_t number_of_rows_speed_info_batches = 0;
size_t number_of_bytes_speed_info_batches = 0;
bool ready = false; // check if a thread completed its work
bool ready = false; // check if a query wasn't interrupted by SIGINT
std::string getStatisticByName(const std::string & statisticName)
{
@ -414,14 +416,14 @@ struct Stats
for (double percent = 10; percent <= 90; percent += 10)
{
result += "\t" + std::to_string((percent / 100));
result += four_spaces + std::to_string((percent / 100));
result += ": " + std::to_string(sampler.quantileInterpolated(percent / 100.0));
result += "\n";
}
result += "\t0.95: " + std::to_string(sampler.quantileInterpolated(95 / 100.0)) + "\n";
result += "\t0.99: " + std::to_string(sampler.quantileInterpolated(99 / 100.0)) + "\n";
result += "\t0.999: " + std::to_string(sampler.quantileInterpolated(99.9 / 100.)) + "\n";
result += "\t0.9999: " + std::to_string(sampler.quantileInterpolated(99.99 / 100.));
result += four_spaces + "0.95: " + std::to_string(sampler.quantileInterpolated(95 / 100.0)) + "\n";
result += four_spaces + "0.99: " + std::to_string(sampler.quantileInterpolated(99 / 100.0)) + "\n";
result += four_spaces + "0.999: " + std::to_string(sampler.quantileInterpolated(99.9 / 100.)) + "\n";
result += four_spaces + "0.9999: " + std::to_string(sampler.quantileInterpolated(99.99 / 100.));
return result;
}
@ -577,7 +579,7 @@ double Stats::avg_bytes_speed_precision = 0.001;
class PerformanceTest
{
public:
PerformanceTest(const unsigned concurrency_,
PerformanceTest(
const String & host_,
const UInt16 port_,
const String & default_database_,
@ -590,10 +592,7 @@ public:
const std::vector<std::string> & without_names,
const std::vector<std::string> & names_regexp,
const std::vector<std::string> & without_names_regexp)
: concurrency(concurrency_),
queue(concurrency_),
connections(concurrency, host_, port_, default_database_, user_, password_),
pool(concurrency),
: connection(host_, port_, default_database_, user_, password_),
testsConfigurations(input_files.size()),
gotSIGINT(false)
{
@ -616,17 +615,13 @@ private:
using QueriesWithIndexes = std::vector<std::pair<Query, size_t>>;
Queries queries;
using Queue = ConcurrentBoundedQueue<std::pair<Query, size_t>>;
Queue queue;
Connection connection;
using Keys = std::vector<std::string>;
ConnectionPool connections;
ThreadPool pool;
Settings settings;
InterruptListener interrupt_listener;
std::vector<std::shared_ptr<RemoteBlockInputStream>> streams;
double average_speed_precision = 0.001;
@ -641,8 +636,9 @@ private:
using StringKeyValue = std::map<std::string, std::string>;
std::vector<StringKeyValue> substitutionsMaps;
std::atomic<bool> gotSIGINT;
bool gotSIGINT;
std::vector<StopCriterions> stopCriterions;
std::string main_metric;
// TODO: create enum class instead of string
#define incFulfilledCriterions(index, CRITERION) \
@ -662,7 +658,6 @@ private:
size_t timesToRun = 1;
std::vector<Stats> statistics;
std::mutex mutex;
void readTestsConfiguration(const Paths & input_files)
{
@ -790,18 +785,16 @@ private:
throw Poco::Exception("No termination conditions were found in config", 1);
}
AbstractConfig metricsView(testConfig->createView("metric"));
AbstractConfig metricsView(testConfig->createView("metrics"));
Keys metrics;
metricsView->keys(metrics);
if (metrics.size() > 1)
{
throw Poco::Exception("More than 1 main metric is not allowed", 1);
}
if (metrics.size() == 1)
if (std::find(metrics.begin(), metrics.end(), "main_metric") != metrics.end())
{
checkMetricInput(metrics[0]);
main_metric = metricsView->getString("main_metric");
metrics.erase( std::remove(metrics.begin(), metrics.end(), "main_metric"), metrics.end() );
}
checkMetricInput(metrics);
statistics.resize(timesToRun * queries.size());
for (size_t numberOfLaunch = 0; numberOfLaunch < timesToRun; ++numberOfLaunch)
@ -831,82 +824,48 @@ private:
constructTotalInfo();
}
void checkMetricInput(const std::string & main_metric) const
void checkMetricInput(const Strings & metrics) const
{
std::vector<std::string> loopMetrics
= {"min_time", "quantiles", "total_time", "queries_per_second", "rows_per_second", "bytes_per_second"};
std::vector<std::string> infiniteMetrics
std::vector<std::string> nonLoopMetrics
= {"max_rows_per_second", "max_bytes_per_second", "avg_rows_per_second", "avg_bytes_per_second"};
if (execType == loop)
{
if (std::find(infiniteMetrics.begin(), infiniteMetrics.end(), main_metric) != infiniteMetrics.end())
for (const std::string & metric : metrics)
{
throw Poco::Exception("Wrong type of main metric for loop "
"execution type",
1);
if (std::find(nonLoopMetrics.begin(), nonLoopMetrics.end(), metric) != nonLoopMetrics.end())
{
throw Poco::Exception("Wrong type of metric for loop execution type (" + metric + ")", 1);
}
}
}
else
{
if (std::find(loopMetrics.begin(), loopMetrics.end(), main_metric) != loopMetrics.end())
for (const std::string & metric : metrics)
{
throw Poco::Exception("Wrong type of main metric for "
"inifinite execution type",
1);
if (std::find(loopMetrics.begin(), loopMetrics.end(), metric) != loopMetrics.end())
{
throw Poco::Exception("Wrong type of metric for non-loop execution type (" + metric + ")", 1);
}
}
}
}
void runQueries(const QueriesWithIndexes & queriesWithIndexes)
{
for (size_t i = 0; i < concurrency; ++i)
{
pool.schedule(std::bind(&PerformanceTest::thread, this, connections.IConnectionPool::get()));
}
for (const std::pair<Query, const size_t> & queryAndIndex : queriesWithIndexes)
{
Query query = queryAndIndex.first;
const size_t statisticIndex = queryAndIndex.second;
queue.push({query, statisticIndex});
}
for (size_t i = 0; i < concurrency; ++i)
{
/// Genlty asking threads to stop
queue.push({"", std::numeric_limits<size_t>::max()});
}
pool.wait();
}
void thread(ConnectionPool::Entry & connection)
{
InterruptListener thread_interrupt_listener;
Query query;
size_t statisticIndex;
std::pair<Query, size_t> queryAndIndex;
while (true)
{
queue.pop(queryAndIndex);
query = queryAndIndex.first;
statisticIndex = queryAndIndex.second;
/// Empty query means end of execution
if (query.empty())
break;
size_t max_iterations = stopCriterions[statisticIndex].iterations.value;
size_t iteration = 0;
statistics[statisticIndex].clear();
execute(connection, thread_interrupt_listener, query, statisticIndex);
execute(query, statisticIndex);
if (execType == loop)
{
@ -934,7 +893,7 @@ private:
break;
}
execute(connection, thread_interrupt_listener, query, statisticIndex);
execute(query, statisticIndex);
}
}
@ -945,48 +904,32 @@ private:
}
}
void execute(
ConnectionPool::Entry & connection, InterruptListener & thread_interrupt_listener, const Query & query, const size_t statisticIndex)
void execute(const Query & query, const size_t statisticIndex)
{
statistics[statisticIndex].watch_per_query.restart();
std::shared_ptr<RemoteBlockInputStream> stream
= std::make_shared<RemoteBlockInputStream>(*connection, query, &settings, nullptr, Tables() /*, query_processing_stage*/
);
size_t stream_index;
{
std::lock_guard<std::mutex> lock(mutex);
streams.push_back(stream);
stream_index = streams.size() - 1;
}
RemoteBlockInputStream stream(connection, query, &settings, nullptr, Tables() /*, query_processing_stage*/);
Progress progress;
stream->setProgressCallback([&progress, &stream, &thread_interrupt_listener, statisticIndex, this](const Progress & value) {
stream.setProgressCallback([&progress, &stream, statisticIndex, this](const Progress & value) {
progress.incrementPiecewiseAtomically(value);
this->checkFulfilledCriterionsAndUpdate(progress, stream, thread_interrupt_listener, statisticIndex);
this->checkFulfilledCriterionsAndUpdate(progress, stream, statisticIndex);
});
stream->readPrefix();
while (Block block = stream->read())
stream.readPrefix();
while (Block block = stream.read())
;
stream->readSuffix();
std::lock_guard<std::mutex> lock(mutex);
streams[stream_index].reset();
stream.readSuffix();
statistics[statisticIndex].updateQueryInfo();
statistics[statisticIndex].setTotalTime();
}
void checkFulfilledCriterionsAndUpdate(const Progress & progress,
const std::shared_ptr<RemoteBlockInputStream> & stream,
InterruptListener & thread_interrupt_listener,
RemoteBlockInputStream & stream,
const size_t statisticIndex)
{
std::lock_guard<std::mutex> lock(mutex);
statistics[statisticIndex].add(progress.rows, progress.bytes);
size_t max_rows_to_read = stopCriterions[statisticIndex].rows_read.value;
@ -1045,26 +988,19 @@ private:
&& (stopCriterions[statisticIndex].fulfilled_criterions_min >= stopCriterions[statisticIndex].number_of_initialized_min))
{
/// All 'min' criterions are fulfilled
stream->cancel();
stream.cancel();
}
if (stopCriterions[statisticIndex].number_of_initialized_max && stopCriterions[statisticIndex].fulfilled_criterions_max)
{
/// Some 'max' criterions are fulfilled
stream->cancel();
stream.cancel();
}
if (thread_interrupt_listener.check())
{ /// SIGINT
if (interrupt_listener.check())
{
gotSIGINT = true;
for (const std::shared_ptr<RemoteBlockInputStream> & stream : streams)
{
if (stream)
{
stream->cancel();
}
}
stream.cancel();
}
}
@ -1250,7 +1186,7 @@ public:
jsonOutput["runs"].set(runInfos);
std::cout << jsonOutput << std::endl;
std::cout << std::endl << jsonOutput << std::endl;
}
void minOutput(const std::string & main_metric)
@ -1290,15 +1226,19 @@ int mainEntryClickhousePerformanceTest(int argc, char ** argv)
using Strings = std::vector<std::string>;
boost::program_options::options_description desc("Allowed options");
desc.add_options()("help", "produce help message")("concurrency,c",
value<unsigned>()->default_value(1),
"number of parallel queries")("host,h", value<std::string>()->default_value("localhost"), "")(
"port", value<UInt16>()->default_value(9000), "")("user", value<std::string>()->default_value("default"), "")(
"password", value<std::string>()->default_value(""), "")("database", value<std::string>()->default_value("default"), "")(
"tag", value<Strings>(), "Run only tests with tag")("without-tag", value<Strings>(), "Do not run tests with tag")(
"name", value<Strings>(), "Run tests with specific name")("without-name", value<Strings>(), "Do not run tests with name")(
"name-regexp", value<Strings>(), "Run tests with names matching regexp")(
"without-name-regexp", value<Strings>(), "Do not run tests with names matching regexp");
desc.add_options()
("help", "produce help message")
("host,h", value<std::string>()->default_value("localhost"), "")
("port", value<UInt16>()->default_value(9000), "")
("user", value<std::string>()->default_value("default"), "")
("password", value<std::string>()->default_value(""), "")
("database", value<std::string>()->default_value("default"), "")
("tag", value<Strings>(), "Run only tests with tag")
("without-tag", value<Strings>(), "Do not run tests with tag")
("name", value<Strings>(), "Run tests with specific name")
("without-name", value<Strings>(), "Do not run tests with name")
("name-regexp", value<Strings>(), "Run tests with names matching regexp")
("without-name-regexp", value<Strings>(), "Do not run tests with names matching regexp");
/// These options will not be displayed in --help
boost::program_options::options_description hidden("Hidden options");
@ -1367,7 +1307,7 @@ int mainEntryClickhousePerformanceTest(int argc, char ** argv)
skip_matching_regexp = options["without-name-regexp"].as<Strings>();
}
PerformanceTest performanceTest(options["concurrency"].as<unsigned>(),
PerformanceTest performanceTest(
options["host"].as<std::string>(),
options["port"].as<UInt16>(),
options["database"].as<std::string>(),