Something working

This commit is contained in:
alesapin 2021-04-13 14:55:08 +03:00
parent 13d3e59b6a
commit b115b8af3d
11 changed files with 421 additions and 211 deletions

View File

@ -116,6 +116,7 @@ struct Request
virtual ~Request() = default;
virtual String getPath() const = 0;
virtual void addRootPath(const String & /* root_path */) {}
virtual size_t bytesSize() const { return 0; }
};
struct Response;
@ -131,6 +132,7 @@ struct Response
Response & operator=(const Response &) = default;
virtual ~Response() = default;
virtual void removeRootPath(const String & /* root_path */) {}
virtual size_t bytesSize() const { return 0; }
};
struct WatchResponse : virtual Response
@ -140,6 +142,8 @@ struct WatchResponse : virtual Response
String path;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override { return path.size() + sizeof(type) + sizeof(state); }
};
using WatchCallback = std::function<void(const WatchResponse &)>;
@ -154,6 +158,9 @@ struct CreateRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + data.size()
+ sizeof(is_ephemeral) + sizeof(is_sequential) + acls.size() * sizeof(ACL); }
};
struct CreateResponse : virtual Response
@ -161,6 +168,8 @@ struct CreateResponse : virtual Response
String path_created;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override { return path_created.size(); }
};
struct RemoveRequest : virtual Request
@ -170,6 +179,8 @@ struct RemoveRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
};
struct RemoveResponse : virtual Response
@ -182,11 +193,15 @@ struct ExistsRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct ExistsResponse : virtual Response
{
Stat stat;
size_t bytesSize() const override { return sizeof(Stat); }
};
struct GetRequest : virtual Request
@ -195,12 +210,16 @@ struct GetRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct GetResponse : virtual Response
{
String data;
Stat stat;
size_t bytesSize() const override { return data.size() + sizeof(stat); }
};
struct SetRequest : virtual Request
@ -211,11 +230,15 @@ struct SetRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return data.size() + data.size() + sizeof(version); }
};
struct SetResponse : virtual Response
{
Stat stat;
size_t bytesSize() const override { return sizeof(stat); }
};
struct ListRequest : virtual Request
@ -224,12 +247,22 @@ struct ListRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct ListResponse : virtual Response
{
std::vector<String> names;
Stat stat;
size_t bytesSize() const override
{
size_t size = sizeof(stat);
for (const auto & name : names)
size += name.size();
return size;
}
};
struct CheckRequest : virtual Request
@ -239,6 +272,8 @@ struct CheckRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
};
struct CheckResponse : virtual Response
@ -251,6 +286,14 @@ struct MultiRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
size_t bytesSize() const override
{
size_t size = 0;
for (const auto & request : requests)
size += request->bytesSize();
return size;
}
};
struct MultiResponse : virtual Response
@ -258,6 +301,14 @@ struct MultiResponse : virtual Response
Responses responses;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override
{
size_t size = 0;
for (const auto & response : responses)
size += response->bytesSize();
return size;
}
};
/// This response may be received only as an element of responses in MultiResponse.

View File

