mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 00:22:29 +00:00
Add support for time criterions
This commit is contained in:
parent
c4c6960c75
commit
3568946082
@ -1,9 +1,11 @@
|
||||
#include <iostream>
|
||||
#include <limits>
|
||||
#include <unistd.h>
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <boost/program_options.hpp>
|
||||
|
||||
#include <DB/AggregateFunctions/ReservoirSampler.h>
|
||||
#include <DB/Client/ConnectionPool.h>
|
||||
#include <DB/Common/ConcurrentBoundedQueue.h>
|
||||
#include <DB/Common/Stopwatch.h>
|
||||
@ -21,6 +23,7 @@
|
||||
#include <Poco/Util/XMLConfiguration.h>
|
||||
#include <Poco/Exception.h>
|
||||
|
||||
#include "InterruptListener.h"
|
||||
|
||||
/** Tests launcher for ClickHouse.
|
||||
* The tool walks through given or default folder in order to find files with
|
||||
@ -36,9 +39,10 @@ namespace ErrorCodes
|
||||
extern const int UNKNOWN_EXCEPTION;
|
||||
}
|
||||
|
||||
struct criterionWithPriority {
|
||||
std::string priority = "";
|
||||
size_t value = 0;
|
||||
struct CriterionWithPriority {
|
||||
std::string priority = "";
|
||||
size_t value = 0;
|
||||
bool fulfilled = false;
|
||||
};
|
||||
|
||||
/// Termination criterions. The running test will be terminated in either of two conditions:
|
||||
@ -103,13 +107,13 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
struct criterionWithPriority timeout_ms;
|
||||
struct criterionWithPriority read_rows;
|
||||
struct criterionWithPriority bytes_read_uncompressed;
|
||||
struct criterionWithPriority iterations;
|
||||
struct criterionWithPriority min_time_not_changing_for_ms;
|
||||
struct criterionWithPriority max_speed_not_changing_for_ms;
|
||||
struct criterionWithPriority average_speed_not_changing_for_ms;
|
||||
struct CriterionWithPriority timeout_ms;
|
||||
struct CriterionWithPriority read_rows;
|
||||
struct CriterionWithPriority bytes_read_uncompressed;
|
||||
struct CriterionWithPriority iterations;
|
||||
struct CriterionWithPriority min_time_not_changing_for_ms;
|
||||
struct CriterionWithPriority max_speed_not_changing_for_ms;
|
||||
struct CriterionWithPriority average_speed_not_changing_for_ms;
|
||||
|
||||
/// Hereafter 'min' and 'max', in context of critetions, mean a level of importance
|
||||
/// Number of initialized properties met in configuration
|
||||
@ -123,39 +127,46 @@ public:
|
||||
struct Stats
|
||||
{
|
||||
Stopwatch watch;
|
||||
Stopwatch watch_per_query;
|
||||
Stopwatch min_time_watch;
|
||||
Stopwatch max_speed_watch;
|
||||
Stopwatch average_speed_watch;
|
||||
// size_t queries; TODO: Do I need this?
|
||||
size_t queries;
|
||||
size_t read_rows;
|
||||
size_t read_bytes;
|
||||
|
||||
using Sampler = ReservoirSampler<double>;
|
||||
Sampler sampler {1 << 16};
|
||||
|
||||
/// min_time in ms
|
||||
UInt64 min_time = std::numeric_limits<UInt64>::max();
|
||||
size_t max_speed = 0;
|
||||
size_t average_speed = 0;
|
||||
UInt64 min_time = std::numeric_limits<UInt64>::max();
|
||||
double total_time = 0;
|
||||
double max_speed = 0;
|
||||
double average_speed_value = 0;
|
||||
double average_speed_first = 0;
|
||||
double average_speed_precision = 0.001;
|
||||
size_t number_of_speed_info_batches = 0;
|
||||
|
||||
void update_min_time(const UInt64 min_time_candidate)
|
||||
{
|
||||
// TODO:
|
||||
std::cout << "current min_time: " << min_time << std::endl;
|
||||
std::cout << "min_time candidate: " << min_time_candidate << std::endl;
|
||||
std::cout << std::endl;
|
||||
|
||||
if (min_time_candidate < min_time) {
|
||||
min_time = min_time_candidate;
|
||||
min_time_watch.restart();
|
||||
}
|
||||
}
|
||||
|
||||
void update_average_speed(const size_t new_speed_info)
|
||||
void update_average_speed(const double new_speed_info)
|
||||
{
|
||||
size_t new_average_speed = ((average_speed * number_of_speed_info_batches)
|
||||
+ new_speed_info);
|
||||
new_average_speed /= (++number_of_speed_info_batches);
|
||||
if (new_average_speed != average_speed) {
|
||||
average_speed = new_average_speed;
|
||||
average_speed_value = ((average_speed_value * number_of_speed_info_batches)
|
||||
+ new_speed_info);
|
||||
average_speed_value /= (++number_of_speed_info_batches);
|
||||
|
||||
if (average_speed_first == 0) {
|
||||
average_speed_first = average_speed_value;
|
||||
}
|
||||
|
||||
if (abs(average_speed_value - average_speed_first) >= average_speed_precision) {
|
||||
average_speed_first = average_speed_value;
|
||||
average_speed_watch.restart();
|
||||
}
|
||||
}
|
||||
@ -173,17 +184,44 @@ struct Stats
|
||||
read_rows += read_rows_inc;
|
||||
read_bytes += read_bytes_inc;
|
||||
|
||||
size_t new_speed = read_rows_inc / watch.elapsedSeconds();
|
||||
double new_speed = read_rows_inc / watch_per_query.elapsedSeconds();
|
||||
update_max_speed(new_speed);
|
||||
update_average_speed(new_speed);
|
||||
}
|
||||
|
||||
void updateQueryInfo()
|
||||
{
|
||||
++queries;
|
||||
sampler.insert(watch_per_query.elapsedSeconds());
|
||||
update_min_time(watch_per_query.elapsed() / (1000 * 1000)); /// ns to ms
|
||||
}
|
||||
|
||||
void setTotalTime()
|
||||
{
|
||||
total_time = watch.elapsedSeconds();
|
||||
}
|
||||
|
||||
void clear()
|
||||
{
|
||||
watch.restart();
|
||||
watch_per_query.restart();
|
||||
min_time_watch.restart();
|
||||
max_speed_watch.restart();
|
||||
average_speed_watch.restart();
|
||||
|
||||
sampler.clear();
|
||||
|
||||
queries = 0;
|
||||
read_rows = 0;
|
||||
read_bytes = 0;
|
||||
|
||||
min_time = std::numeric_limits<UInt64>::max();
|
||||
total_time = 0;
|
||||
max_speed = 0;
|
||||
average_speed_value = 0;
|
||||
average_speed_first = 0;
|
||||
average_speed_precision = 0.001;
|
||||
number_of_speed_info_batches = 0;
|
||||
}
|
||||
};
|
||||
|
||||
@ -220,6 +258,7 @@ public:
|
||||
|
||||
private:
|
||||
unsigned concurrency;
|
||||
std::string testName;
|
||||
|
||||
using Query = std::string;
|
||||
using Queries = std::vector<std::string>;
|
||||
@ -232,6 +271,12 @@ private:
|
||||
ThreadPool pool;
|
||||
Settings settings;
|
||||
|
||||
InterruptListener interrupt_listener;
|
||||
bool gotSIGINT = false;
|
||||
std::vector<RemoteBlockInputStream*> streams;
|
||||
|
||||
double average_speed_precision = 0.001;
|
||||
|
||||
using XMLConfiguration = Poco::Util::XMLConfiguration;
|
||||
using AbstractConfig = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
|
||||
using Config = Poco::AutoPtr<XMLConfiguration>;
|
||||
@ -242,9 +287,12 @@ private:
|
||||
struct StopCriterions stopCriterions;
|
||||
|
||||
#define incFulfilledCriterions(CRITERION) \
|
||||
stopCriterions.CRITERION.priority == "min" \
|
||||
? ++stopCriterions.fulfilled_criterions_min \
|
||||
: ++stopCriterions.fulfilled_criterions_max;
|
||||
if (! stopCriterions.CRITERION.fulfilled) {\
|
||||
stopCriterions.CRITERION.priority == "min" \
|
||||
? ++stopCriterions.fulfilled_criterions_min \
|
||||
: ++stopCriterions.fulfilled_criterions_max; \
|
||||
stopCriterions.CRITERION.fulfilled = true; \
|
||||
}
|
||||
|
||||
enum ExecutionType { loop, once };
|
||||
ExecutionType execType;
|
||||
@ -275,7 +323,7 @@ private:
|
||||
|
||||
void runTest(Config & testConfig)
|
||||
{
|
||||
std::string testName = testConfig->getString("name");
|
||||
testName = testConfig->getString("name");
|
||||
std::cout << "Running: " << testName << "\n";
|
||||
|
||||
/// Preprocess configuration file
|
||||
@ -303,6 +351,11 @@ private:
|
||||
configSettings.end()) {
|
||||
// TODO: proceed profile settings in a proper way
|
||||
}
|
||||
|
||||
if (std::find(configSettings.begin(), configSettings.end(),
|
||||
"average_speed_precision") != configSettings.end()) {
|
||||
info_total.average_speed_precision = testConfig->getDouble("settings.average_speed_precision");
|
||||
}
|
||||
}
|
||||
|
||||
Query query;
|
||||
@ -364,17 +417,18 @@ private:
|
||||
runQueries(queries);
|
||||
}
|
||||
|
||||
info_total.setTotalTime();
|
||||
constructTotalInfo();
|
||||
}
|
||||
|
||||
void runLoopQuery(const Query & query)
|
||||
{
|
||||
info_total.watch.restart();
|
||||
info_total.min_time_watch.restart();
|
||||
info_total.clear();
|
||||
|
||||
size_t max_iterations = stopCriterions.iterations.value;
|
||||
size_t i = -1;
|
||||
|
||||
while (true) {
|
||||
while (! gotSIGINT) {
|
||||
++i;
|
||||
|
||||
pool.schedule(std::bind(
|
||||
@ -415,8 +469,7 @@ private:
|
||||
|
||||
void runQueries(const Queries & queries)
|
||||
{
|
||||
info_total.watch.restart();
|
||||
info_total.min_time_watch.restart();
|
||||
info_total.clear();
|
||||
|
||||
for (size_t i = 0; i < concurrency; ++i) {
|
||||
pool.schedule(std::bind(
|
||||
@ -440,8 +493,6 @@ private:
|
||||
|
||||
void thread(ConnectionPool::Entry & connection)
|
||||
{
|
||||
Stats info_per_query;
|
||||
|
||||
Query query;
|
||||
|
||||
while (true) {
|
||||
@ -451,65 +502,61 @@ private:
|
||||
if (query.empty())
|
||||
break;
|
||||
|
||||
execute(connection, query, info_per_query);
|
||||
execute(connection, query);
|
||||
}
|
||||
}
|
||||
|
||||
void execute(ConnectionPool::Entry & connection, const Query & query,
|
||||
Stats & info_per_query)
|
||||
void execute(ConnectionPool::Entry & connection, const Query & query)
|
||||
{
|
||||
// TODO:
|
||||
std::cout << "execute? " << query << std::endl;
|
||||
InterruptListener thread_interrupt_listener;
|
||||
info_total.watch_per_query.restart();
|
||||
|
||||
RemoteBlockInputStream stream(connection, query, &settings, nullptr,
|
||||
Tables()/*, query_processing_stage*/);
|
||||
RemoteBlockInputStream * stream = new RemoteBlockInputStream(
|
||||
connection, query, &settings, nullptr, Tables()/*, query_processing_stage*/
|
||||
);
|
||||
|
||||
size_t stream_index;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
streams.push_back(stream);
|
||||
stream_index = streams.size() - 1;
|
||||
}
|
||||
|
||||
Progress progress;
|
||||
stream.setProgressCallback(
|
||||
[&progress, &stream, &info_per_query, this]
|
||||
stream->setProgressCallback(
|
||||
[&progress, &stream, &thread_interrupt_listener, this]
|
||||
(const Progress & value) {
|
||||
// TODO:
|
||||
std::cout << "got some progress" << std::endl;
|
||||
|
||||
progress.incrementPiecewiseAtomically(value);
|
||||
|
||||
this->checkFulfilledCriterionsAndUpdate(progress, stream, info_per_query);
|
||||
this->checkFulfilledCriterionsAndUpdate(progress, stream, thread_interrupt_listener);
|
||||
});
|
||||
|
||||
info_per_query.watch.restart();
|
||||
info_per_query.average_speed_watch.restart();
|
||||
info_per_query.max_speed_watch.restart();
|
||||
|
||||
stream.readPrefix();
|
||||
while (Block block = stream.read())
|
||||
stream->readPrefix();
|
||||
while (Block block = stream->read())
|
||||
;
|
||||
stream.readSuffix();
|
||||
stream->readSuffix();
|
||||
|
||||
// cast nanoseconds to ms
|
||||
UInt64 queryExecutionTime = info_per_query.min_time_watch.elapsed()
|
||||
/ (1000 * 1000);
|
||||
info_total.update_min_time(queryExecutionTime);
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
// const BlockStreamProfileInfo & info = stream.getProfileInfo();
|
||||
streams.erase(streams.begin() + stream_index);
|
||||
delete stream;
|
||||
|
||||
info_total.updateQueryInfo();
|
||||
|
||||
// const BlockStreamProfileInfo & info = stream->getProfileInfo();
|
||||
// double seconds = watch.elapsedSeconds();
|
||||
|
||||
// std::lock_guard<std::mutex> lock(mutex);
|
||||
// info_per_interval.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
|
||||
// info_total.add(seconds, progress.rows, progress.bytes, info.rows, info.bytes);
|
||||
}
|
||||
|
||||
void checkFulfilledCriterionsAndUpdate(const Progress & progress,
|
||||
RemoteBlockInputStream & stream,
|
||||
Stats & info_per_query)
|
||||
RemoteBlockInputStream * stream,
|
||||
InterruptListener & thread_interrupt_listener)
|
||||
{
|
||||
// TODO:
|
||||
std::cout << "im checking" << std::endl;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
info_total.add(progress.rows, progress.bytes);
|
||||
info_per_query.add(progress.rows, progress.bytes);
|
||||
|
||||
size_t max_rows_to_read = stopCriterions.read_rows.value;
|
||||
if (max_rows_to_read && info_total.read_rows >= max_rows_to_read) {
|
||||
@ -531,19 +578,11 @@ private:
|
||||
size_t min_time_not_changing_for_ms = stopCriterions
|
||||
.min_time_not_changing_for_ms.value;
|
||||
if (min_time_not_changing_for_ms) {
|
||||
// TODO:
|
||||
std::cout << "min time is in attention" << std::endl;
|
||||
|
||||
size_t min_time_did_not_change_for = info_total
|
||||
.min_time_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
// TODO:
|
||||
std::cout << "min_time_did_not_change_for: " << min_time_did_not_change_for << std::endl;
|
||||
std::cout << "min_time_not_changing_for_ms: " << min_time_not_changing_for_ms << std::endl;
|
||||
|
||||
if (min_time_did_not_change_for >= min_time_not_changing_for_ms) {
|
||||
// TODO:
|
||||
std::cout << "min time yeahh" << std::endl;
|
||||
incFulfilledCriterions(min_time_not_changing_for_ms);
|
||||
}
|
||||
}
|
||||
@ -552,7 +591,7 @@ private:
|
||||
.max_speed_not_changing_for_ms
|
||||
.value;
|
||||
if (max_speed_not_changing_for_ms) {
|
||||
UInt64 speed_not_changing_time = info_per_query
|
||||
UInt64 speed_not_changing_time = info_total
|
||||
.max_speed_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
if (speed_not_changing_time >= max_speed_not_changing_for_ms) {
|
||||
@ -564,7 +603,7 @@ private:
|
||||
.average_speed_not_changing_for_ms
|
||||
.value;
|
||||
if (average_speed_not_changing_for_ms) {
|
||||
UInt64 speed_not_changing_time = info_per_query
|
||||
UInt64 speed_not_changing_time = info_total
|
||||
.average_speed_watch
|
||||
.elapsed() / (1000 * 1000);
|
||||
if (speed_not_changing_time >= average_speed_not_changing_for_ms) {
|
||||
@ -578,7 +617,7 @@ private:
|
||||
/// All 'min' criterions are fulfilled
|
||||
// TODO:
|
||||
std::cout << "All 'min' criterions are fulfilled" << std::endl;
|
||||
stream.cancel();
|
||||
stream->cancel();
|
||||
}
|
||||
|
||||
if (stopCriterions.number_of_initialized_max &&
|
||||
@ -588,7 +627,17 @@ private:
|
||||
// TODO:
|
||||
std::cout << stopCriterions.fulfilled_criterions_max
|
||||
<< "'max' criterions are fulfilled" << std::endl;
|
||||
stream.cancel();
|
||||
stream->cancel();
|
||||
}
|
||||
|
||||
if (thread_interrupt_listener.check()) { /// SIGINT
|
||||
gotSIGINT = true;
|
||||
|
||||
for (RemoteBlockInputStream * stream : streams) {
|
||||
stream->cancel();
|
||||
}
|
||||
|
||||
std::cout << "got SIGNINT; stopping streams" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
@ -678,6 +727,41 @@ private:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void constructTotalInfo()
|
||||
{
|
||||
std::string hostname = "null";
|
||||
|
||||
char hostname_buffer[256];
|
||||
if (gethostname(hostname_buffer, 256) == 0) {
|
||||
hostname = std::string(hostname_buffer);
|
||||
}
|
||||
|
||||
std::cout << "total info: " << std::endl;
|
||||
std::cout << "hostname: " << hostname << std::endl;
|
||||
std::cout << "Number of CPUs: " << sysconf(_SC_NPROCESSORS_ONLN) << std::endl;
|
||||
std::cout << "test_name: " << testName << std::endl;
|
||||
std::cout << "??main_metric: total_time??" << std::endl;
|
||||
std::cout << "parameters: {some substitutions here...}" << std::endl;
|
||||
|
||||
if (execType == loop) {
|
||||
std::cout << "min_time: " << info_total.min_time / 1000
|
||||
<< "." << info_total.min_time % 1000 << "s" << std::endl;
|
||||
|
||||
// TODO: <quantile>90</quantile>
|
||||
|
||||
std::cout << "total_time: " << info_total.total_time << "s" << std::endl;
|
||||
std::cout << "queries_per_second: " << double(info_total.queries) / info_total.total_time << std::endl;
|
||||
std::cout << "rows_per_second: " << double(info_total.read_rows) / info_total.total_time << std::endl;
|
||||
std::cout << "bytes_per_second: " << double(info_total.read_bytes) / info_total.total_time << std::endl;
|
||||
} else {
|
||||
std::cout << " max_rows_per_second: " << info_total.max_speed << std::endl;
|
||||
// std::cout << " max_bytes_per_second: " << << std::endl;
|
||||
std::cout << " avg_rows_per_second: " << info_total.average_speed_value << std::endl;
|
||||
// std::cout << " avg_bytes_per_second: " << << std::endl;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user