Merge pull request #57479 from mkmkme/mkmkme/zookeeper-read-only

Add support for read-only mode in ZooKeeper
This commit is contained in:
Alexey Milovidov 2023-12-07 20:22:00 +01:00 committed by GitHub
commit 429ed34607
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 22 additions and 5 deletions

View File

@ -144,6 +144,7 @@ const char * errorMessage(Error code)
case Error::ZCLOSING: return "ZooKeeper is closing"; case Error::ZCLOSING: return "ZooKeeper is closing";
case Error::ZNOTHING: return "(not error) no server responses to process"; case Error::ZNOTHING: return "(not error) no server responses to process";
case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored"; case Error::ZSESSIONMOVED: return "Session moved to another server, so operation is ignored";
case Error::ZNOTREADONLY: return "State-changing request is passed to read-only server";
} }
UNREACHABLE(); UNREACHABLE();
@ -156,7 +157,8 @@ bool isHardwareError(Error zk_return_code)
|| zk_return_code == Error::ZSESSIONMOVED || zk_return_code == Error::ZSESSIONMOVED
|| zk_return_code == Error::ZCONNECTIONLOSS || zk_return_code == Error::ZCONNECTIONLOSS
|| zk_return_code == Error::ZMARSHALLINGERROR || zk_return_code == Error::ZMARSHALLINGERROR
|| zk_return_code == Error::ZOPERATIONTIMEOUT; || zk_return_code == Error::ZOPERATIONTIMEOUT
|| zk_return_code == Error::ZNOTREADONLY;
} }
bool isUserError(Error zk_return_code) bool isUserError(Error zk_return_code)
@ -196,4 +198,3 @@ void MultiResponse::removeRootPath(const String & root_path)
} }
} }

View File

@ -109,7 +109,8 @@ enum class Error : int32_t
ZAUTHFAILED = -115, /// Client authentication failed ZAUTHFAILED = -115, /// Client authentication failed
ZCLOSING = -116, /// ZooKeeper is closing ZCLOSING = -116, /// ZooKeeper is closing
ZNOTHING = -117, /// (not error) no server responses to process ZNOTHING = -117, /// (not error) no server responses to process
ZSESSIONMOVED = -118 /// Session moved to another server, so operation is ignored ZSESSIONMOVED = -118, /// Session moved to another server, so operation is ignored
ZNOTREADONLY = -119, /// State-changing request is passed to read-only server
}; };
/// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors /// Network errors and similar. You should reinitialize ZooKeeper session in case of these errors
@ -445,6 +446,7 @@ enum State
CONNECTING = 1, CONNECTING = 1,
ASSOCIATING = 2, ASSOCIATING = 2,
CONNECTED = 3, CONNECTED = 3,
READONLY = 5,
NOTCONNECTED = 999 NOTCONNECTED = 999
}; };

View File

