report the number of errors

This commit is contained in:
Alexander Kuzmenkov 2020-06-20 01:41:15 +03:00
parent cec65ce30d
commit fbecf42dfc
4 changed files with 161 additions and 67 deletions

View File

@ -323,15 +323,16 @@ if args.report == 'main':
print_test_times()
def print_benchmark_results():
left_json = json.load(open('benchmark/website-left.json'));
right_json = json.load(open('benchmark/website-right.json'));
left_qps = next(iter(left_json.values()))["statistics"]["QPS"]
right_qps = next(iter(right_json.values()))["statistics"]["QPS"]
relative_diff = (right_qps - left_qps) / left_qps;
times_diff = max(right_qps, left_qps) / max(0.01, min(right_qps, left_qps))
json_reports = [json.load(open(f'benchmark/website-{x}.json')) for x in ['left', 'right']]
stats = [next(iter(x.values()))["statistics"] for x in json_reports]
qps = [x["QPS"] for x in stats]
errors = [x["num_errors"] for x in stats]
relative_diff = (qps[1] - qps[0]) / max(0.01, qps[0]);
times_diff = max(qps) / max(0.01, min(qps))
print(tableStart('Concurrent benchmarks'))
print(tableHeader(['Benchmark', 'Old, queries/s', 'New, queries/s', 'Relative difference', 'Times difference']))
row = ['website', f'{left_qps:.3f}', f'{right_qps:.3f}', f'{relative_diff:.3f}', f'x{times_diff:.3f}']
all_rows = []
row = ['website', f'{qps[0]:.3f}', f'{qps[1]:.3f}', f'{relative_diff:.3f}', f'x{times_diff:.3f}']
attrs = ['' for r in row]
if abs(relative_diff) > 0.1:
# More queries per second is better.
@ -341,7 +342,23 @@ if args.report == 'main':
attrs[3] = f'style="background: {color_bad}"'
else:
attrs[3] = ''
all_rows.append((rows, attrs));
print(tableRow(row, attrs))
if max(errors):
attrs = ['' for r in row]
row[1] = f'{errors[0]:.3f}'
row[2] = f'{errors[1]:.3f}'
if errors[0]:
attrs[1] += f' style="background: {color_bad}" '
if errors[1]:
attrs[2] += f' style="background: {color_bad}" '
all_rows[0][1] += " colspan=2 "
for row, attrs in all_rows:
print(tableRow(row, attrs))
print(tableEnd())
try:

View File

