Merge pull request #23038 from ClickHouse/keeper_bench_mark

Add tool for [Zoo]Keeper benchmark
This commit is contained in:
alesapin 2021-04-19 10:47:29 +03:00 committed by GitHub
commit 14800f0c52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 1523 additions and 160 deletions

2
contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit d2feb5978b979729a07c3ca76eaa4ab94cef4ceb
Subproject commit 377f8e77491d9f66ce8e32e88aae19dffe8dc4d7

View File

@ -116,6 +116,7 @@ struct Request
virtual ~Request() = default;
virtual String getPath() const = 0;
virtual void addRootPath(const String & /* root_path */) {}
virtual size_t bytesSize() const { return 0; }
};
struct Response;
@ -131,6 +132,7 @@ struct Response
Response & operator=(const Response &) = default;
virtual ~Response() = default;
virtual void removeRootPath(const String & /* root_path */) {}
virtual size_t bytesSize() const { return 0; }
};
struct WatchResponse : virtual Response
@ -140,6 +142,8 @@ struct WatchResponse : virtual Response
String path;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override { return path.size() + sizeof(type) + sizeof(state); }
};
using WatchCallback = std::function<void(const WatchResponse &)>;
@ -154,6 +158,9 @@ struct CreateRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + data.size()
+ sizeof(is_ephemeral) + sizeof(is_sequential) + acls.size() * sizeof(ACL); }
};
struct CreateResponse : virtual Response
@ -161,6 +168,8 @@ struct CreateResponse : virtual Response
String path_created;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override { return path_created.size(); }
};
struct RemoveRequest : virtual Request
@ -170,6 +179,8 @@ struct RemoveRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
};
struct RemoveResponse : virtual Response
@ -182,11 +193,15 @@ struct ExistsRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct ExistsResponse : virtual Response
{
Stat stat;
size_t bytesSize() const override { return sizeof(Stat); }
};
struct GetRequest : virtual Request
@ -195,12 +210,16 @@ struct GetRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct GetResponse : virtual Response
{
String data;
Stat stat;
size_t bytesSize() const override { return data.size() + sizeof(stat); }
};
struct SetRequest : virtual Request
@ -211,11 +230,15 @@ struct SetRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return data.size() + data.size() + sizeof(version); }
};
struct SetResponse : virtual Response
{
Stat stat;
size_t bytesSize() const override { return sizeof(stat); }
};
struct ListRequest : virtual Request
@ -224,12 +247,22 @@ struct ListRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size(); }
};
struct ListResponse : virtual Response
{
std::vector<String> names;
Stat stat;
size_t bytesSize() const override
{
size_t size = sizeof(stat);
for (const auto & name : names)
size += name.size();
return size;
}
};
struct CheckRequest : virtual Request
@ -239,6 +272,8 @@ struct CheckRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
size_t bytesSize() const override { return path.size() + sizeof(version); }
};
struct CheckResponse : virtual Response
@ -251,6 +286,14 @@ struct MultiRequest : virtual Request
void addRootPath(const String & root_path) override;
String getPath() const override { return {}; }
size_t bytesSize() const override
{
size_t size = 0;
for (const auto & request : requests)
size += request->bytesSize();
return size;
}
};
struct MultiResponse : virtual Response
@ -258,6 +301,14 @@ struct MultiResponse : virtual Response
Responses responses;
void removeRootPath(const String & root_path) override;
size_t bytesSize() const override
{
size_t size = 0;
for (const auto & response : responses)
size += response->bytesSize();
return size;
}
};
/// This response may be received only as an element of responses in MultiResponse.

View File

@ -455,6 +455,39 @@ ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return std::m
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const { return std::make_shared<ZooKeeperMultiResponse>(requests); }
ZooKeeperResponsePtr ZooKeeperCloseRequest::makeResponse() const { return std::make_shared<ZooKeeperCloseResponse>(); }
void ZooKeeperSessionIDRequest::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_timeout_ms, out);
Coordination::write(server_id, out);
}
void ZooKeeperSessionIDRequest::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_timeout_ms, in);
Coordination::read(server_id, in);
}
Coordination::ZooKeeperResponsePtr ZooKeeperSessionIDRequest::makeResponse() const
{
return std::make_shared<ZooKeeperSessionIDResponse>();
}
void ZooKeeperSessionIDResponse::readImpl(ReadBuffer & in)
{
Coordination::read(internal_id, in);
Coordination::read(session_id, in);
Coordination::read(server_id, in);
}
void ZooKeeperSessionIDResponse::writeImpl(WriteBuffer & out) const
{
Coordination::write(internal_id, out);
Coordination::write(session_id, out);
Coordination::write(server_id, out);
}
void ZooKeeperRequestFactory::registerRequest(OpNum op_num, Creator creator)
{
if (!op_num_to_request.try_emplace(op_num, creator).second)
@ -511,6 +544,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::List, ZooKeeperListRequest>(*this);
registerZooKeeperRequest<OpNum::Check, ZooKeeperCheckRequest>(*this);
registerZooKeeperRequest<OpNum::Multi, ZooKeeperMultiRequest>(*this);
registerZooKeeperRequest<OpNum::SessionID, ZooKeeperSessionIDRequest>(*this);
}
}

View File

