This commit is contained in:
Alexander Kuzmenkov 2020-06-20 02:03:13 +03:00
parent fbecf42dfc
commit 96368b7d0c

View File

@ -15,6 +15,7 @@
#include <AggregateFunctions/ReservoirSampler.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <boost/program_options.hpp>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/clearPasswordFromCommandLine.h>
@ -52,71 +53,6 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
template <typename Event>
class EventQueue
{
std::mutex mutex;
std::condition_variable condvar;
std::deque<Event> queue;
bool is_closed = false;
const size_t max_queue_size;
public:
EventQueue(size_t max_queue_size_) : max_queue_size(max_queue_size_) {}
template <typename... Args>
bool push(Args && ... args)
{
std::unique_lock lock(mutex);
condvar.wait(lock,
[this]() { return queue.size() < max_queue_size || is_closed; });
if (is_closed)
{
return false;
}
queue.push_back(std::forward<Args>(args)...);
condvar.notify_all();
return true;
}
bool pop(Event & event)
{
std::unique_lock lock(mutex);
condvar.wait(lock, [this]() { return queue.size() > 0 || is_closed; });
if (queue.size() > 0)
{
event = queue.front();
queue.pop_front();
condvar.notify_all();
return true;
}
assert(is_closed);
return false;
}
void close()
{
std::unique_lock lock(mutex);
is_closed = true;
condvar.notify_all();
}
void clearAndClose()
{
std::unique_lock lock(mutex);
is_closed = true;
queue.clear();
condvar.notify_all();
}
};
class Benchmark : public Poco::Util::Application
{
public:
@ -128,7 +64,7 @@ public:
const String & query_id_, bool continue_on_errors_,
bool print_stacktrace_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency_ * 2), randomize(randomize_),
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), confidence(confidence_), query_id(query_id_),
continue_on_errors(continue_on_errors_),
@ -204,7 +140,8 @@ private:
using Queries = std::vector<Query>;
Queries queries;
EventQueue<Query> queue;
using Queue = ConcurrentBoundedQueue<Query>;
Queue queue;
using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
@ -224,6 +161,9 @@ private:
Context global_context;
QueryProcessingStage::Enum query_processing_stage;
/// Don't execute new queries after timelimit or SIGINT or exception
std::atomic<bool> shutdown{false};
std::atomic<size_t> queries_executed{0};
struct Stats
@ -306,29 +246,36 @@ private:
/// Try push new query and check cancellation conditions
bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
{
if (!queue.push(query))
{
/// An exception occurred in a worker
return false;
}
bool inserted = false;
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
while (!inserted)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
inserted = queue.tryPush(query, 100);
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (shutdown)
{
/// An exception occurred in a worker
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfQueriesExecuted(queries_executed);
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
delay_watch.restart();
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfQueriesExecuted(queries_executed);
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
delay_watch.restart();
}
}
return true;
@ -369,13 +316,10 @@ private:
if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
{
// A stop condition ocurred, so clear all the queries that are
// in queue.
queue.clearAndClose();
shutdown = true;
break;
}
}
queue.close();
pool.wait();
total_watch.stop();
@ -407,9 +351,17 @@ private:
while (true)
{
if (!queue.pop(query))
bool extracted = false;
while (!extracted)
{
return;
extracted = queue.tryPop(query, 100);
if (shutdown
|| (max_iterations && queries_executed == max_iterations))
{
return;
}
}
const auto connection_index = distribution(generator);
@ -423,7 +375,7 @@ private:
<< query << "'.\n";
if (!continue_on_errors)
{
queue.clearAndClose();
shutdown = true;
throw;
}
else
@ -586,8 +538,7 @@ public:
~Benchmark() override
{
queue.clearAndClose();
pool.wait();
shutdown = true;
}
};