This commit is contained in:
alesapin 2021-04-12 18:40:42 +03:00
parent fea0ec5046
commit 05959d482b
7 changed files with 205 additions and 62 deletions

View File

@ -32,6 +32,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (db-generator)
add_subdirectory (wal-dump)
add_subdirectory (check-mysql-binlog)
add_subdirectory (keeper-bench)
if (USE_NURAFT)
add_subdirectory (keeper-data-dumper)

View File

@ -2,6 +2,8 @@
#include <random>
#include <filesystem>
using namespace Coordination;
using namespace zkutil;
namespace
{

View File

@ -1,11 +1,10 @@
#pragma once
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <functional>
#include <optional>
using namespace Coordination;
using namespace zkutil;
std::string generateRandomPath(const std::string & prefix, size_t length = 5);
@ -14,9 +13,9 @@ std::string generateRandomData(size_t size);
class IGenerator
{
public:
virtual void startup(ZooKeeper & /*zookeeper*/) {}
virtual ZooKeeperRequestPtr generate() = 0;
virtual void teardown(ZooKeeper & /*zookeeper*/) {}
virtual void startup(Coordination::ZooKeeper & /*zookeeper*/) {}
virtual Coordination::ZooKeeperRequestPtr generate() = 0;
virtual void teardown(Coordination::ZooKeeper & /*zookeeper*/) {}
virtual ~IGenerator() = default;
};
@ -32,7 +31,7 @@ public:
, data_size(data_size_)
{}
ZooKeeperRequestPtr generate() override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;

View File

@ -7,8 +7,13 @@
#include <Common/ThreadPool.h>
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/InterruptListener.h>
#include <Core/Types.h>
#include "Stats.h"
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
namespace DB
{
@ -22,22 +27,30 @@ namespace ErrorCodes
class Runner
{
public:
Runner(size_t concurrency_, const std::string & generator_name,
const Strings & hosts_strings_,
bool continue_on_error_)
Runner(
size_t concurrency_,
const std::string & generator_name,
const Strings & hosts_strings_,
double max_time_,
double delay_,
bool continue_on_error_,
size_t max_iterations_)
: concurrency(concurrency_)
, pool(concurrency)
, hosts_strings(hosts_strings_)
, generator(generator_name == "create_no_data" ? nullptr : std::make_unique<CreateRequestGenerator>("/"))
, generator(generator_name == "create_no_data" ? std::make_unique<CreateRequestGenerator>("/") : nullptr)
, max_time(max_time_)
, delay(delay_)
, continue_on_error(continue_on_error_)
, max_iterations(max_iterations_)
, info(std::make_shared<Stats>())
, queue(concurrency)
{
}
void thread(std::vector<Coordination::ZooKeeper> & zookeepers)
void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers)
{
ZooKeeperRequestPtr request;
Coordination::ZooKeeperRequestPtr request;
/// Randomly choosing connection index
pcg64 rng(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, zookeepers.size() - 1);
@ -48,7 +61,7 @@ public:
|| sigaddset(&sig_set, SIGINT)
|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
{
throwFromErrno("Cannot block signal.", ErrorCodes::CANNOT_BLOCK_SIGNAL);
DB::throwFromErrno("Cannot block signal.", DB::ErrorCodes::CANNOT_BLOCK_SIGNAL);
}
while (true)
@ -60,33 +73,37 @@ public:
extracted = queue.tryPop(request, 100);
if (shutdown
|| (max_iterations && requests_executed == max_iterations))
|| (max_iterations && requests_executed >= max_iterations))
{
return;
}
}
const auto connection_index = distribution(generator);
const auto connection_index = distribution(rng);
auto & zk = zookeepers[connection_index];
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
ResponseCallback callback = [promise](const Response & response)
Coordination::ResponseCallback callback = [promise](const Coordination::Response & response)
{
if (response.error != Coordination::Error::OK)
promise->set_exception(std::make_exception_ptr(KeeperException(response.error)));
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
Stopwatch watch;
zk.executeGenericRequest(request, callback);
zk->executeGenericRequest(request, callback);
try
{
future.get();
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
info->add(seconds, 0, 0);
}
catch (...)
{
@ -95,14 +112,94 @@ public:
shutdown = true;
throw;
}
std::cerr << getCurrentExceptionMessage(true,
true /*check embedded stack trace*/) << std::endl;
std::cerr << DB::getCurrentExceptionMessage(true, true /*check embedded stack trace*/) << std::endl;
}
++requests_executed;
}
}
void printNumberOfRequestsExecuted(size_t num)
{
std::cerr << "Requests executed: " << num << ".\n";
}
bool tryPushRequestInteractively(const Coordination::ZooKeeperRequestPtr & request, DB::InterruptListener & interrupt_listener)
{
bool inserted = false;
while (!inserted)
{
inserted = queue.tryPush(request, 100);
if (shutdown)
{
/// An exception occurred in a worker
return false;
}
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
delay_watch.restart();
}
}
return true;
}
void runBenchmark()
{
try
{
for (size_t i = 0; i < concurrency; ++i)
{
auto connections = getConnections();
pool.scheduleOrThrowOnError([this, connections]() mutable { thread(connections); });
}
}
catch (...)
{
pool.wait();
throw;
}
DB::InterruptListener interrupt_listener;
delay_watch.restart();
/// Push queries into queue
for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
{
if (!tryPushRequestInteractively(generator->generate(), interrupt_listener))
{
shutdown = true;
break;
}
}
pool.wait();
total_watch.stop();
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
}
private:
@ -112,30 +209,39 @@ private:
ThreadPool pool;
Strings hosts_strings;
std::unique_ptr<IGenerator> generator;
double max_time;
double delay;
bool continue_on_error;
std::atomic<size_t> max_iterations;
std::atomic<size_t> requests_executed;
std::atomic<bool> shutdown;
using Queue = ConcurrentBoundedQueue<ZooKeeperRequestPtr>;
std::shared_ptr<Stats> info;
Stopwatch total_watch;
Stopwatch delay_watch;
std::mutex mutex;
using Queue = ConcurrentBoundedQueue<Coordination::ZooKeeperRequestPtr>;
Queue queue;
std::vector<Coordination::ZooKeeper> getConnections()
std::vector<std::shared_ptr<Coordination::ZooKeeper>> getConnections()
{
std::vector<Coordination::ZooKeeper> zookeepers;
std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookeepers;
for (const auto & host_string : hosts_strings)
{
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{host_string}, false};
std::vector<Coordination::ZooKeeper::Node> nodes;
nodes.push_back(node);
zookeepers.emplace_back(
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(
nodes,
"/",
"",
"",
Poco::Timespan(0, 30000 * 1000),
Poco::Timespan(0, 1000 * 1000),
Poco::Timespan(0, 10000 * 1000));
Poco::Timespan(0, 10000 * 1000)));
}
return zookeepers;