@ -84,6 +84,8 @@ struct ZooKeeperSyncRequest final : ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + path.size(); }
};
struct ZooKeeperSyncResponse final : ZooKeeperResponse
@ -92,6 +94,8 @@ struct ZooKeeperSyncResponse final : ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Sync; }
size_t bytesSize() const override { return path.size(); }
};
struct ZooKeeperHeartbeatResponse final : ZooKeeperResponse
@ -128,6 +132,9 @@ struct ZooKeeperAuthRequest final : ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return ZooKeeperRequest::bytesSize() + sizeof(xid) +
sizeof(type) + scheme.size() + data.size(); }
};
struct ZooKeeperAuthResponse final : ZooKeeperResponse
@ -136,6 +143,8 @@ struct ZooKeeperAuthResponse final : ZooKeeperResponse
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Auth; }
size_t bytesSize() const override { return ZooKeeperResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperCloseRequest final : ZooKeeperRequest
@ -172,6 +181,8 @@ struct ZooKeeperCreateRequest final : public CreateRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return CreateRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
@ -181,6 +192,8 @@ struct ZooKeeperCreateResponse final : CreateResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Create; }
size_t bytesSize() const override { return CreateResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
@ -194,6 +207,8 @@ struct ZooKeeperRemoveRequest final : RemoveRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return RemoveRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
@ -201,6 +216,8 @@ struct ZooKeeperRemoveResponse final : RemoveResponse, ZooKeeperResponse
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Remove; }
size_t bytesSize() const override { return RemoveResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
@ -211,6 +228,8 @@ struct ZooKeeperExistsRequest final : ExistsRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return ExistsRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
@ -218,6 +237,8 @@ struct ZooKeeperExistsResponse final : ExistsResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Exists; }
size_t bytesSize() const override { return ExistsResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
@ -228,6 +249,8 @@ struct ZooKeeperGetRequest final : GetRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return GetRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
@ -235,6 +258,8 @@ struct ZooKeeperGetResponse final : GetResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Get; }
size_t bytesSize() const override { return GetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
@ -247,6 +272,8 @@ struct ZooKeeperSetRequest final : SetRequest, ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
size_t bytesSize() const override { return SetRequest::bytesSize() + sizeof(xid); }
};
struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
@ -254,6 +281,8 @@ struct ZooKeeperSetResponse final : SetResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Set; }
size_t bytesSize() const override { return SetResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
@ -263,6 +292,8 @@ struct ZooKeeperListRequest : ListRequest, ZooKeeperRequest
void readImpl(ReadBuffer & in) override;
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return ListRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperSimpleListRequest final : ZooKeeperListRequest
@ -275,6 +306,8 @@ struct ZooKeeperListResponse : ListResponse, ZooKeeperResponse
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::List; }
size_t bytesSize() const override { return ListResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
@ -293,6 +326,8 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return !has_watch; }
size_t bytesSize() const override { return CheckRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
@ -300,6 +335,8 @@ struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
OpNum getOpNum() const override { return OpNum::Check; }
size_t bytesSize() const override { return CheckResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
/// This response may be received only as an element of responses in MultiResponse.
@ -309,6 +346,8 @@ struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
OpNum getOpNum() const override { return OpNum::Error; }
size_t bytesSize() const override { return ErrorResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
@ -323,6 +362,8 @@ struct ZooKeeperMultiRequest final : MultiRequest, ZooKeeperRequest
ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override;
size_t bytesSize() const override { return MultiRequest::bytesSize() + sizeof(xid) + sizeof(has_watch); }
};
struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
@ -346,6 +387,41 @@ struct ZooKeeperMultiResponse final : MultiResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
size_t bytesSize() const override { return MultiResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDRequest final : ZooKeeperRequest
{
int64_t internal_id;
int64_t session_timeout_ms;
/// Who requested this session
int32_t server_id;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
String getPath() const override { return {}; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
Coordination::ZooKeeperResponsePtr makeResponse() const override;
bool isReadRequest() const override { return false; }
};
/// Fake internal coordination (keeper) response. Never received from client
/// and never send to client.
struct ZooKeeperSessionIDResponse final : ZooKeeperResponse
{
int64_t internal_id;
int64_t session_id;
/// Who requested this session
int32_t server_id;
void readImpl(ReadBuffer & in) override;
void writeImpl(WriteBuffer & out) const override;
Coordination::OpNum getOpNum() const override { return OpNum::SessionID; }
};
class ZooKeeperRequestFactory final : private boost::noncopyable

View File

@ -21,6 +21,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::Check),
static_cast<int32_t>(OpNum::Multi),
static_cast<int32_t>(OpNum::Auth),
static_cast<int32_t>(OpNum::SessionID),
};
std::string toString(OpNum op_num)
@ -55,6 +56,8 @@ std::string toString(OpNum op_num)
return "Heartbeat";
case OpNum::Auth:
return "Auth";
case OpNum::SessionID:
return "SessionID";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -30,6 +30,7 @@ enum class OpNum : int32_t
Check = 13,
Multi = 14,
Auth = 100,
SessionID = 997, /// Special internal request
};
std::string toString(OpNum op_num);

View File

@ -1012,6 +1012,16 @@ void ZooKeeper::pushRequest(RequestInfo && info)
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
}
void ZooKeeper::executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback)
{
RequestInfo request_info;
request_info.request = request;
request_info.callback = callback;
pushRequest(std::move(request_info));
}
void ZooKeeper::create(
const String & path,

View File

@ -121,6 +121,9 @@ public:
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback);
/// See the documentation about semantics of these methods in IKeeper class.

View File

@ -80,7 +80,7 @@ public:
{}
off_t appendRecord(ChangelogRecord && record, bool sync)
off_t appendRecord(ChangelogRecord && record)
{
off_t result = plain_buf.count();
writeIntBinary(computeRecordChecksum(record), plain_buf);
@ -96,22 +96,20 @@ public:
entries_written++;
if (sync)
plain_buf.sync();
else
plain_buf.next();
return result;
}
void truncateToLength(off_t new_length)
{
flush();
plain_buf.next();
plain_buf.truncate(new_length);
plain_buf.seek(new_length, SEEK_SET);
}
void flush()
void flush(bool force_fsync)
{
plain_buf.next();
if (force_fsync)
plain_buf.sync();
}
@ -247,9 +245,14 @@ private:
ReadBufferFromFile read_buf;
};
Changelog::Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_)
Changelog::Changelog(
const std::string & changelogs_dir_,
uint64_t rotate_interval_,
bool force_sync_,
Poco::Logger * log_)
: changelogs_dir(changelogs_dir_)
, rotate_interval(rotate_interval_)
, force_sync(force_sync_)
, log(log_)
{
namespace fs = std::filesystem;
@ -357,6 +360,9 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
void Changelog::rotate(uint64_t new_start_log_index)
{
/// Flush previous log
flush();
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index;
@ -387,7 +393,7 @@ ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_e
return record;
}
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry)
{
if (!current_writer)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Changelog must be initialized before appending records");
@ -398,14 +404,14 @@ void Changelog::appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool
if (current_writer->getEntriesWritten() == rotate_interval)
rotate(index);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry), force_sync);
auto offset = current_writer->appendRecord(buildRecord(index, log_entry));
if (!index_to_start_pos.try_emplace(index, offset).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Record with index {} already exists", index);
logs[index] = makeClone(log_entry);
}
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync)
void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
{
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
@ -451,7 +457,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry, bool forc
current_writer->setEntriesWritten(entries_written);
appendEntry(index, log_entry, force_sync);
appendEntry(index, log_entry);
}
void Changelog::compact(uint64_t up_to_log_index)
@ -540,7 +546,7 @@ nuraft::ptr<nuraft::buffer> Changelog::serializeEntriesToBuffer(uint64_t index,
return buf_out;
}
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync)
void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer)
{
buffer.pos(0);
int num_logs = buffer.get_int();
@ -555,23 +561,23 @@ void Changelog::applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer,
LogEntryPtr log_entry = nuraft::log_entry::deserialize(*buf_local);
if (i == 0 && logs.count(cur_index))
writeAt(cur_index, log_entry, force_sync);
writeAt(cur_index, log_entry);
else
appendEntry(cur_index, log_entry, force_sync);
appendEntry(cur_index, log_entry);
}
}
void Changelog::flush()
{
current_writer->flush();
if (current_writer)
current_writer->flush(force_sync);
}
Changelog::~Changelog()
{
try
{
if (current_writer)
current_writer->flush();
flush();
}
catch (...)
{

View File

@ -63,17 +63,17 @@ class Changelog
{
public:
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, Poco::Logger * log_);
Changelog(const std::string & changelogs_dir_, uint64_t rotate_interval_, bool force_sync_, Poco::Logger * log_);
/// Read changelog from files on changelogs_dir_ skipping all entries before from_log_index
/// Truncate broken entries, remove files after broken entries.
void readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep);
/// Add entry to log with index. Call fsync if force_sync true.
void appendEntry(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
/// Add entry to log with index.
void appendEntry(uint64_t index, const LogEntryPtr & log_entry);
/// Write entry at index and truncate all subsequent entries.
void writeAt(uint64_t index, const LogEntryPtr & log_entry, bool force_sync);
void writeAt(uint64_t index, const LogEntryPtr & log_entry);
/// Remove log files with to_log_index <= up_to_log_index.
void compact(uint64_t up_to_log_index);
@ -101,9 +101,9 @@ public:
BufferPtr serializeEntriesToBuffer(uint64_t index, int32_t count);
/// Apply entries from buffer overriding existing entries
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer, bool force_sync);
void applyEntriesFromBuffer(uint64_t index, nuraft::buffer & buffer);
/// Fsync log to disk
/// Fsync latest log to disk and flush buffer
void flush();
uint64_t size() const
@ -124,6 +124,7 @@ private:
private:
const std::string changelogs_dir;
const uint64_t rotate_interval;
const bool force_sync;
Poco::Logger * log;
std::map<uint64_t, ChangelogFileDescription> existing_changelogs;

View File

@ -22,16 +22,17 @@ struct Settings;
M(Milliseconds, heart_beat_interval_ms, 500, "Heartbeat interval between quorum nodes", 0) \
M(Milliseconds, election_timeout_lower_bound_ms, 1000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(Milliseconds, election_timeout_upper_bound_ms, 2000, "Lower bound of election timer (avoid too often leader elections)", 0) \
M(UInt64, reserved_log_items, 10000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 10000, "How many log items we have to collect to write new snapshot", 0) \
M(UInt64, reserved_log_items, 100000, "How many log items to store (don't remove during compaction)", 0) \
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
M(Milliseconds, shutdown_timeout, 5000, "How many time we will until RAFT shutdown", 0) \
M(Milliseconds, startup_timeout, 30000, "How many time we will until RAFT to start", 0) \
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
M(UInt64, rotate_log_storage_interval, 10000, "How many records will be stored in one log storage file", 0) \
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
M(UInt64, snapshots_to_keep, 3, "How many compressed snapshots to keep on disk", 0) \
M(UInt64, stale_log_gap, 10000, "When node became stale and should receive snapshots from leader", 0) \
M(UInt64, fresh_log_gap, 200, "When node became fresh", 0) \
M(UInt64, max_requests_batch_size, 100, "Max size of batch in requests count before it will be sent to RAFT", 0) \
M(Bool, quorum_reads, false, "Execute read requests as writes through whole RAFT consesus with similar speed", 0) \
M(Bool, force_sync, true, "Call fsync on each change in RAFT changelog", 0)

View File

@ -5,9 +5,12 @@ namespace DB
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
, changelog(changelogs_path, rotate_interval_, force_sync_, log)
{
if (force_sync_)
LOG_INFO(log, "force_sync enabled");
else
LOG_INFO(log, "force_sync disabled");
}
uint64_t KeeperLogStore::start_index() const
@ -38,7 +41,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
uint64_t idx = changelog.getNextEntryIndex();
changelog.appendEntry(idx, entry, force_sync);
changelog.appendEntry(idx, entry);
return idx;
}
@ -46,7 +49,7 @@ uint64_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
void KeeperLogStore::write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
changelog.writeAt(index, entry);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(uint64_t start, uint64_t end)
@ -93,7 +96,7 @@ bool KeeperLogStore::flush()
void KeeperLogStore::apply_pack(uint64_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
changelog.applyEntriesFromBuffer(index, pack);
}
uint64_t KeeperLogStore::size() const
@ -102,4 +105,10 @@ uint64_t KeeperLogStore::size() const
return changelog.size();
}
void KeeperLogStore::end_of_append_batch(uint64_t /*start_index*/, uint64_t /*count*/)
{
std::lock_guard lock(changelog_lock);
changelog.flush();
}
}