@ -32,6 +32,8 @@ struct ZooKeeperResponse : virtual Response
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void write(WriteBuffer & out) const;
virtual OpNum getOpNum() const = 0;
//size_t bytesSize() const override { return Response::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
using ZooKeeperResponsePtr = std::shared_ptr<ZooKeeperResponse>;
@ -84,6 +86,8 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + path.size(); }
};
struct ZooKeeperSyncResponse final : ZooKeeperResponse
@ -92,6 +96,8 @@ struct ZooKeeperSyncResponse final : ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Sync; }
size_t bytesSize() const override { return path.size(); }
};
struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse
@ -128,6 +134,9 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + sizeof(xid) +
sizeof(type) + scheme.size() + data.size(); }
};
struct ZooKeeperAuthResponse final : ZooKeeperResponse
@ -136,6 +145,8 @@ struct ZooKeeperAuthResponse final : ZooKeeperResponse
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Auth; }
size_t bytesSize() const override { return ZooKeeperResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperCloseRequest final : ZooKeeperRequest
@ -172,6 +183,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
@ -181,6 +194,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Create; }
size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
@ -194,6 +209,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
@ -201,6 +218,8 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Remove; }
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
@ -211,6 +230,8 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return ExistsRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
@ -218,6 +239,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Exists; }
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
@ -228,6 +251,8 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return GetRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
@ -235,6 +260,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Get; }
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
@ -247,6 +274,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
@ -254,6 +283,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Set; }
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
@ -263,6 +294,8 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return ListRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
@ -275,6 +308,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::List; }
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
@ -293,6 +328,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
@ -300,6 +337,8 @@ struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Check; }
size_t bytesSize() const override { return CheckResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
/// This response may be received only as an element of responses in MultiResponse.
@ -309,6 +348,8 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Error; }
size_t bytesSize() const override { return ErrorResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
@ -323,6 +364,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override;
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
@ -346,6 +389,7 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
class ZooKeeperRequestFactory final : private boost::noncopyable

View File

@ -22,13 +22,13 @@ struct Settings;
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(UInt64, reserved_log_items, 10000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 10000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 10000, "How many records will be stored in one log storage file", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \

View File

@ -405,8 +405,6 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest
response.names.insert(response.names.end(), it->value.children.begin(), it->value.children.end());
std::sort(response.names.begin(), response.names.end());
response.stat = it->value.stat;
response.error = Coordination::Error::ZOK;
}

View File

@ -8,6 +8,7 @@
#include <Coordination/SnapshotableHashTable.h>
#include <unordered_map>
#include <unordered_set>
#include <set>
#include <vector>
namespace DB
@ -17,7 +18,7 @@ using namespace DB;
struct KeeperStorageRequest;
using KeeperStorageRequestPtr = std::shared_ptr<KeeperStorageRequest>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = std::unordered_set<std::string>;
using ChildrenSet = std::set<std::string>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
struct KeeperStorageSnapshot;

View File

@ -5,10 +5,21 @@
using namespace Coordination;
using namespace zkutil;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace
{
std::string generateRandomString(size_t length)
{
if (length == 0)
return "";
static const auto & chrs = "0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
@ -39,9 +50,25 @@ std::string generateRandomData(size_t size)
return generateRandomString(size);
}
void CreateRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
}
ZooKeeperRequestPtr CreateRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperCreateRequest>();
request->acls = default_acls;
size_t plength = 5;
if (path_length)
plength = *path_length;
@ -58,3 +85,156 @@ ZooKeeperRequestPtr CreateRequestGenerator::generate()
return request;
}
void GetRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
size_t total_nodes = 1;
if (num_nodes)
total_nodes = *num_nodes;
for (size_t i = 0; i < total_nodes; ++i)
{
auto path = generateRandomPath(path_prefix, 5);
while (std::find(paths_to_get.begin(), paths_to_get.end(), path) != paths_to_get.end())
path = generateRandomPath(path_prefix, 5);
auto create_promise = std::make_shared<std::promise<void>>();
auto create_future = create_promise->get_future();
auto callback = [create_promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
create_promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
create_promise->set_value();
};
std::string data;
if (nodes_data_size)
data = generateRandomString(*nodes_data_size);
zookeeper.create(path, data, false, false, default_acls, callback);
create_future.get();
paths_to_get.push_back(path);
}
}
Coordination::ZooKeeperRequestPtr GetRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperGetRequest>();
size_t path_index = distribution(rng);
request->path = paths_to_get[path_index];
return request;
}
void ListRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
size_t total_nodes = 1;
if (num_nodes)
total_nodes = *num_nodes;
size_t path_length = 5;
if (paths_length)
path_length = *paths_length;
for (size_t i = 0; i < total_nodes; ++i)
{
auto path = generateRandomPath(path_prefix, path_length);
auto create_promise = std::make_shared<std::promise<void>>();
auto create_future = create_promise->get_future();
auto callback = [create_promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
create_promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
create_promise->set_value();
};
zookeeper.create(path, "", false, false, default_acls, callback);
create_future.get();
}
}
Coordination::ZooKeeperRequestPtr ListRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperListRequest>();
request->path = path_prefix;
return request;
}
std::unique_ptr<IGenerator> getGenerator(const std::string & name)
{
if (name == "create_no_data")
{
return std::make_unique<CreateRequestGenerator>();
}
else if (name == "create_small_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 32);
}
else if (name == "create_medium_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 1024);
}
else if (name == "create_big_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 512 * 1024);
}
else if (name == "get_no_data")
{
return std::make_unique<GetRequestGenerator>("/create_generator", 10, 0);
}
else if (name == "get_small_data")
{
return std::make_unique<GetRequestGenerator>("/create_generator", 10, 32);
}
else if (name == "get_medium_data")
{
return std::make_unique<GetRequestGenerator>("/create_generator", 10, 1024);
}
else if (name == "get_big_data")
{
return std::make_unique<GetRequestGenerator>("/create_generator", 10, 512 * 1024);
}
else if (name == "list_no_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 0, 1);
}
else if (name == "list_few_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 10, 5);
}
else if (name == "list_medium_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 1000, 5);
}
else if (name == "list_a_lot_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 100000, 5);
}
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown generator {}", name);
}