@ -323,6 +323,9 @@ Coordination::Error ZooKeeper::tryCreate(const std::string & path, const std::st
{ {
Coordination::Error code = createImpl(path, data, mode, path_created); Coordination::Error code = createImpl(path, data, mode, path_created);
if (code == Coordination::Error::ZNOTREADONLY && exists(path))
return Coordination::Error::ZNODEEXISTS;
if (!(code == Coordination::Error::ZOK || if (!(code == Coordination::Error::ZOK ||
code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNONODE ||
code == Coordination::Error::ZNODEEXISTS || code == Coordination::Error::ZNODEEXISTS ||
@ -345,6 +348,8 @@ void ZooKeeper::createIfNotExists(const std::string & path, const std::string &
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return; return;
else if (code == Coordination::Error::ZNOTREADONLY && exists(path))
return;
else else
throw KeeperException::fromPath(code, path); throw KeeperException::fromPath(code, path);
} }

View File

@ -51,6 +51,7 @@ static constexpr int32_t KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT = 42;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44; static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45; static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45;
static constexpr int32_t SERVER_HANDSHAKE_LENGTH = 36; static constexpr int32_t SERVER_HANDSHAKE_LENGTH = 36;
static constexpr int32_t SERVER_HANDSHAKE_LENGTH_WITH_READONLY = 37;
static constexpr int32_t PASSWORD_LENGTH = 16; static constexpr int32_t PASSWORD_LENGTH = 16;
/// ZooKeeper has 1 MB node size and serialization limit by default, /// ZooKeeper has 1 MB node size and serialization limit by default,

View File

@ -1,3 +1,4 @@
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#include <Common/ZooKeeper/ZooKeeperImpl.h> #include <Common/ZooKeeper/ZooKeeperImpl.h>
#include <IO/Operators.h> #include <IO/Operators.h>
@ -552,12 +553,13 @@ void ZooKeeper::connect(
void ZooKeeper::sendHandshake() void ZooKeeper::sendHandshake()
{ {
int32_t handshake_length = 44; int32_t handshake_length = 45;
int64_t last_zxid_seen = 0; int64_t last_zxid_seen = 0;
int32_t timeout = args.session_timeout_ms; int32_t timeout = args.session_timeout_ms;
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero. int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
constexpr int32_t passwd_len = 16; constexpr int32_t passwd_len = 16;
std::array<char, passwd_len> passwd {}; std::array<char, passwd_len> passwd {};
bool read_only = true;
write(handshake_length); write(handshake_length);
if (use_compression) if (use_compression)
@ -568,6 +570,7 @@ void ZooKeeper::sendHandshake()
write(timeout); write(timeout);
write(previous_session_id); write(previous_session_id);
write(passwd); write(passwd);
write(read_only);
flushWriteBuffer(); flushWriteBuffer();
} }
@ -577,9 +580,10 @@ void ZooKeeper::receiveHandshake()
int32_t protocol_version_read; int32_t protocol_version_read;
int32_t timeout; int32_t timeout;
std::array<char, PASSWORD_LENGTH> passwd; std::array<char, PASSWORD_LENGTH> passwd;
bool read_only;
read(handshake_length); read(handshake_length);
if (handshake_length != SERVER_HANDSHAKE_LENGTH) if (handshake_length != SERVER_HANDSHAKE_LENGTH && handshake_length != SERVER_HANDSHAKE_LENGTH_WITH_READONLY)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length); throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length);
read(protocol_version_read); read(protocol_version_read);
@ -607,6 +611,8 @@ void ZooKeeper::receiveHandshake()
read(session_id); read(session_id);
read(passwd); read(passwd);
if (handshake_length == SERVER_HANDSHAKE_LENGTH_WITH_READONLY)
read(read_only);
} }

View File

@ -53,6 +53,7 @@ DataTypePtr getCoordinationErrorCodesEnumType()
{"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)}, {"ZCLOSING", static_cast<Int8>(Coordination::Error::ZCLOSING)},
{"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)}, {"ZNOTHING", static_cast<Int8>(Coordination::Error::ZNOTHING)},
{"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)}, {"ZSESSIONMOVED", static_cast<Int8>(Coordination::Error::ZSESSIONMOVED)},
{"ZNOTREADONLY", static_cast<Int8>(Coordination::Error::ZNOTREADONLY)},
}); });
} }
@ -115,6 +116,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"CONNECTING", static_cast<Int16>(Coordination::State::CONNECTING)}, {"CONNECTING", static_cast<Int16>(Coordination::State::CONNECTING)},
{"ASSOCIATING", static_cast<Int16>(Coordination::State::ASSOCIATING)}, {"ASSOCIATING", static_cast<Int16>(Coordination::State::ASSOCIATING)},
{"CONNECTED", static_cast<Int16>(Coordination::State::CONNECTED)}, {"CONNECTED", static_cast<Int16>(Coordination::State::CONNECTED)},
{"READONLY", static_cast<Int16>(Coordination::State::READONLY)},
{"NOTCONNECTED", static_cast<Int16>(Coordination::State::NOTCONNECTED)}, {"NOTCONNECTED", static_cast<Int16>(Coordination::State::NOTCONNECTED)},
}); });