View File

@ -42,11 +42,12 @@ public:
uint64_t size() const;
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;
Changelog changelog;
bool force_sync;
};
}

View File

@ -24,6 +24,7 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
}
namespace
@ -73,7 +74,6 @@ KeeperServer::KeeperServer(
config.getString("keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"),
coordination_settings))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
, log(&Poco::Logger::get("KeeperServer"))
{
if (coordination_settings->quorum_reads)
@ -111,7 +111,7 @@ void KeeperServer::startup()
params.auto_forwarding_ = coordination_settings->auto_forwarding;
params.auto_forwarding_req_timeout_ = coordination_settings->operation_timeout_ms.totalMilliseconds() * 2;
params.return_method_ = nuraft::raft_params::blocking;
params.return_method_ = nuraft::raft_params::async_handler;
nuraft::asio_service::options asio_opts{};
if (state_manager->isSecure())
@ -222,75 +222,26 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
}
void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_for_session)
{
auto [session_id, request] = request_for_session;
if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
if (!request_for_session.request->isReadRequest())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");
state_machine->processReadRequest(request_for_session);
}
else
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions)
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & [session_id, request] : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(session_id, request));
{
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
return raft_instance->append_entries(entries);
}
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
}
}
int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
{
/// Just some sanity check. We don't want to make a lot of clients wait with lock.
if (active_session_id_requests > 10)
throw Exception(ErrorCodes::RAFT_ERROR, "Too many concurrent SessionID requests already in flight");
++active_session_id_requests;
SCOPE_EXIT({ --active_session_id_requests; });
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
/// Just special session request
nuraft::buffer_serializer bs(entry);
bs.put_i64(session_timeout_ms);
std::lock_guard lock(append_entries_mutex);
auto result = raft_instance->append_entries({entry});
if (!result->get_accepted())
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot send session_id request to RAFT");
if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "session_id request failed to RAFT");
auto resp = result->get();
if (resp == nullptr)
throw Exception(ErrorCodes::RAFT_ERROR, "Received nullptr as session_id");
nuraft::buffer_serializer bs_resp(resp);
return bs_resp.get_i64();
}
bool KeeperServer::isLeader() const

View File

@ -12,10 +12,12 @@
namespace DB
{
using RaftAppendResult = nuraft::ptr<nuraft::cmd_result<nuraft::ptr<nuraft::buffer>>>;
class KeeperServer
{
private:
int server_id;
const int server_id;
CoordinationSettingsPtr coordination_settings;
@ -29,13 +31,10 @@ private:
std::mutex append_entries_mutex;
ResponsesQueue & responses_queue;
std::mutex initialized_mutex;
std::atomic<bool> initialized_flag = false;
std::condition_variable initialized_cv;
std::atomic<bool> initial_batch_committed = false;
std::atomic<size_t> active_session_id_requests = 0;
Poco::Logger * log;
@ -60,9 +59,9 @@ public:
void startup();
void putRequest(const KeeperStorage::RequestForSession & request);
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
int64_t getSessionID(int64_t session_timeout_ms);
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
std::unordered_set<int64_t> getDeadSessions();
@ -73,6 +72,8 @@ public:
void waitInit();
void shutdown();
int getServerID() const { return server_id; }
};
}

View File

@ -90,25 +90,29 @@ void KeeperStateMachine::init()
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(int64_t))
auto request_for_session = parseRequest(data);
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
{
nuraft::buffer_serializer timeout_data(data);
int64_t session_timeout_ms = timeout_data.get_i64();
auto response = nuraft::buffer::alloc(sizeof(int64_t));
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
int64_t session_id;
nuraft::buffer_serializer bs(response);
{
std::lock_guard lock(storage_lock);
session_id = storage->getSessionID(session_timeout_ms);
bs.put_i64(session_id);
session_id = storage->getSessionID(session_id_request.session_timeout_ms);
}
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_timeout_ms);
last_committed_idx = log_idx;
return response;
LOG_DEBUG(log, "Session ID response {} with timeout {}", session_id, session_id_request.session_timeout_ms);
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id;
response->session_id = session_id;
response->server_id = session_id_request.server_id;
KeeperStorage::ResponseForSession response_for_session;
response_for_session.session_id = -1;
response_for_session.response = response;
responses_queue.push(response_for_session);
}
else
{
auto request_for_session = parseRequest(data);
KeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
@ -116,11 +120,11 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
for (auto & response_for_session : responses_for_sessions)
responses_queue.push(response_for_session);
}
}
last_committed_idx = log_idx;
return nullptr;
}
}
bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{

View File

@ -405,8 +405,6 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest
response.names.insert(response.names.end(), it->value.children.begin(), it->value.children.end());
std::sort(response.names.begin(), response.names.end());
response.stat = it->value.stat;
response.error = Coordination::Error::ZOK;
}

View File