View File

@ -5,6 +5,9 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <functional>
#include <optional>
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
std::string generateRandomPath(const std::string & prefix, size_t length = 5);
@ -13,17 +16,28 @@ std::string generateRandomData(size_t size);
class IGenerator
{
public:
IGenerator()
{
Coordination::ACL acl;
acl.permissions = Coordination::ACL::All;
acl.scheme = "world";
acl.id = "anyone";
default_acls.emplace_back(std::move(acl));
}
virtual void startup(Coordination::ZooKeeper & /*zookeeper*/) {}
virtual Coordination::ZooKeeperRequestPtr generate() = 0;
virtual void teardown(Coordination::ZooKeeper & /*zookeeper*/) {}
virtual ~IGenerator() = default;
Coordination::ACLs default_acls;
};
class CreateRequestGenerator final : public IGenerator
{
public:
explicit CreateRequestGenerator(
std::string path_prefix_ = "/",
std::string path_prefix_ = "/create_generator",
std::optional<uint64_t> path_length_ = std::nullopt,
std::optional<uint64_t> data_size_ = std::nullopt)
: path_prefix(path_prefix_)
@ -31,6 +45,7 @@ public:
, data_size(data_size_)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
@ -39,3 +54,54 @@ private:
std::optional<uint64_t> data_size;
std::unordered_set<std::string> paths_created;
};
class GetRequestGenerator final : public IGenerator
{
public:
explicit GetRequestGenerator(
std::string path_prefix_ = "/get_generator",
std::optional<uint64_t> num_nodes_ = std::nullopt,
std::optional<uint64_t> nodes_data_size_ = std::nullopt)
: path_prefix(path_prefix_)
, num_nodes(num_nodes_)
, nodes_data_size(nodes_data_size_)
, rng(randomSeed())
, distribution(0, num_nodes ? *num_nodes - 1 : 0)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;
std::optional<uint64_t> num_nodes;
std::optional<uint64_t> nodes_data_size;
std::vector<std::string> paths_to_get;
pcg64 rng;
std::uniform_int_distribution<size_t> distribution;
};
class ListRequestGenerator final : public IGenerator
{
public:
explicit ListRequestGenerator(
std::string path_prefix_ = "/list_generator",
std::optional<uint64_t> num_nodes_ = std::nullopt,
std::optional<uint64_t> paths_length_ = std::nullopt)
: path_prefix(path_prefix_)
, num_nodes(num_nodes_)
, paths_length(paths_length_)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;
std::optional<uint64_t> num_nodes;
std::optional<uint64_t> paths_length;
};
std::unique_ptr<IGenerator> getGenerator(const std::string & name);

