Execute queries without terminate conditions

This commit is contained in:
ivanzhukov 2017-01-31 19:48:36 +03:00
parent 4e5362b246
commit 1986ceb819

View File

@ -3,12 +3,15 @@
#include <sys/stat.h>
#include <boost/program_options.hpp>
#include <DB/Common/ThreadPool.h>
#include <DB/Core/Types.h>
#include <DB/Client/ConnectionPool.h>
#include <DB/Common/ThreadPool.h>
#include <DB/Common/ConcurrentBoundedQueue.h>
#include <DB/Core/Types.h>
#include <DB/DataStreams/RemoteBlockInputStream.h>
#include <DB/Interpreters/Settings.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/ReadBufferFromFileDescriptor.h>
#include <DB/Interpreters/Settings.h>
#include <Poco/AutoPtr.h>
#include <Poco/XML/XMLStream.h>
#include <Poco/SAX/InputSource.h>
@ -48,7 +51,7 @@ public:
const std::vector<std::string> & names_regexp,
const std::vector<std::string> & without_names_regexp
):
concurrency(concurrency_),
concurrency(concurrency_), queue(concurrency_),
connections(concurrency, host_, port_, default_database_, user_, password_),
pool(concurrency),
testsConfigurations(input_files.size())
@ -63,8 +66,18 @@ public:
private:
unsigned concurrency;
size_t max_iterations = 1;
using Query = std::string;
using Queries = std::vector<std::string>;
Queries queries;
using Queue = ConcurrentBoundedQueue<Query>;
Queue queue;
ConnectionPool connections;
ThreadPool pool;
ThreadPool pool;
Settings settings;
using XMLConfiguration = Poco::Util::XMLConfiguration;
using AbstractConfig = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
@ -73,6 +86,9 @@ private:
using StringToVector = std::map< std::string, std::vector<std::string> >;
std::vector<Config> testsConfigurations;
enum ExecutionType { loop, once };
ExecutionType execType;
void readTestsConfiguration(const Paths & input_files)
{
testsConfigurations.resize(input_files.size());
@ -95,43 +111,48 @@ private:
void runTest(Config & testConfig)
{
std::cout << "Running: " << testConfig->getString("name") << "\n";
std::string testName = testConfig->getString("name");
std::cout << "Running: " << testName << "\n";
Settings settingsPerTest;
/// Preprocess configuration file
using Keys = std::vector<std::string>;
if (testConfig->has("settings")) {
Keys settings;
testConfig->keys("settings", settings);
Keys configSettings;
testConfig->keys("settings", configSettings);
/// This macro goes through all settings in the Settings.h
/// and, if found any settings in test's xml configuration
/// with the same name, sets its value to settingsPerTest
/// with the same name, sets its value to settings
std::vector<std::string>::iterator it;
#define EXTRACT_SETTING(TYPE, NAME, DEFAULT) \
it = std::find(settings.begin(), settings.end(), #NAME); \
if (it != settings.end()) \
settingsPerTest.set( \
it = std::find(configSettings.begin(), configSettings.end(), #NAME); \
if (it != configSettings.end()) \
settings.set( \
#NAME, testConfig->getString("settings."#NAME) \
);
APPLY_FOR_SETTINGS(EXTRACT_SETTING)
APPLY_FOR_LIMITS(EXTRACT_SETTING)
#undef EXTRACT_SETTING
if (std::find(settings.begin(), settings.end(), "profile") !=
settings.end()) {
// proceed profile settings in a proper way
if (std::find(configSettings.begin(), configSettings.end(), "profile") !=
configSettings.end()) {
// TODO: proceed profile settings in a proper way
}
}
std::string query;
Query query;
if (testConfig->has("query")) {
query = testConfig->getString("query");
if (! testConfig->has("query")) {
throw Poco::Exception("Missing query field in test's config: " +
testName, 1);
}
if (query.empty()) {
throw Poco::Exception("The query is empty", 1);
}
query = testConfig->getString("query");
if (query.empty()) {
throw Poco::Exception("The query is empty in test's config: " +
testName, 1);
}
if (testConfig->has("substitutions")) {
@ -142,8 +163,98 @@ private:
StringToVector substitutions;
constructSubstitutions(substitutionsView, substitutions);
std::vector<std::string> queries = formatQueries(query, substitutions);
queries = formatQueries(query, substitutions);
} else {
// TODO: probably it will be a good practice to check if
// query string has {substitution pattern}, but no substitution field
// was found in xml configuration
queries.push_back(query);
}
if (! testConfig->has("type")) {
throw Poco::Exception("Missing type property in config: " +
testName);
}
std::string configExecType = testConfig->getString("type");
if (configExecType == "loop")
execType = loop;
else if (configExecType == "once")
execType = once;
else
throw Poco::Exception("Unknown type " + configExecType + " in :" +
testName, 1);
for (const Query & query : queries) {
std::cout << query << std::endl;
runQuery(query);
}
}
void runQuery(const Query & query)
{
// TODO: proceed terminationConditions
for (size_t i = 0; i < concurrency; ++i) {
pool.schedule(std::bind(
&PerformanceTest::thread,
this,
connections.IConnectionPool::get()
));
}
for (size_t i = 0; (execType == loop) || i < max_iterations; ++i) {
// TODO: start timer and terminate after time exceeds
queue.push(query);
}
for (size_t i = 0; i != concurrency; ++i) {
/// Genlty asking threads to stop
queue.push("");
}
pool.wait();
}
void thread(ConnectionPool::Entry & connection)
{
Query query;
while (true) {
queue.pop(query);
/// Empty query means end of execution
if (query.empty())
break;
execute(connection, query);
}
}
void execute(ConnectionPool::Entry & connection, const Query & query)
{
// Stopwatch watch;
RemoteBlockInputStream stream(connection, query, &settings, nullptr,
Tables()/*, query_processing_stage*/);
// Progress progress;
// stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
stream.readPrefix();
while (Block block = stream.read()) //{}
for (auto column : block.getColumns()) {
std::cout << column.name << std::endl;
}
stream.readSuffix();
// const BlockStreamProfileInfo & info = stream.getProfileInfo();
// double seconds = watch.elapsedSeconds();
// std::lock_guard<std::mutex> lock(mutex);
// info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
// info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
}
void constructSubstitutions(AbstractConfig & substitutionsView,
@ -205,7 +316,7 @@ private:
for (auto value = values.begin(); value != values.end(); ++value) {
/// Copy query string for each unique permutation
std::string query = template_query;
Query query = template_query;
size_t substrPos = 0;
while (substrPos != std::string::npos) {