@ -1,5 +1,9 @@
#include <Coordination/KeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
#include <chrono>
namespace DB
{
@ -17,23 +21,111 @@ KeeperStorageDispatcher::KeeperStorageDispatcher()
{
}
void KeeperStorageDispatcher::requestThread()
{
setThreadName("KeeperReqT");
/// Result of requests batch from previous iteration
RaftAppendResult prev_result = nullptr;
/// Requests from previous iteration. We store them to be able
/// to send errors to the client.
KeeperStorage::RequestsForSessions prev_batch;
while (!shutdown_called)
{
KeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
uint64_t max_batch_size = coordination_settings->max_requests_batch_size;
if (requests_queue.tryPop(request, max_wait))
/// The code below do a very simple thing: batch all write (quorum) requests into vector until
/// previous write batch is not finished or max_batch size achieved. The main complexity goes from
/// the ability to process read requests without quorum (from local state). So when we are collecting
/// requests into a batch we must check that the new request is not read request. Otherwise we have to
/// process all already accumulated write requests, wait them synchronously and only after that process
/// read request. So reads are some kind of "separator" for writes.
try
{
if (requests_queue->tryPop(request, max_wait))
{
if (shutdown_called)
break;
try
KeeperStorage::RequestsForSessions current_batch;
bool has_read_request = false;
/// If new request is not read request or we must to process it through quorum.
/// Otherwise we will process it locally.
if (coordination_settings->quorum_reads || !request.request->isReadRequest())
{
server->putRequest(request);
current_batch.emplace_back(request);
/// Waiting until previous append will be successful, or batch is big enough
/// has_result == false && get_result_code == OK means that our request still not processed.
/// Sometimes NuRaft set errorcode without setting result, so we check both here.
while (prev_result && (!prev_result->has_result() && prev_result->get_result_code() == nuraft::cmd_result_code::OK) && current_batch.size() <= max_batch_size)
{
/// Trying to get batch requests as fast as possible
if (requests_queue->tryPop(request, 1))
{
/// Don't append read request into batch, we have to process them separately
if (!coordination_settings->quorum_reads && request.request->isReadRequest())
{
has_read_request = true;
break;
}
else
{
current_batch.emplace_back(request);
}
}
if (shutdown_called)
break;
}
}
else
has_read_request = true;
if (shutdown_called)
break;
/// Forcefully process all previous pending requests
if (prev_result)
forceWaitAndProcessResult(prev_result, prev_batch);
/// Process collected write requests batch
if (!current_batch.empty())
{
auto result = server->putRequestBatch(current_batch);
if (result)
{
if (has_read_request) /// If we will execute read request next, than we have to process result now
forceWaitAndProcessResult(result, current_batch);
}
else
{
addErrorResponses(current_batch, Coordination::Error::ZRUNTIMEINCONSISTENCY);
current_batch.clear();
}
prev_batch = current_batch;
prev_result = result;
}
/// Read request always goes after write batch (last request)
if (has_read_request)
{
if (server->isLeaderAlive())
server->putLocalReadRequest(request);
else
addErrorResponses({request}, Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
}
}
catch (...)
{
@ -41,7 +133,6 @@ void KeeperStorageDispatcher::requestThread()
}
}
}
}
void KeeperStorageDispatcher::responseThread()
{
@ -94,15 +185,33 @@ void KeeperStorageDispatcher::snapshotThread()
void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
/// Nobody waits for this session id
if (session_id_resp.server_id != server->getServerID() || !new_session_id_response_callback.count(session_id_resp.internal_id))
return;
auto callback = new_session_id_response_callback[session_id_resp.internal_id];
callback(response);
new_session_id_response_callback.erase(session_id_resp.internal_id);
}
else
{
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
return;
session_writer->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
session_to_response_callback.erase(session_writer);
}
}
}
bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
@ -119,8 +228,8 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr
std::lock_guard lock(push_request_mutex);
/// Put close requests without timeouts
if (request->getOpNum() == Coordination::OpNum::Close)
requests_queue.push(std::move(request_info));
else if (!requests_queue.tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
requests_queue->push(std::move(request_info));
else if (!requests_queue->tryPush(std::move(request_info), coordination_settings->operation_timeout_ms.totalMilliseconds()))
throw Exception("Cannot push request to queue within operation timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return true;
}
@ -131,6 +240,7 @@ void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration
int myid = config.getInt("keeper_server.server_id");
coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
requests_queue = std::make_unique<RequestsQueue>(coordination_settings->max_requests_batch_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
@ -175,7 +285,7 @@ void KeeperStorageDispatcher::shutdown()
session_cleaner_thread.join();
/// FIXME not the best way to notify
requests_queue.push({});
requests_queue->push({});
if (request_thread.joinable())
request_thread.join();
@ -192,7 +302,7 @@ void KeeperStorageDispatcher::shutdown()
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
while (requests_queue.tryPop(request_for_session))
while (requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
{
@ -249,7 +359,7 @@ void KeeperStorageDispatcher::sessionCleanerTask()
request_info.session_id = dead_session;
{
std::lock_guard lock(push_request_mutex);
requests_queue.push(std::move(request_info));
requests_queue->push(std::move(request_info));
}
finishSession(dead_session);
LOG_INFO(log, "Dead session close request pushed");
@ -273,4 +383,79 @@ void KeeperStorageDispatcher::finishSession(int64_t session_id)
session_to_response_callback.erase(session_it);
}
void KeeperStorageDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, request] : requests_for_sessions)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = error;
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
}
void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
if (!result->has_result())
result->get();
/// If we get some errors, than send them to clients
if (!result->get_accepted() || result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
addErrorResponses(requests_for_sessions, Coordination::Error::ZOPERATIONTIMEOUT);
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
addErrorResponses(requests_for_sessions, Coordination::Error::ZRUNTIMEINCONSISTENCY);
result = nullptr;
requests_for_sessions.clear();
}
int64_t KeeperStorageDispatcher::getSessionID(int64_t session_timeout_ms)
{
KeeperStorage::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
request->server_id = server->getServerID();
request_info.request = request;
request_info.session_id = -1;
auto promise = std::make_shared<std::promise<int64_t>>();
auto future = promise->get_future();
{
std::lock_guard lock(session_to_response_callback_mutex);
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
{
if (response->getOpNum() != Coordination::OpNum::SessionID)
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response of type {} instead of SessionID response", Coordination::toString(response->getOpNum()))));
auto session_id_response = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
if (session_id_response.internal_id != internal_id)
{
promise->set_exception(std::make_exception_ptr(Exception(ErrorCodes::LOGICAL_ERROR,
"Incorrect response with internal id {} instead of {}", session_id_response.internal_id, internal_id)));
}
if (response->error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException("SessionID request failed with error", response->error)));
promise->set_value(session_id_response.session_id);
};
}
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
throw Exception("Cannot push session id request to queue within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
return future.get();
}
}

View File

@ -32,24 +32,42 @@ private:
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
RequestsQueue requests_queue{1};
/// Size depends on coordination settings
std::unique_ptr<RequestsQueue> requests_queue;
ResponsesQueue responses_queue;
SnapshotsQueue snapshots_queue{1};
std::atomic<bool> shutdown_called{false};
std::mutex session_to_response_callback_mutex;
/// These two maps looks similar, but serves different purposes.
/// The first map is subscription map for normal responses like
/// (get, set, list, etc.). Dispatcher determines callback for each response
/// using session id from this map.
SessionToResponseCallback session_to_response_callback;
/// But when client connects to the server for the first time it doesn't
/// have session_id. It request it from server. We give temporary
/// internal id for such requests just to much client with its response.
SessionToResponseCallback new_session_id_response_callback;
/// Reading and batching new requests from client handlers
ThreadFromGlobalPool request_thread;
/// Pushing responses to clients client handlers
/// using session_id.
ThreadFromGlobalPool responses_thread;
/// Cleaning old dead sessions
ThreadFromGlobalPool session_cleaner_thread;
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
/// RAFT wrapper. Most important class.
std::unique_ptr<KeeperServer> server;
Poco::Logger * log;
/// Counter for new session_id requests.
std::atomic<int64_t> internal_session_id_counter{0};
private:
void requestThread();
void responseThread();
@ -57,6 +75,14 @@ private:
void snapshotThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
/// Add error responses for requests to responses queue.
/// Clears requests.
void addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error);
/// Forcefully wait for result and sets errors if something when wrong.
/// Clears both arguments
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
public:
KeeperStorageDispatcher();
@ -78,10 +104,7 @@ public:
return server->isLeaderAlive();
}
int64_t getSessionID(long session_timeout_ms)
{
return server->getSessionID(session_timeout_ms);
}
int64_t getSessionID(int64_t session_timeout_ms);
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call if we don't need any responses for this session no more (session was expired)