View File

@ -15,15 +15,6 @@
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
extern const int EMPTY_DATA_PASSED;
}
}
class Runner
{
public:
@ -38,7 +29,7 @@ public:
: concurrency(concurrency_)
, pool(concurrency)
, hosts_strings(hosts_strings_)
, generator(generator_name == "create_no_data" ? std::make_unique<CreateRequestGenerator>("/") : nullptr)
, generator(getGenerator(generator_name))
, max_time(max_time_)
, delay(delay_)
, continue_on_error(continue_on_error_)
@ -48,158 +39,16 @@ public:
{
}
void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers)
{
Coordination::ZooKeeperRequestPtr request;
/// Randomly choosing connection index
pcg64 rng(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, zookeepers.size() - 1);
void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers);
/// In these threads we do not accept INT signal.
sigset_t sig_set;
if (sigemptyset(&sig_set)
|| sigaddset(&sig_set, SIGINT)
|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
{
DB::throwFromErrno("Cannot block signal.", DB::ErrorCodes::CANNOT_BLOCK_SIGNAL);
}
while (true)
{
bool extracted = false;
while (!extracted)
{
extracted = queue.tryPop(request, 100);
if (shutdown
|| (max_iterations && requests_executed >= max_iterations))
{
return;
}
}
const auto connection_index = distribution(rng);
auto & zk = zookeepers[connection_index];
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
Coordination::ResponseCallback callback = [promise](const Coordination::Response & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
Stopwatch watch;
zk->executeGenericRequest(request, callback);
try
{
future.get();
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
info->add(seconds, 0, 0);
}
catch (...)
{
if (!continue_on_error)
{
shutdown = true;
throw;
}
std::cerr << DB::getCurrentExceptionMessage(true, true /*check embedded stack trace*/) << std::endl;
}
++requests_executed;
}
}
void printNumberOfRequestsExecuted(size_t num)
{
std::cerr << "Requests executed: " << num << ".\n";
}
bool tryPushRequestInteractively(const Coordination::ZooKeeperRequestPtr & request, DB::InterruptListener & interrupt_listener)
{
bool inserted = false;
bool tryPushRequestInteractively(const Coordination::ZooKeeperRequestPtr & request, DB::InterruptListener & interrupt_listener);
while (!inserted)
{
inserted = queue.tryPush(request, 100);
if (shutdown)
{
/// An exception occurred in a worker
return false;
}
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
delay_watch.restart();
}
}
return true;
}
void runBenchmark()
{
try
{
for (size_t i = 0; i < concurrency; ++i)
{
auto connections = getConnections();
pool.scheduleOrThrowOnError([this, connections]() mutable { thread(connections); });
}
}
catch (...)
{
pool.wait();
throw;
}
DB::InterruptListener interrupt_listener;
delay_watch.restart();
/// Push queries into queue
for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
{
if (!tryPushRequestInteractively(generator->generate(), interrupt_listener))
{
shutdown = true;
break;
}
}
pool.wait();
total_watch.stop();
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
}
void runBenchmark();
private:
@ -226,24 +75,5 @@ private:
using Queue = ConcurrentBoundedQueue<Coordination::ZooKeeperRequestPtr>;
Queue queue;
std::vector<std::shared_ptr<Coordination::ZooKeeper>> getConnections()
{
std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookeepers;
for (const auto & host_string : hosts_strings)
{
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{host_string}, false};
std::vector<Coordination::ZooKeeper::Node> nodes;
nodes.push_back(node);
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(
nodes,
"/",
"",
"",
Poco::Timespan(0, 30000 * 1000),
Poco::Timespan(0, 1000 * 1000),
Poco::Timespan(0, 10000 * 1000)));
}
return zookeepers;
}
std::vector<std::shared_ptr<Coordination::ZooKeeper>> getConnections();
};

View File