View File

@ -1,41 +1,32 @@
#include "Stats.h"
#include <iostream>
void report(MultiStats & infos, size_t concurrency, bool cumulative)
void report(std::shared_ptr<Stats> & info, size_t concurrency)
{
std::cerr << "\n";
for (size_t i = 0; i < infos.size(); ++i)
/// Avoid zeros, nans or exceptions
if (0 == info->requests)
return;
double seconds = info->work_time / concurrency;
std::cerr << "requests " << info->requests << ", ";
if (info->errors)
{
const auto & info = infos[i];
/// Avoid zeros, nans or exceptions
if (0 == info->queries)
return;
double seconds = info->work_time / concurrency;
std::cerr
<< "connection " << i << ", "
<< "queries " << info->queries << ", ";
if (info->errors)
{
std::cerr << "errors " << info->errors << ", ";
}
std::cerr
<< "RPS: " << (info->requests / seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / seconds / 1048576) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / seconds / 1048576) << ". "
<< "\n";
std::cerr << "errors " << info->errors << ", ";
}
std::cerr
<< "RPS: " << (info->requests / seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / seconds / 1048576) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / seconds / 1048576) << ". "
<< "\n";
std::cerr << "\n";
auto print_percentile = [&](double percent)
{
std::cerr << percent << "%\t\t";
for (const auto & info : infos)
{
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t";
}
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t";
std::cerr << "\n";
};
@ -46,10 +37,4 @@ void report(MultiStats & infos, size_t concurrency, bool cumulative)
print_percentile(99);
print_percentile(99.9);
print_percentile(99.99);
if (!cumulative)
{
for (auto & info : infos)
info->clear();
}
}

View File

@ -35,6 +35,5 @@ struct Stats
}
};
using MultiStats = std::vector<std::shared_ptr<Stats>>;
void report(MultiStats & infos, size_t concurrency, bool cumulative);
void report(std::shared_ptr<Stats> & info, size_t concurrency);

View File

@ -1,10 +1,61 @@
#include <iostream>
#include <boost/program_options.hpp>
#include "Runner.h"
#include "Stats.h"
#include "Generator.h"
#include <Common/TerminalSize.h>
#include <Core/Types.h>
using namespace std;
int main(int argc, char *argv[])
{
bool print_stacktrace = true;
return 0;
try
{
using boost::program_options::value;
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help", "produce help message")
("generator", value<std::string>()->default_value(""), "query to execute")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("hosts,h", value<Strings>()->multitoken(), "")
("continue_on_errors", "continue testing even if a query fails")
("reconnect", "establish new connection for every query")
;
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
boost::program_options::notify(options);
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
std::cout << desc << "\n";
return 1;
}
Runner runner(options["concurrency"].as<unsigned>(),
options["generator"].as<std::string>(),
options["hosts"].as<Strings>(),
options["timelimit"].as<double>(),
options["delay"].as<double>(),
options.count("continue_on_errors"),
options["iterations"].as<size_t>());
runner.runBenchmark();
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
return DB::getCurrentExceptionCode();
}
}