View File

@ -211,6 +211,8 @@ TEST(CoordinationTest, ChangelogTestSimple)
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.next_slot(), 2);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
@ -225,6 +227,7 @@ TEST(CoordinationTest, ChangelogTestFile)
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
for (const auto & p : fs::directory_iterator("./logs"))
EXPECT_EQ(p.path(), "./logs/changelog_1_5.bin");
@ -234,6 +237,7 @@ TEST(CoordinationTest, ChangelogTestFile)
changelog.append(entry);
changelog.append(entry);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -249,6 +253,8 @@ TEST(CoordinationTest, ChangelogReadWrite)
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 10);
DB::KeeperLogStore changelog_reader("./logs", 1000, true);
changelog_reader.init(1, 0);
@ -276,10 +282,14 @@ TEST(CoordinationTest, ChangelogWriteAt)
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 10);
auto entry = getLogEntry("writer", 77);
changelog.write_at(7, entry);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 7);
EXPECT_EQ(changelog.last_entry()->get_term(), 77);
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
@ -305,6 +315,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 7);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
@ -319,6 +330,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
auto entry = getLogEntry("hello world", i * 10);
changelog_reader.append(entry);
}
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -331,6 +343,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
auto entry = getLogEntry("someentry", 77);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -354,6 +367,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
auto entry = getLogEntry("hello world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 3);
@ -373,6 +387,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
changelog.append(e3);
auto e4 = getLogEntry("hello world", 60);
changelog.append(e4);
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -405,6 +420,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 10);
@ -420,6 +436,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
EXPECT_EQ(apply_changelog.size(), 10);
apply_changelog.apply_pack(8, *entries);
apply_changelog.end_of_append_batch(0, 0);
EXPECT_EQ(apply_changelog.size(), 12);
EXPECT_EQ(apply_changelog.start_index(), 1);
@ -447,6 +464,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 10);
@ -458,6 +476,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
EXPECT_EQ(changelog_new.size(), 0);
changelog_new.apply_pack(5, *entries);
changelog_new.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_new.size(), 5);
EXPECT_EQ(changelog_new.start_index(), 5);
@ -468,6 +487,8 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
auto e = getLogEntry("hello_world", 110);
changelog_new.append(e);
changelog_new.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_new.size(), 6);
EXPECT_EQ(changelog_new.start_index(), 5);
EXPECT_EQ(changelog_new.next_slot(), 11);
@ -488,6 +509,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -501,6 +523,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(7, e1);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 7);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 8);
@ -534,6 +557,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -547,6 +571,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(11, e1);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 11);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 12);
@ -580,6 +605,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -593,6 +619,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
auto e1 = getLogEntry("helloworld", 5555);
changelog.write_at(1, e1);
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 1);
EXPECT_EQ(changelog.start_index(), 1);
EXPECT_EQ(changelog.next_slot(), 2);
@ -619,6 +646,7 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -635,6 +663,7 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
auto entry = getLogEntry("36_hello_world", 360);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 36);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
@ -660,6 +689,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", i * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_EQ(changelog.size(), 35);
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
@ -674,6 +704,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1, 0);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 90);
@ -689,6 +720,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
auto entry = getLogEntry("h", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 11);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
@ -719,6 +751,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
@ -735,6 +768,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_FALSE(fs::exists("./logs/changelog_21_40.bin"));
auto entry = getLogEntry("hello_world", 7777);
changelog_reader.append(entry);
changelog_reader.end_of_append_batch(0, 0);
EXPECT_EQ(changelog_reader.size(), 3);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
@ -757,6 +791,7 @@ TEST(CoordinationTest, ChangelogTestLostFiles)
auto entry = getLogEntry(std::to_string(i) + "_hello_world", (i + 44) * 10);
changelog.append(entry);
}
changelog.end_of_append_batch(0, 0);
EXPECT_TRUE(fs::exists("./logs/changelog_1_20.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_21_40.bin"));
@ -1105,6 +1140,7 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, uint
request->path = "/hello_" + std::to_string(i);
auto entry = getLogEntryFromZKRequest(0, 1, request);
changelog.append(entry);
changelog.end_of_append_batch(0, 0);
state_machine->commit(i, changelog.entry_at(i)->get_buf());
bool snapshot_created = false;

View File

@ -0,0 +1,23 @@
# http://hadoop.apache.org/zookeeper/docs/current/zookeeperAdmin.html
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
dataDir=/var/lib/zookeeper
# Place the dataLogDir to a separate physical disc for better performance
# dataLogDir=/disk2/zookeeper
# the port at which the clients will connect
clientPort=2181
# Leader accepts client connections. Default value is "yes". The leader machine
# coordinates updates. For higher update throughput at thes slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.
leaderServes=yes

View File

@ -0,0 +1,39 @@
(ns jepsen.clickhouse-keeper.bench
(:require [clojure.tools.logging :refer :all]
[jepsen
[client :as client]])
(:import (java.lang ProcessBuilder)
(java.lang ProcessBuilder$Redirect)))
(defn exec-process-builder
[command & args]
(let [pbuilder (ProcessBuilder. (into-array (cons command args)))]
(.redirectOutput pbuilder ProcessBuilder$Redirect/INHERIT)
(.redirectError pbuilder ProcessBuilder$Redirect/INHERIT)
(let [p (.start pbuilder)]
(.waitFor p))))
(defrecord BenchClient [port]
client/Client
(open! [this test node]
this)
(setup! [this test]
this)
(invoke! [this test op]
(let [bench-opts (into [] (clojure.string/split (:bench-opts op) #" "))
bench-path (:bench-path op)
nodes (into [] (flatten (map (fn [x] (identity ["-h" (str x ":" port)])) (:nodes test))))
all-args (concat [bench-path] bench-opts nodes)]
(info "Running cmd" all-args)
(apply exec-process-builder all-args)
(assoc op :type :ok :value "ok")))
(teardown! [_ test])
(close! [_ test]))
(defn bench-client
[port]
(BenchClient. port))

View File

@ -19,12 +19,17 @@
[url]
(non-precise-cached-wget! url))
(defn get-clickhouse-scp
[path]
(c/upload path (str common-prefix "/clickhouse")))
(defn download-clickhouse
[source]
(info "Downloading clickhouse from" source)
(cond
(clojure.string/starts-with? source "rbtorrent:") (get-clickhouse-sky source)
(clojure.string/starts-with? source "http") (get-clickhouse-url source)
(.exists (io/file source)) (get-clickhouse-scp source)
:else (throw (Exception. (str "Don't know how to download clickhouse from" source)))))
(defn unpack-deb
@ -128,11 +133,11 @@
db/LogFiles
(log-files [_ test node]
(c/su
(if (cu/exists? pid-file-path)
(do
(info node "Collecting traces")
(collect-traces test node))
(info node "Pid files doesn't exists"))
;(if (cu/exists? pid-file-path)
;(do
; (info node "Collecting traces")
; (collect-traces test node))
;(info node "Pid files doesn't exists"))
(kill-clickhouse! node test)
(if (cu/exists? coordination-data-dir)
(do

View File

@ -4,11 +4,13 @@
[clojure.pprint :refer [pprint]]
[jepsen.clickhouse-keeper.set :as set]
[jepsen.clickhouse-keeper.db :refer :all]
[jepsen.clickhouse-keeper.zookeeperdb :refer :all]
[jepsen.clickhouse-keeper.nemesis :as custom-nemesis]
[jepsen.clickhouse-keeper.register :as register]
[jepsen.clickhouse-keeper.unique :as unique]
[jepsen.clickhouse-keeper.queue :as queue]
[jepsen.clickhouse-keeper.counter :as counter]
[jepsen.clickhouse-keeper.bench :as bench]
[jepsen.clickhouse-keeper.constants :refer :all]
[clojure.string :as str]
[jepsen
@ -72,12 +74,29 @@
:validate [pos? "Must be a positive integer."]]
[nil, "--lightweight-run" "Subset of workloads/nemesises which is simple to validate"]
[nil, "--reuse-binary" "Use already downloaded binary if it exists, don't remove it on shutdown"]
[nil, "--bench" "Run perf-test mode"]
[nil, "--zookeeper-version VERSION" "Run zookeeper with version"
:default ""]
[nil, "--bench-opts STR" "Run perf-test mode"
:default "--generator list_medium_nodes -c 30 -i 1000"]
["-c" "--clickhouse-source URL" "URL for clickhouse deb or tgz package"
:default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]])
:default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]
[nil "--bench-path path" "Path to keeper-bench util"
:default "/home/alesap/code/cpp/BuildCH/utils/keeper-bench/keeper-bench"]])
(defn clickhouse-keeper-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
:concurrency, ...), constructs a test map."
(defn get-db
[opts]
(if (empty? (:zookeeper-version opts))
(db (:clickhouse-source opts) (boolean (:reuse-binary opts)))
(zookeeper-db (:zookeeper-version opts))))
(defn get-port
[opts]
(if (empty? (:zookeeper-version opts))
9181
2181))
(defn clickhouse-func-tests
[opts]
(info "Test opts\n" (with-out-str (pprint opts)))
(let [quorum (boolean (:quorum opts))
@ -87,7 +106,7 @@
opts
{:name (str "clickhouse-keeper-quorum=" quorum "-" (name (:workload opts)) "-" (name (:nemesis opts)))
:os ubuntu/os
:db (db (:clickhouse-source opts) (boolean (:reuse-binary opts)))
:db (get-db opts)
:pure-generators true
:client (:client workload)
:nemesis (:nemesis current-nemesis)
@ -105,6 +124,30 @@
(gen/sleep 10)
(gen/clients (:final-generator workload)))})))
(defn clickhouse-perf-test
[opts]
(info "Starting performance test")
(let [dct {:type :invoke :bench-opts (:bench-opts opts) :bench-path (:bench-path opts)}]
(merge tests/noop-test
opts
{:name (str "clickhouse-keeper-perf")
:os ubuntu/os
:db (get-db opts)
:pure-generators true
:client (bench/bench-client (get-port opts))
:nemesis nemesis/noop
:generator (->> dct
(gen/stagger 1)
(gen/nemesis nil))})))
(defn clickhouse-keeper-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
:concurrency, ...), constructs a test map."
[opts]
(if (boolean (:bench opts))
(clickhouse-perf-test opts)
(clickhouse-func-tests opts)))
(def all-nemesises (keys custom-nemesis/custom-nemesises))
(def all-workloads (keys workloads))