@ -6,35 +6,62 @@ void report(std::shared_ptr<Stats> & info, size_t concurrency)
std::cerr << "\n";
/// Avoid zeros, nans or exceptions
if (0 == info->requests)
if (0 == info->read_requests && 0 == info->write_requests)
return;
double seconds = info->work_time / concurrency;
double read_seconds = info->read_work_time / concurrency;
double write_seconds = info->write_work_time / concurrency;
std::cerr << "requests " << info->requests << ", ";
std::cerr << "read requests " << info->read_requests << ", write requests " << info->write_requests << ", ";
if (info->errors)
{
std::cerr << "errors " << info->errors << ", ";
}
std::cerr
<< "RPS: " << (info->requests / seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / seconds / 1048576) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / seconds / 1048576) << ". "
if (0 != info->read_requests)
{
std::cerr
<< "Read RPS: " << (info->read_requests / read_seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / read_seconds / 1048576);
if (0 != info->write_requests)
std::cerr << ", ";
}
if (0 != info->write_requests)
{
std::cerr
<< "Write RPS: " << (info->write_requests / write_seconds) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / write_seconds / 1048576) << ". "
<< "\n";
}
std::cerr << "\n";
auto print_percentile = [&](double percent)
auto print_percentile = [&](double percent, Stats::Sampler & sampler)
{
std::cerr << percent << "%\t\t";
std::cerr << info->sampler.quantileNearest(percent / 100.0) << " sec.\t";
std::cerr << sampler.quantileNearest(percent / 100.0) << " sec.\t";
std::cerr << "\n";
};
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent);
if (0 != info->read_requests)
{
std::cerr << "Read sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->read_sampler);
print_percentile(95);
print_percentile(99);
print_percentile(99.9);
print_percentile(99.99);
print_percentile(95, info->read_sampler);
print_percentile(99, info->read_sampler);
print_percentile(99.9, info->read_sampler);
print_percentile(99.99, info->read_sampler);
}
if (0 != info->write_requests)
{
std::cerr << "Write sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->write_sampler);
print_percentile(95, info->write_sampler);
print_percentile(99, info->write_sampler);
print_percentile(99.9, info->write_sampler);
print_percentile(99.99, info->write_sampler);
}
}

View File

@ -7,31 +7,44 @@
struct Stats
{
std::atomic<size_t> requests{0};
std::atomic<size_t> read_requests{0};
std::atomic<size_t> write_requests{0};
size_t errors = 0;
size_t requests_write_bytes = 0;
size_t requests_read_bytes = 0;
double work_time = 0;
double read_work_time = 0;
double write_work_time = 0;
using Sampler = ReservoirSampler<double>;
Sampler sampler {1 << 16};
Sampler read_sampler {1 << 16};
Sampler write_sampler {1 << 16};
void add(double seconds, size_t request_read_bytes_inc, size_t request_write_bytes_inc)
void addRead(double seconds, size_t requests_inc, size_t bytes_inc)
{
++requests;
work_time += seconds;
requests_read_bytes += request_read_bytes_inc;
requests_write_bytes += request_write_bytes_inc;
sampler.insert(seconds);
read_work_time += seconds;
read_requests += requests_inc;
requests_read_bytes += bytes_inc;
read_sampler.insert(seconds);
}
void addWrite(double seconds, size_t requests_inc, size_t bytes_inc)
{
write_work_time += seconds;
write_requests += requests_inc;
requests_write_bytes += bytes_inc;
write_sampler.insert(seconds);
}
void clear()
{
requests = 0;
work_time = 0;
read_requests = 0;
write_requests = 0;
read_work_time = 0;
write_work_time = 0;
requests_read_bytes = 0;
requests_write_bytes = 0;
sampler.clear();
read_sampler.clear();
write_sampler.clear();
}
};

View File

@ -20,7 +20,7 @@ int main(int argc, char *argv[])
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help", "produce help message")
("generator", value<std::string>()->default_value(""), "query to execute")
("generator", value<std::string>()->default_value("create_small_data"), "query to execute")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")