@ -15,7 +15,6 @@
#include <AggregateFunctions/ReservoirSampler.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <boost/program_options.hpp>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/clearPasswordFromCommandLine.h>
@ -53,6 +52,71 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
template <typename Event>
class EventQueue
{
std::mutex mutex;
std::condition_variable condvar;
std::deque<Event> queue;
bool is_closed = false;
const size_t max_queue_size;
public:
EventQueue(size_t max_queue_size_) : max_queue_size(max_queue_size_) {}
template <typename... Args>
bool push(Args && ... args)
{
std::unique_lock lock(mutex);
condvar.wait(lock,
[this]() { return queue.size() < max_queue_size || is_closed; });
if (is_closed)
{
return false;
}
queue.push_back(std::forward<Args>(args)...);
condvar.notify_all();
return true;
}
bool pop(Event & event)
{
std::unique_lock lock(mutex);
condvar.wait(lock, [this]() { return queue.size() > 0 || is_closed; });
if (queue.size() > 0)
{
event = queue.front();
queue.pop_front();
condvar.notify_all();
return true;
}
assert(is_closed);
return false;
}
void close()
{
std::unique_lock lock(mutex);
is_closed = true;
condvar.notify_all();
}
void clearAndClose()
{
std::unique_lock lock(mutex);
is_closed = true;
queue.clear();
condvar.notify_all();
}
};
class Benchmark : public Poco::Util::Application
{
public:
@ -64,7 +128,7 @@ public:
const String & query_id_, bool continue_on_errors_,
bool print_stacktrace_, const Settings & settings_)
:
concurrency(concurrency_), delay(delay_), queue(concurrency), randomize(randomize_),
concurrency(concurrency_), delay(delay_), queue(concurrency_ * 2), randomize(randomize_),
cumulative(cumulative_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), confidence(confidence_), query_id(query_id_),
continue_on_errors(continue_on_errors_),
@ -140,8 +204,7 @@ private:
using Queries = std::vector<Query>;
Queries queries;
using Queue = ConcurrentBoundedQueue<Query>;
Queue queue;
EventQueue<Query> queue;
using ConnectionPoolUniq = std::unique_ptr<ConnectionPool>;
using ConnectionPoolUniqs = std::vector<ConnectionPoolUniq>;
@ -161,14 +224,12 @@ private:
Context global_context;
QueryProcessingStage::Enum query_processing_stage;
/// Don't execute new queries after timelimit or SIGINT or exception
std::atomic<bool> shutdown{false};
std::atomic<size_t> queries_executed{0};
struct Stats
{
std::atomic<size_t> queries{0};
size_t errors = 0;
size_t read_rows = 0;
size_t read_bytes = 0;
size_t result_rows = 0;
@ -245,36 +306,29 @@ private:
/// Try push new query and check cancellation conditions
bool tryPushQueryInteractively(const String & query, InterruptListener & interrupt_listener)
{
bool inserted = false;
while (!inserted)
if (!queue.push(query))
{
inserted = queue.tryPush(query, 100);
/// An exception occurred in a worker
return false;
}
if (shutdown)
{
/// An exception occurred in a worker
return false;
}
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received.\n";
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfQueriesExecuted(queries_executed);
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
delay_watch.restart();
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfQueriesExecuted(queries_executed);
cumulative ? report(comparison_info_total) : report(comparison_info_per_interval);
delay_watch.restart();
}
return true;
@ -315,10 +369,13 @@ private:
if (!tryPushQueryInteractively(queries[query_index], interrupt_listener))
{
shutdown = true;
// A stop condition ocurred, so clear all the queries that are
// in queue.
queue.clearAndClose();
break;
}
}
queue.close();
pool.wait();
total_watch.stop();
@ -350,36 +407,32 @@ private:
while (true)
{
bool extracted = false;
while (!extracted)
if (!queue.pop(query))
{
extracted = queue.tryPop(query, 100);
if (shutdown
|| (max_iterations && queries_executed == max_iterations))
{
return;
}
return;
}
const auto connection_index = distribution(generator);
try
{
execute(connection_entries, query, distribution(generator));
execute(connection_entries, query, connection_index);
}
catch (...)
{
std::cerr << "An error occurred while processing query:\n"
<< query << "\n";
std::cerr << "An error occurred while processing the query '\n"
<< query << "'.\n";
if (!continue_on_errors)
{
shutdown = true;
queue.clearAndClose();
throw;
}
else
{
std::cerr << getCurrentExceptionMessage(print_stacktrace,
true /*check embedded stack trace*/) ;
true /*check embedded stack trace*/) << std::endl;
comparison_info_per_interval[connection_index]->errors++;
comparison_info_total[connection_index]->errors++;
}
}
// Count failed queries toward executed, so that we'd reach
@ -433,7 +486,12 @@ private:
std::cerr
<< connections[i]->getDescription() << ", "
<< "queries " << info->queries << ", "
<< "queries " << info->queries << ", ";
if (info->errors)
{
std::cerr << "errors " << info->errors << ", ";
}
std::cerr
<< "QPS: " << (info->queries / seconds) << ", "
<< "RPS: " << (info->read_rows / seconds) << ", "
<< "MiB/s: " << (info->read_bytes / seconds / 1048576) << ", "
@ -500,18 +558,22 @@ private:
print_key_value("MiBPS", info->read_bytes / info->work_time);
print_key_value("RPS_result", info->result_rows / info->work_time);
print_key_value("MiBPS_result", info->result_bytes / info->work_time);
print_key_value("num_queries", info->queries.load(), false);
print_key_value("num_queries", info->queries.load());
print_key_value("num_errors", info->errors, false);
json_out << "},\n";
json_out << double_quote << "query_time_percentiles" << ": {\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(*info, percent);
if (info->queries != 0)
{
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(*info, percent);
print_percentile(*info, 95);
print_percentile(*info, 99);
print_percentile(*info, 99.9);
print_percentile(*info, 99.99, false);
print_percentile(*info, 95);
print_percentile(*info, 99);
print_percentile(*info, 99.9);
print_percentile(*info, 99.99, false);
}
json_out << "}\n";
json_out << (i == infos.size() - 1 ? "}\n" : "},\n");
@ -524,7 +586,8 @@ public:
~Benchmark() override
{
shutdown = true;
queue.clearAndClose();
pool.wait();
}
};

View File

@ -13,6 +13,13 @@
#include <Poco/Exception.h>
#include <pcg_random.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/// Implementing the Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the sample_count size.
/// Can approximately get quantiles.
@ -236,7 +243,7 @@ private:
ResultType onEmpty() const
{
if (OnEmpty == ReservoirSamplerOnEmpty::THROW)
throw Poco::Exception("Quantile of empty ReservoirSampler");
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Quantile of empty ReservoirSampler");
else
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
}

View File

@ -14,6 +14,13 @@
#include <Common/NaNUtils.h>
#include <Poco/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/// Implementation of Reservoir Sampling algorithm. Incrementally selects from the added objects a random subset of the `sample_count` size.
/// Can approximately get quantiles.
@ -223,7 +230,7 @@ private:
ResultType onEmpty() const
{
if (OnEmpty == ReservoirSamplerDeterministicOnEmpty::THROW)
throw Poco::Exception("Quantile of empty ReservoirSamplerDeterministic");
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Quantile of empty ReservoirSamplerDeterministic");
else
return NanLikeValueConstructor<ResultType, std::is_floating_point_v<ResultType>>::getValue();
}