View File

@ -45,7 +45,7 @@
(defn zk-connect
[host port timeout]
(exec-with-retries 15 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout))))
(exec-with-retries 30 (fn [] (zk/connect (str host ":" port) :timeout-msec timeout))))
(defn zk-create-range
[conn n]

View File

@ -0,0 +1,64 @@
(ns jepsen.clickhouse-keeper.zookeeperdb
(:require [clojure.tools.logging :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[clojure.java.io :as io]
[jepsen
[control :as c]
[db :as db]]
[jepsen.os.ubuntu :as ubuntu]))
(defn zk-node-ids
"Returns a map of node names to node ids."
[test]
(->> test
:nodes
(map-indexed (fn [i node] [node (inc i)]))
(into {})))
(defn zk-node-id
"Given a test and a node name from that test, returns the ID for that node."
[test node]
((zk-node-ids test) node))
(defn zoo-cfg-servers
"Constructs a zoo.cfg fragment for servers."
[test mynode]
(->> (zk-node-ids test)
(map (fn [[node id]]
(str "server." id "=" (if (= (name node) mynode) "0.0.0.0" (name node)) ":2888:3888")))
(clojure.string/join "\n")))
(defn zookeeper-db
"Zookeeper DB for a particular version."
[version]
(reify db/DB
(setup! [_ test node]
(c/su
(info node "Installing ZK" version)
(c/exec :apt-get :update)
(c/exec :apt-get :install (str "zookeeper=" version))
(c/exec :apt-get :install (str "zookeeperd=" version))
(c/exec :echo (zk-node-id test node) :> "/etc/zookeeper/conf/myid")
(c/exec :echo (str (slurp (io/resource "zoo.cfg"))
"\n"
(zoo-cfg-servers test node))
:> "/etc/zookeeper/conf/zoo.cfg")
(info node "ZK restarting")
(c/exec :service :zookeeper :restart)
(info "Connecting to zk" (name node))
(zk-connect (name node) 2181 1000)
(info node "ZK ready")))
(teardown! [_ test node]
(info node "tearing down ZK")
(c/su
(c/exec :service :zookeeper :stop :|| true)
(c/exec :rm :-rf
(c/lit "/var/lib/zookeeper/version-*")
(c/lit "/var/log/zookeeper/*"))))
db/LogFiles
(log-files [_ test node]
["/var/log/zookeeper/zookeeper.log"])))

View File

@ -32,6 +32,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (db-generator)
add_subdirectory (wal-dump)
add_subdirectory (check-mysql-binlog)
add_subdirectory (keeper-bench)
if (USE_NURAFT)
add_subdirectory (keeper-data-dumper)

View File

@ -0,0 +1,2 @@
add_executable(keeper-bench Generator.cpp Runner.cpp Stats.cpp main.cpp)
target_link_libraries(keeper-bench PRIVATE dbms)

View File

@ -0,0 +1,238 @@
#include "Generator.h"
#include <random>
#include <filesystem>
using namespace Coordination;
using namespace zkutil;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
namespace
{
std::string generateRandomString(size_t length)
{
if (length == 0)
return "";
static const auto & chars = "0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
static pcg64 rng(randomSeed());
static std::uniform_int_distribution<size_t> pick(0, sizeof(chars) - 2);
std::string s;
s.reserve(length);
while (length--)
s += chars[pick(rng)];
return s;
}
}
std::string generateRandomPath(const std::string & prefix, size_t length)
{
return std::filesystem::path(prefix) / generateRandomString(length);
}
std::string generateRandomData(size_t size)
{
return generateRandomString(size);
}
void CreateRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
}
ZooKeeperRequestPtr CreateRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperCreateRequest>();
request->acls = default_acls;
size_t plength = 5;
if (path_length)
plength = *path_length;
auto path_candidate = generateRandomPath(path_prefix, plength);
while (paths_created.count(path_candidate))
path_candidate = generateRandomPath(path_prefix, plength);
paths_created.insert(path_candidate);
request->path = path_candidate;
if (data_size)
request->data = generateRandomData(*data_size);
return request;
}
void GetRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
size_t total_nodes = 1;
if (num_nodes)
total_nodes = *num_nodes;
for (size_t i = 0; i < total_nodes; ++i)
{
auto path = generateRandomPath(path_prefix, 5);
while (std::find(paths_to_get.begin(), paths_to_get.end(), path) != paths_to_get.end())
path = generateRandomPath(path_prefix, 5);
auto create_promise = std::make_shared<std::promise<void>>();
auto create_future = create_promise->get_future();
auto callback = [create_promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
create_promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
create_promise->set_value();
};
std::string data;
if (nodes_data_size)
data = generateRandomString(*nodes_data_size);
zookeeper.create(path, data, false, false, default_acls, callback);
create_future.get();
paths_to_get.push_back(path);
}
}
Coordination::ZooKeeperRequestPtr GetRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperGetRequest>();
size_t path_index = distribution(rng);
request->path = paths_to_get[path_index];
return request;
}
void ListRequestGenerator::startup(Coordination::ZooKeeper & zookeeper)
{
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
auto create_callback = [promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value();
};
zookeeper.create(path_prefix, "", false, false, default_acls, create_callback);
future.get();
size_t total_nodes = 1;
if (num_nodes)
total_nodes = *num_nodes;
size_t path_length = 5;
if (paths_length)
path_length = *paths_length;
for (size_t i = 0; i < total_nodes; ++i)
{
auto path = generateRandomPath(path_prefix, path_length);
auto create_promise = std::make_shared<std::promise<void>>();
auto create_future = create_promise->get_future();
auto callback = [create_promise] (const CreateResponse & response)
{
if (response.error != Coordination::Error::ZOK)
create_promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
create_promise->set_value();
};
zookeeper.create(path, "", false, false, default_acls, callback);
create_future.get();
}
}
Coordination::ZooKeeperRequestPtr ListRequestGenerator::generate()
{
auto request = std::make_shared<ZooKeeperListRequest>();
request->path = path_prefix;
return request;
}
std::unique_ptr<IGenerator> getGenerator(const std::string & name)
{
if (name == "create_no_data")
{
return std::make_unique<CreateRequestGenerator>();
}
else if (name == "create_small_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 32);
}
else if (name == "create_medium_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 1024);
}
else if (name == "create_big_data")
{
return std::make_unique<CreateRequestGenerator>("/create_generator", 5, 512 * 1024);
}
else if (name == "get_no_data")
{
return std::make_unique<GetRequestGenerator>("/get_generator", 10, 0);
}
else if (name == "get_small_data")
{
return std::make_unique<GetRequestGenerator>("/get_generator", 10, 32);
}
else if (name == "get_medium_data")
{
return std::make_unique<GetRequestGenerator>("/get_generator", 10, 1024);
}
else if (name == "get_big_data")
{
return std::make_unique<GetRequestGenerator>("/get_generator", 10, 512 * 1024);
}
else if (name == "list_no_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 0, 1);
}
else if (name == "list_few_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 10, 5);
}
else if (name == "list_medium_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 1000, 5);
}
else if (name == "list_a_lot_nodes")
{
return std::make_unique<ListRequestGenerator>("/list_generator", 100000, 5);
}
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown generator {}", name);
}

View File

@ -0,0 +1,107 @@
#pragma once
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <functional>
#include <optional>
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
std::string generateRandomPath(const std::string & prefix, size_t length = 5);
std::string generateRandomData(size_t size);
class IGenerator
{
public:
IGenerator()
{
Coordination::ACL acl;
acl.permissions = Coordination::ACL::All;
acl.scheme = "world";
acl.id = "anyone";
default_acls.emplace_back(std::move(acl));
}
virtual void startup(Coordination::ZooKeeper & /*zookeeper*/) {}
virtual Coordination::ZooKeeperRequestPtr generate() = 0;
virtual ~IGenerator() = default;
Coordination::ACLs default_acls;
};
class CreateRequestGenerator final : public IGenerator
{
public:
explicit CreateRequestGenerator(
std::string path_prefix_ = "/create_generator",
std::optional<uint64_t> path_length_ = std::nullopt,
std::optional<uint64_t> data_size_ = std::nullopt)
: path_prefix(path_prefix_)
, path_length(path_length_)
, data_size(data_size_)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;
std::optional<uint64_t> path_length;
std::optional<uint64_t> data_size;
std::unordered_set<std::string> paths_created;
};
class GetRequestGenerator final : public IGenerator
{
public:
explicit GetRequestGenerator(
std::string path_prefix_ = "/get_generator",
std::optional<uint64_t> num_nodes_ = std::nullopt,
std::optional<uint64_t> nodes_data_size_ = std::nullopt)
: path_prefix(path_prefix_)
, num_nodes(num_nodes_)
, nodes_data_size(nodes_data_size_)
, rng(randomSeed())
, distribution(0, num_nodes ? *num_nodes - 1 : 0)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;
std::optional<uint64_t> num_nodes;
std::optional<uint64_t> nodes_data_size;
std::vector<std::string> paths_to_get;
pcg64 rng;
std::uniform_int_distribution<size_t> distribution;
};
class ListRequestGenerator final : public IGenerator
{
public:
explicit ListRequestGenerator(
std::string path_prefix_ = "/list_generator",
std::optional<uint64_t> num_nodes_ = std::nullopt,
std::optional<uint64_t> paths_length_ = std::nullopt)
: path_prefix(path_prefix_)
, num_nodes(num_nodes_)
, paths_length(paths_length_)
{}
void startup(Coordination::ZooKeeper & zookeeper) override;
Coordination::ZooKeeperRequestPtr generate() override;
private:
std::string path_prefix;
std::optional<uint64_t> num_nodes;
std::optional<uint64_t> paths_length;
};
std::unique_ptr<IGenerator> getGenerator(const std::string & name);

View File

@ -0,0 +1,188 @@
#include "Runner.h"
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_BLOCK_SIGNAL;
}
}
void Runner::thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers)
{
Coordination::ZooKeeperRequestPtr request;
/// Randomly choosing connection index
pcg64 rng(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, zookeepers.size() - 1);
/// In these threads we do not accept INT signal.
sigset_t sig_set;
if (sigemptyset(&sig_set)
|| sigaddset(&sig_set, SIGINT)
|| pthread_sigmask(SIG_BLOCK, &sig_set, nullptr))
{
DB::throwFromErrno("Cannot block signal.", DB::ErrorCodes::CANNOT_BLOCK_SIGNAL);
}
while (true)
{
bool extracted = false;
while (!extracted)
{
extracted = queue.tryPop(request, 100);
if (shutdown
|| (max_iterations && requests_executed >= max_iterations))
{
return;
}
}
const auto connection_index = distribution(rng);
auto & zk = zookeepers[connection_index];
auto promise = std::make_shared<std::promise<size_t>>();
auto future = promise->get_future();
Coordination::ResponseCallback callback = [promise](const Coordination::Response & response)
{
if (response.error != Coordination::Error::ZOK)
promise->set_exception(std::make_exception_ptr(zkutil::KeeperException(response.error)));
else
promise->set_value(response.bytesSize());
};
Stopwatch watch;
zk->executeGenericRequest(request, callback);
try
{
auto response_size = future.get();
double seconds = watch.elapsedSeconds();
std::lock_guard lock(mutex);
if (request->isReadRequest())
info->addRead(seconds, 1, request->bytesSize() + response_size);
else
info->addWrite(seconds, 1, request->bytesSize() + response_size);
}
catch (...)
{
if (!continue_on_error)
{
shutdown = true;
throw;
}
std::cerr << DB::getCurrentExceptionMessage(true, true /*check embedded stack trace*/) << std::endl;
}
++requests_executed;
}
}
bool Runner::tryPushRequestInteractively(const Coordination::ZooKeeperRequestPtr & request, DB::InterruptListener & interrupt_listener)
{
bool inserted = false;
while (!inserted)
{
inserted = queue.tryPush(request, 100);
if (shutdown)
{
/// An exception occurred in a worker
return false;
}
if (max_time > 0 && total_watch.elapsedSeconds() >= max_time)
{
std::cout << "Stopping launch of queries. Requested time limit is exhausted.\n";
return false;
}
if (interrupt_listener.check())
{
std::cout << "Stopping launch of queries. SIGINT received." << std::endl;
return false;
}
if (delay > 0 && delay_watch.elapsedSeconds() > delay)
{
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
delay_watch.restart();
}
}
return true;
}
void Runner::runBenchmark()
{
auto aux_connections = getConnections();
std::cerr << "Preparing to run\n";
generator->startup(*aux_connections[0]);
std::cerr << "Prepared\n";
try
{
for (size_t i = 0; i < concurrency; ++i)
{
auto connections = getConnections();
pool.scheduleOrThrowOnError([this, connections]() mutable { thread(connections); });
}
}
catch (...)
{
pool.wait();
throw;
}
DB::InterruptListener interrupt_listener;
delay_watch.restart();
/// Push queries into queue
for (size_t i = 0; !max_iterations || i < max_iterations; ++i)
{
if (!tryPushRequestInteractively(generator->generate(), interrupt_listener))
{
shutdown = true;
break;
}
}
pool.wait();
total_watch.stop();
printNumberOfRequestsExecuted(requests_executed);
std::lock_guard lock(mutex);
report(info, concurrency);
}
std::vector<std::shared_ptr<Coordination::ZooKeeper>> Runner::getConnections()
{
std::vector<std::shared_ptr<Coordination::ZooKeeper>> zookeepers;
for (const auto & host_string : hosts_strings)
{
Coordination::ZooKeeper::Node node{Poco::Net::SocketAddress{host_string}, false};
std::vector<Coordination::ZooKeeper::Node> nodes;
nodes.push_back(node);
zookeepers.emplace_back(std::make_shared<Coordination::ZooKeeper>(
nodes,
"", /*chroot*/
"", /*identity type*/
"", /*identity*/
Poco::Timespan(0, 30000 * 1000),
Poco::Timespan(0, 1000 * 1000),
Poco::Timespan(0, 10000 * 1000)));
}
return zookeepers;
}

View File

@ -0,0 +1,79 @@
#pragma once
#include <Common/ZooKeeper/ZooKeeperImpl.h>
#include "Generator.h"
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadPool.h>
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/InterruptListener.h>
#include <Core/Types.h>
#include "Stats.h"
using Ports = std::vector<UInt16>;
using Strings = std::vector<std::string>;
class Runner
{
public:
Runner(
size_t concurrency_,
const std::string & generator_name,
const Strings & hosts_strings_,
double max_time_,
double delay_,
bool continue_on_error_,
size_t max_iterations_)
: concurrency(concurrency_)
, pool(concurrency)
, hosts_strings(hosts_strings_)
, generator(getGenerator(generator_name))
, max_time(max_time_)
, delay(delay_)
, continue_on_error(continue_on_error_)
, max_iterations(max_iterations_)
, info(std::make_shared<Stats>())
, queue(concurrency)
{
}
void thread(std::vector<std::shared_ptr<Coordination::ZooKeeper>> & zookeepers);
void printNumberOfRequestsExecuted(size_t num)
{
std::cerr << "Requests executed: " << num << ".\n";
}
bool tryPushRequestInteractively(const Coordination::ZooKeeperRequestPtr & request, DB::InterruptListener & interrupt_listener);
void runBenchmark();
private:
size_t concurrency = 1;
ThreadPool pool;
Strings hosts_strings;
std::unique_ptr<IGenerator> generator;
double max_time = 0;
double delay = 1;
bool continue_on_error = false;
std::atomic<size_t> max_iterations = 0;
std::atomic<size_t> requests_executed = 0;
std::atomic<bool> shutdown = false;
std::shared_ptr<Stats> info;
Stopwatch total_watch;
Stopwatch delay_watch;
std::mutex mutex;
using Queue = ConcurrentBoundedQueue<Coordination::ZooKeeperRequestPtr>;
Queue queue;
std::vector<std::shared_ptr<Coordination::ZooKeeper>> getConnections();
};

View File

@ -0,0 +1,67 @@
#include "Stats.h"
#include <iostream>
void report(std::shared_ptr<Stats> & info, size_t concurrency)
{
std::cerr << "\n";
/// Avoid zeros, nans or exceptions
if (0 == info->read_requests && 0 == info->write_requests)
return;
double read_seconds = info->read_work_time / concurrency;
double write_seconds = info->write_work_time / concurrency;
std::cerr << "read requests " << info->read_requests << ", write requests " << info->write_requests << ", ";
if (info->errors)
{
std::cerr << "errors " << info->errors << ", ";
}
if (0 != info->read_requests)
{
std::cerr
<< "Read RPS: " << (info->read_requests / read_seconds) << ", "
<< "Read MiB/s: " << (info->requests_read_bytes / read_seconds / 1048576);
if (0 != info->write_requests)
std::cerr << ", ";
}
if (0 != info->write_requests)
{
std::cerr
<< "Write RPS: " << (info->write_requests / write_seconds) << ", "
<< "Write MiB/s: " << (info->requests_write_bytes / write_seconds / 1048576) << ". "
<< "\n";
}
std::cerr << "\n";
auto print_percentile = [&](double percent, Stats::Sampler & sampler)
{
std::cerr << percent << "%\t\t";
std::cerr << sampler.quantileNearest(percent / 100.0) << " sec.\t";
std::cerr << "\n";
};
if (0 != info->read_requests)
{
std::cerr << "Read sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->read_sampler);
print_percentile(95, info->read_sampler);
print_percentile(99, info->read_sampler);
print_percentile(99.9, info->read_sampler);
print_percentile(99.99, info->read_sampler);
}
if (0 != info->write_requests)
{
std::cerr << "Write sampler:\n";
for (int percent = 0; percent <= 90; percent += 10)
print_percentile(percent, info->write_sampler);
print_percentile(95, info->write_sampler);
print_percentile(99, info->write_sampler);
print_percentile(99.9, info->write_sampler);
print_percentile(99.99, info->write_sampler);
}
}

View File

@ -0,0 +1,52 @@
#pragma once
#include <vector>
#include <atomic>
#include <AggregateFunctions/ReservoirSampler.h>
struct Stats
{
std::atomic<size_t> read_requests{0};
std::atomic<size_t> write_requests{0};
size_t errors = 0;
size_t requests_write_bytes = 0;
size_t requests_read_bytes = 0;
double read_work_time = 0;
double write_work_time = 0;
using Sampler = ReservoirSampler<double>;
Sampler read_sampler {1 << 16};
Sampler write_sampler {1 << 16};
void addRead(double seconds, size_t requests_inc, size_t bytes_inc)
{
read_work_time += seconds;
read_requests += requests_inc;
requests_read_bytes += bytes_inc;
read_sampler.insert(seconds);
}
void addWrite(double seconds, size_t requests_inc, size_t bytes_inc)
{
write_work_time += seconds;
write_requests += requests_inc;
requests_write_bytes += bytes_inc;
write_sampler.insert(seconds);
}
void clear()
{
read_requests = 0;
write_requests = 0;
read_work_time = 0;
write_work_time = 0;
requests_read_bytes = 0;
requests_write_bytes = 0;
read_sampler.clear();
write_sampler.clear();
}
};
void report(std::shared_ptr<Stats> & info, size_t concurrency);

View File

@ -0,0 +1,61 @@
#include <iostream>
#include <boost/program_options.hpp>
#include "Runner.h"
#include "Stats.h"
#include "Generator.h"
#include <Common/TerminalSize.h>
#include <Core/Types.h>
using namespace std;
int main(int argc, char *argv[])
{
bool print_stacktrace = true;
try
{
using boost::program_options::value;
boost::program_options::options_description desc = createOptionsDescription("Allowed options", getTerminalWidth());
desc.add_options()
("help", "produce help message")
("generator", value<std::string>()->default_value("create_small_data"), "query to execute")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("hosts,h", value<Strings>()->multitoken(), "")
("continue_on_errors", "continue testing even if a query fails")
("reconnect", "establish new connection for every query")
;
boost::program_options::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options);
boost::program_options::notify(options);
if (options.count("help"))
{
std::cout << "Usage: " << argv[0] << " [options] < queries.txt\n";
std::cout << desc << "\n";
return 1;
}
Runner runner(options["concurrency"].as<unsigned>(),
options["generator"].as<std::string>(),
options["hosts"].as<Strings>(),
options["timelimit"].as<double>(),
options["delay"].as<double>(),
options.count("continue_on_errors"),
options["iterations"].as<size_t>());
runner.runBenchmark();
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(print_stacktrace, true) << std::endl;
return DB::getCurrentExceptionCode();
}
}