Support 64bit XID in Keeper

This commit is contained in:
Antonio Andelic 2024-09-24 16:47:50 +02:00
parent 64a97632b3
commit 2419e3c579
25 changed files with 235 additions and 95 deletions

View File

@ -578,7 +578,7 @@ public:
virtual String getConnectedHostPort() const = 0;
/// Get the xid of current connection.
virtual int32_t getConnectionXid() const = 0;
virtual int64_t getConnectionXid() const = 0;
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;

View File

@ -41,7 +41,7 @@ public:
bool isExpired() const override { return expired; }
std::optional<int8_t> getConnectedNodeIdx() const override { return 0; }
String getConnectedHostPort() const override { return "TestKeeper:0000"; }
int32_t getConnectionXid() const override { return 0; }
int64_t getConnectionXid() const override { return 0; }
int64_t getSessionID() const override { return 0; }

View File

@ -1565,7 +1565,7 @@ String ZooKeeper::getConnectedHostPort() const
return impl->getConnectedHostPort();
}
int32_t ZooKeeper::getConnectionXid() const
int64_t ZooKeeper::getConnectionXid() const
{
return impl->getConnectionXid();
}

View File

@ -627,7 +627,7 @@ public:
std::optional<int8_t> getConnectedHostIdx() const;
String getConnectedHostPort() const;
int32_t getConnectionXid() const;
int64_t getConnectionXid() const;
String getConnectedHostAvailabilityZone() const;

View File

@ -101,6 +101,8 @@ void ZooKeeperArgs::initFromKeeperServerSection(const Poco::Util::AbstractConfig
session_timeout_ms = config.getInt(session_timeout_key);
}
use_xid_64 = config.getBool(std::string{config_name} + ".use_xid_64", false);
Poco::Util::AbstractConfiguration::Keys keys;
std::string raft_configuration_key = std::string{config_name} + ".raft_configuration";
config.keys(raft_configuration_key, keys);
@ -241,6 +243,10 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
{
use_compression = config.getBool(config_name + "." + key);
}
else if (key == "use_xid_64")
{
use_xid_64 = config.getBool(config_name + "." + key);
}
else if (key == "availability_zone_autodetect")
{
availability_zone_autodetect = config.getBool(config_name + "." + key);

View File

@ -49,6 +49,7 @@ struct ZooKeeperArgs
UInt64 send_sleep_ms = 0;
UInt64 recv_sleep_ms = 0;
bool use_compression = false;
bool use_xid_64 = false;
bool prefer_local_availability_zone = false;
bool availability_zone_autodetect = false;

View File

@ -16,14 +16,23 @@ namespace Coordination
using namespace DB;
void ZooKeeperResponse::write(WriteBuffer & out) const
void ZooKeeperResponse::write(WriteBuffer & out, bool use_xid_64) const
{
auto response_size = Coordination::size(xid) + Coordination::size(zxid) + Coordination::size(error);
size_t response_size = 0;
if (use_xid_64)
response_size += sizeof(int64_t);
else
response_size += sizeof(int32_t);
response_size += Coordination::size(zxid) + Coordination::size(error);
if (error == Error::ZOK)
response_size += sizeImpl();
Coordination::write(static_cast<int32_t>(response_size), out);
Coordination::write(xid, out);
if (use_xid_64)
Coordination::write(xid, out);
else
Coordination::write(static_cast<int32_t>(xid), out);
Coordination::write(zxid, out);
Coordination::write(error, out);
if (error == Error::ZOK)
@ -41,12 +50,21 @@ std::string ZooKeeperRequest::toString(bool short_format) const
toStringImpl(short_format));
}
void ZooKeeperRequest::write(WriteBuffer & out) const
void ZooKeeperRequest::write(WriteBuffer & out, bool use_xid_64) const
{
auto request_size = Coordination::size(xid) + Coordination::size(getOpNum()) + sizeImpl();
size_t request_size = 0;
if (use_xid_64)
request_size += sizeof(int64_t);
else
request_size += sizeof(int32_t);
request_size = Coordination::size(getOpNum()) + sizeImpl();
Coordination::write(static_cast<int32_t>(request_size), out);
Coordination::write(xid, out);
if (use_xid_64)
Coordination::write(static_cast<int64_t>(xid), out);
else
Coordination::write(static_cast<int32_t>(xid), out);
Coordination::write(getOpNum(), out);
writeImpl(out);
}
@ -150,10 +168,10 @@ size_t ZooKeeperWatchResponse::sizeImpl() const
return Coordination::size(type) + Coordination::size(state) + Coordination::size(path);
}
void ZooKeeperWatchResponse::write(WriteBuffer & out) const
void ZooKeeperWatchResponse::write(WriteBuffer & out, bool use_xid_64) const
{
if (error == Error::ZOK)
ZooKeeperResponse::write(out);
ZooKeeperResponse::write(out, use_xid_64);
/// skip bad responses for watches
}
@ -732,15 +750,13 @@ void ZooKeeperMultiRequest::writeImpl(WriteBuffer & out) const
size_t ZooKeeperMultiRequest::sizeImpl() const
{
size_t total_size = 0;
for (const auto & request : requests)
for (const auto & zk_request : requests)
{
const auto & zk_request = dynamic_cast<const ZooKeeperRequest &>(*request);
bool done = false;
int32_t error = -1;
total_size
+= Coordination::size(zk_request.getOpNum()) + Coordination::size(done) + Coordination::size(error) + zk_request.sizeImpl();
+= Coordination::size(zk_request->getOpNum()) + Coordination::size(done) + Coordination::size(error) + zk_request->sizeImpl();
}
OpNum op_num = OpNum::Error;

View File

@ -28,7 +28,7 @@ struct ZooKeeperResponse : virtual Response
virtual void readImpl(ReadBuffer &) = 0;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual size_t sizeImpl() const = 0;
virtual void write(WriteBuffer & out) const;
virtual void write(WriteBuffer & out, bool use_xid_64) const;
virtual OpNum getOpNum() const = 0;
virtual void fillLogElements(LogElements & elems, size_t idx) const;
virtual int32_t tryGetOpNum() const { return static_cast<int32_t>(getOpNum()); }
@ -56,7 +56,7 @@ struct ZooKeeperRequest : virtual Request
virtual OpNum getOpNum() const = 0;
/// Writes length, xid, op_num, then the rest.
void write(WriteBuffer & out) const;
void write(WriteBuffer & out, bool use_xid_64) const;
std::string toString(bool short_format = false) const;
virtual void writeImpl(WriteBuffer &) const = 0;
@ -155,7 +155,7 @@ struct ZooKeeperWatchResponse final : WatchResponse, ZooKeeperResponse
void writeImpl(WriteBuffer & out) const override;
size_t sizeImpl() const override;
void write(WriteBuffer & out) const override;
void write(WriteBuffer & out, bool use_xid_64) const override;
OpNum getOpNum() const override
{

View File

@ -8,12 +8,13 @@
namespace Coordination
{
using XID = int32_t;
using XID = int64_t;
static constexpr XID WATCH_XID = -1;
static constexpr XID PING_XID = -2;
static constexpr XID AUTH_XID = -4;
static constexpr XID CLOSE_XID = 0x7FFFFFFF;
static constexpr XID CLOSE_XID = std::numeric_limits<int32_t>::max();
static constexpr XID CLOSE_XID_64 = std::numeric_limits<int64_t>::max();
enum class OpNum : int32_t
{
@ -49,6 +50,7 @@ OpNum getOpNum(int32_t raw_op_num);
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION = 0;
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION = 10;
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64 = 11;
static constexpr int32_t KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT = 42;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45;

View File

@ -378,6 +378,11 @@ ZooKeeper::ZooKeeper(
try
{
use_compression = args.use_compression;
if (args.use_xid_64)
{
use_xid_64 = true;
close_xid = CLOSE_XID_64;
}
connect(nodes, args.connection_timeout_ms * 1000);
}
catch (...)
@ -590,10 +595,19 @@ void ZooKeeper::sendHandshake()
bool read_only = true;
write(handshake_length);
if (use_compression)
if (use_xid_64)
{
write(ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64);
write(use_compression);
}
else if (use_compression)
{
write(ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION);
}
else
{
write(ZOOKEEPER_PROTOCOL_VERSION);
}
write(last_zxid_seen);
write(timeout);
write(previous_session_id);
@ -624,10 +638,15 @@ void ZooKeeper::receiveHandshake()
"Keeper server rejected the connection during the handshake. "
"Possibly it's overloaded, doesn't see leader or stale");
if (use_compression)
if (use_xid_64)
{
if (protocol_version_read < ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version with 64bit XID: {}", protocol_version_read);
}
else if (use_compression)
{
if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION)
throw Exception(Error::ZMARSHALLINGERROR,"Unexpected protocol version with compression: {}", protocol_version_read);
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version with compression: {}", protocol_version_read);
}
else if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version: {}", protocol_version_read);
@ -650,7 +669,7 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
request.scheme = scheme;
request.data = data;
request.xid = AUTH_XID;
request.write(getWriteBuffer());
request.write(getWriteBuffer(), use_xid_64);
flushWriteBuffer();
int32_t length;
@ -710,7 +729,7 @@ void ZooKeeper::sendThread()
/// After we popped element from the queue, we must register callbacks (even in the case when expired == true right now),
/// because they must not be lost (callbacks must be called because the user will wait for them).
if (info.request->xid != CLOSE_XID)
if (info.request->xid != close_xid)
{
CurrentMetrics::add(CurrentMetrics::ZooKeeperRequest);
std::lock_guard lock(operations_mutex);
@ -730,13 +749,13 @@ void ZooKeeper::sendThread()
info.request->addRootPath(args.chroot);
info.request->probably_sent = true;
info.request->write(getWriteBuffer());
info.request->write(getWriteBuffer(), use_xid_64);
flushWriteBuffer();
logOperationIfNeeded(info.request);
/// We sent close request, exit
if (info.request->xid == CLOSE_XID)
if (info.request->xid == close_xid)
break;
}
}
@ -747,7 +766,7 @@ void ZooKeeper::sendThread()
ZooKeeperHeartbeatRequest request;
request.xid = PING_XID;
request.write(getWriteBuffer());
request.write(getWriteBuffer(), use_xid_64);
flushWriteBuffer();
}
@ -833,7 +852,16 @@ void ZooKeeper::receiveEvent()
read(length);
size_t count_before_event = in->count();
read(xid);
if (use_xid_64)
{
read(xid);
}
else
{
int32_t xid_32{0};
read(xid_32);
xid = xid_32;
}
read(zxid);
read(err);
@ -1191,7 +1219,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
if (!info.request->xid)
{
info.request->xid = next_xid.fetch_add(1);
if (info.request->xid == CLOSE_XID)
if (info.request->xid == close_xid)
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "xid equal to close_xid");
if (info.request->xid < 0)
throw Exception::fromMessage(Error::ZSESSIONEXPIRED, "XID overflow");
@ -1538,7 +1566,7 @@ void ZooKeeper::multi(
void ZooKeeper::close()
{
ZooKeeperCloseRequest request;
request.xid = CLOSE_XID;
request.xid = close_xid;
RequestInfo request_info;
request_info.request = std::make_shared<ZooKeeperCloseRequest>(std::move(request));
@ -1568,7 +1596,7 @@ String ZooKeeper::getConnectedHostPort() const
return "";
}
int32_t ZooKeeper::getConnectionXid() const
int64_t ZooKeeper::getConnectionXid() const
{
return next_xid.load();
}

View File

@ -119,7 +119,7 @@ public:
std::optional<int8_t> getConnectedNodeIdx() const override;
String getConnectedHostPort() const override;
int32_t getConnectionXid() const override;
int64_t getConnectionXid() const override;
String tryGetAvailabilityZone() override;
@ -247,6 +247,9 @@ private:
std::optional<CompressedWriteBuffer> compressed_out;
bool use_compression = false;
bool use_xid_64 = false;
int64_t close_xid = CLOSE_XID;
int64_t session_id = 0;

View File

@ -597,20 +597,6 @@ void KeeperServer::shutdown()
namespace
{
// Serialize the request for the log entry
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorageBase::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
DB::writeIntBinary(request_for_session.session_id, write_buf);
request_for_session.request->write(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
/// we fill with dummy values to eliminate unnecessary copy later on when we will write correct values
DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid
DB::writeIntBinary(KeeperStorageBase::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value
/// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler
return write_buf.getBuffer();
}
}
@ -627,7 +613,7 @@ RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorageBase::Requests
std::vector<nuraft::ptr<nuraft::buffer>> entries;
entries.reserve(requests_for_sessions.size());
for (const auto & request_for_session : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(request_for_session));
entries.push_back(IKeeperStateMachine::getZooKeeperLogEntry(request_for_session));
std::lock_guard lock{server_write_mutex};
if (is_recovering)
@ -881,6 +867,9 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_ZXID_DIGEST)
bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_XID_64)
bytes_missing += sizeof(uint32_t);
if (bytes_missing != 0)
{
auto new_buffer = nuraft::buffer::alloc(entry_buf->size() + bytes_missing);
@ -889,8 +878,8 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), entry_buf, entry->get_val_type());
}
size_t write_buffer_header_size
= sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
size_t write_buffer_header_size = sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version)
+ sizeof(request_for_session->digest->value) + sizeof(uint32_t);
if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
write_buffer_header_size += sizeof(request_for_session->time);

View File

@ -221,7 +221,47 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine<Storage>::pre_commit(uint64_t log
return result;
}
std::shared_ptr<KeeperStorageBase::RequestForSession> IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
namespace
{
union XidHelper
{
struct
{
uint32_t lower;
uint32_t upper;
} parts;
int64_t xid;
};
};
// Serialize the request for the log entry
nuraft::ptr<nuraft::buffer> IKeeperStateMachine::getZooKeeperLogEntry(const KeeperStorageBase::RequestForSession & request_for_session)
{
DB::WriteBufferFromNuraftBuffer write_buf;
DB::writeIntBinary(request_for_session.session_id, write_buf);
const auto & request = request_for_session.request;
size_t request_size = sizeof(uint32_t) + Coordination::size(request->getOpNum()) + request->sizeImpl();
Coordination::write(static_cast<int32_t>(request_size), write_buf);
XidHelper xid_helper{.xid = request->xid};
Coordination::write(xid_helper.parts.lower, write_buf);
Coordination::write(request->getOpNum(), write_buf);
request->writeImpl(write_buf);
DB::writeIntBinary(request_for_session.time, write_buf);
/// we fill with dummy values to eliminate unnecessary copy later on when we will write correct values
DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid
DB::writeIntBinary(KeeperStorageBase::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value
Coordination::write(xid_helper.parts.upper, write_buf); /// for 64bit XID MSB
/// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler
return write_buf.getBuffer();
}
std::shared_ptr<KeeperStorageBase::RequestForSession>
IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
{
ReadBufferFromNuraftBuffer buffer(data);
auto request_for_session = std::make_shared<KeeperStorageBase::RequestForSession>();
@ -230,14 +270,62 @@ std::shared_ptr<KeeperStorageBase::RequestForSession> IKeeperStateMachine::parse
int32_t length;
Coordination::read(length, buffer);
int32_t xid;
Coordination::read(xid, buffer);
/// because of backwards compatibility, only 32bit xid could be written
/// for that reason we serialize XID in 2 parts:
/// - lower: 32 least significant bits of 64bit XID OR 32bit XID
/// - upper: 32 most significant bits of 64bit XID
XidHelper xid_helper;
Coordination::read(xid_helper.parts.lower, buffer);
/// go to end of the buffer and read extra information including second part of XID
auto buffer_position = buffer.getPosition();
buffer.seek(length - sizeof(uint32_t), SEEK_CUR);
using enum ZooKeeperLogSerializationVersion;
ZooKeeperLogSerializationVersion version = INITIAL;
if (!buffer.eof())
{
version = WITH_TIME;
readIntBinary(request_for_session->time, buffer);
}
else
request_for_session->time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
if (!buffer.eof())
{
version = WITH_ZXID_DIGEST;
readIntBinary(request_for_session->zxid, buffer);
chassert(!buffer.eof());
request_for_session->digest.emplace();
readIntBinary(request_for_session->digest->version, buffer);
if (request_for_session->digest->version != KeeperStorageBase::DigestVersion::NO_DIGEST || !buffer.eof())
readIntBinary(request_for_session->digest->value, buffer);
}
if (!buffer.eof())
{
version = WITH_XID_64;
Coordination::read(xid_helper.parts.upper, buffer);
}
if (serialization_version)
*serialization_version = version;
int64_t xid = xid_helper.xid;
buffer.seek(buffer_position, SEEK_SET);
static constexpr std::array non_cacheable_xids{
Coordination::WATCH_XID,
Coordination::PING_XID,
Coordination::AUTH_XID,
Coordination::CLOSE_XID,
Coordination::CLOSE_XID_64,
};
const bool should_cache
@ -266,44 +354,13 @@ std::shared_ptr<KeeperStorageBase::RequestForSession> IKeeperStateMachine::parse
}
}
Coordination::OpNum opnum;
Coordination::read(opnum, buffer);
request_for_session->request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session->request->xid = xid;
request_for_session->request->readImpl(buffer);
using enum ZooKeeperLogSerializationVersion;
ZooKeeperLogSerializationVersion version = INITIAL;
if (!buffer.eof())
{
version = WITH_TIME;
readIntBinary(request_for_session->time, buffer);
}
else
request_for_session->time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
if (!buffer.eof())
{
version = WITH_ZXID_DIGEST;
readIntBinary(request_for_session->zxid, buffer);
chassert(!buffer.eof());
request_for_session->digest.emplace();
readIntBinary(request_for_session->digest->version, buffer);
if (request_for_session->digest->version != KeeperStorageBase::DigestVersion::NO_DIGEST || !buffer.eof())
readIntBinary(request_for_session->digest->value, buffer);
}
if (serialization_version)
*serialization_version = version;
if (should_cache && !final)
{
std::lock_guard lock(request_cache_mutex);

View File

@ -35,6 +35,7 @@ public:
INITIAL = 0,
WITH_TIME = 1,
WITH_ZXID_DIGEST = 2,
WITH_XID_64 = 3,
};
/// lifetime of a parsed request is:
@ -45,7 +46,10 @@ public:
///
/// final - whether it's the final time we will fetch the request so we can safely remove it from cache
/// serialization_version - information about which fields were parsed from the buffer so we can modify the buffer accordingly
std::shared_ptr<KeeperStorageBase::RequestForSession> parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);
std::shared_ptr<KeeperStorageBase::RequestForSession>
parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);
static nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorageBase::RequestForSession & request_for_session);
virtual bool preprocess(const KeeperStorageBase::RequestForSession & request_for_session) = 0;

View File

@ -134,7 +134,7 @@ ColumnsDescription ZooKeeperLogElement::getColumnsDescription()
{"session_id", std::make_shared<DataTypeInt64>(), "The session ID that the ZooKeeper server sets for each connection."},
{"duration_microseconds", std::make_shared<DataTypeUInt64>(), "The time taken by ZooKeeper to execute the request."},
{"xid", std::make_shared<DataTypeInt32>(), "The ID of the request within the session. This is usually a sequential request number. It is the same for the request row and the paired response/finalize row."},
{"xid", std::make_shared<DataTypeInt64>(), "The ID of the request within the session. This is usually a sequential request number. It is the same for the request row and the paired response/finalize row."},
{"has_watch", std::make_shared<DataTypeUInt8>(), "The request whether the watch has been set."},
{"op_num", op_num_enum, "The type of request or response."},
{"path", std::make_shared<DataTypeString>(), "The path to the ZooKeeper node specified in the request, or an empty string if the request not requires specifying a path."},

View File

@ -31,7 +31,7 @@ struct ZooKeeperLogElement
UInt64 duration_microseconds = 0;
/// Common request info
Int32 xid = 0;
Int64 xid = 0;
bool has_watch = false;
Int32 op_num = 0;
String path;

View File

@ -1,4 +1,5 @@
#include <Server/KeeperTCPHandler.h>
#include "Common/ZooKeeper/ZooKeeperConstants.h"
#if USE_NURAFT
@ -252,7 +253,9 @@ void KeeperTCPHandler::sendHandshake(bool has_leader, bool & use_compression)
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
if (has_leader)
{
if (use_compression)
if (use_xid_64)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64, *out);
else if (use_compression)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION, *out);
else
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out);
@ -290,10 +293,23 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool
Coordination::read(protocol_version, *in);
if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION && protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION)
if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION
&& protocol_version < Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION
&& protocol_version > Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64)
{
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected protocol version: {}", toString(protocol_version));
}
use_compression = (protocol_version == Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION);
if (protocol_version == Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION)
{
use_compression = true;
}
else if (protocol_version >= Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_XID_64)
{
close_xid = Coordination::CLOSE_XID_64;
use_xid_64 = true;
Coordination::read(use_compression, *in);
}
Coordination::read(last_zxid_seen, *in);
Coordination::read(timeout_ms, *in);
@ -485,7 +501,7 @@ void KeeperTCPHandler::runImpl()
updateStats(response, request_with_response.request);
packageSent();
response->write(getWriteBuffer());
response->write(getWriteBuffer(), use_xid_64);
flushWriteBuffer();
log_long_operation("Sending response");
if (response->error == Coordination::Error::ZSESSIONEXPIRED)
@ -582,8 +598,15 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
auto & read_buffer = getReadBuffer();
int32_t length;
Coordination::read(length, read_buffer);
int32_t xid;
Coordination::read(xid, read_buffer);
int64_t xid;
if (use_xid_64)
Coordination::read(xid, read_buffer);
else
{
int32_t read_xid;
Coordination::read(read_xid, read_buffer);
xid = read_xid;
}
Coordination::OpNum opnum;
Coordination::read(opnum, read_buffer);

View File

@ -84,6 +84,7 @@ private:
ThreadSafeResponseQueuePtr responses;
Coordination::XID close_xid = Coordination::CLOSE_XID;
bool use_xid_64 = false;
/// Streams for reading/writing from/to client connection socket.
std::optional<ReadBufferFromPocoSocket> in;

View File

@ -34,7 +34,7 @@ ColumnsDescription StorageSystemZooKeeperConnection::getColumnsDescription()
/* 6 */ {"is_expired", std::make_shared<DataTypeUInt8>(), "Is the current connection expired."},
/* 7 */ {"keeper_api_version", std::make_shared<DataTypeUInt8>(), "Keeper API version."},
/* 8 */ {"client_id", std::make_shared<DataTypeInt64>(), "Session id of the connection."},
/* 9 */ {"xid", std::make_shared<DataTypeInt32>(), "XID of the current session."},
/* 9 */ {"xid", std::make_shared<DataTypeInt64>(), "XID of the current session."},
/* 10*/ {"enabled_feature_flags", std::make_shared<DataTypeArray>(std::move(feature_flags_enum)),
"Feature flags which are enabled. Only applicable to ClickHouse Keeper."
},

View File

@ -3,6 +3,7 @@
<!--<zookeeper_load_balancing>random / in_order / nearest_hostname / hostname_levenshtein_distance / first_or_random / round_robin</zookeeper_load_balancing>-->
<zookeeper_load_balancing>random</zookeeper_load_balancing>
<use_compression>1</use_compression>
<use_xid_64>1</use_xid_64>
<node index="1">
<host>127.0.0.1</host>
<port>9181</port>

View File

@ -1,6 +1,7 @@
<clickhouse>
<zookeeper>
<use_compression>1</use_compression>
<use_xid_64>1</use_xid_64>
<node index="1">
<host>localhost</host>
<port>9181</port>

View File

@ -62,6 +62,8 @@ source /repo/tests/docker_scripts/utils.lib
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml
if [[ -n "$BUGFIX_VALIDATE_CHECK" ]] && [[ "$BUGFIX_VALIDATE_CHECK" -eq 1 ]]; then
sudo sed -i "/<use_xid_64>1<\/use_xid_64>/d" /etc/clickhouse-server/config.d/zookeeper.xml
function remove_keeper_config()
{
sudo sed -i "/<$1>$2<\/$1>/d" /etc/clickhouse-server/config.d/keeper_port.xml

View File

@ -34,6 +34,7 @@
<tcp_port>9181</tcp_port>
<server_id>{id}</server_id>
<digest_enabled>1</digest_enabled>
<use_xid_64>1</use_xid_64>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -186,6 +186,9 @@ void Runner::parseHostsFromConfig(const Poco::Util::AbstractConfiguration & conf
if (config.has(key + ".use_compression"))
connection_info.use_compression = config.getBool(key + ".use_compression");
if (config.has(key + ".use_xid_64"))
connection_info.use_xid_64 = config.getBool(key + ".use_xid_64");
};
fill_connection_details("connections", default_connection_info);
@ -1258,6 +1261,7 @@ std::shared_ptr<Coordination::ZooKeeper> Runner::getConnection(const ConnectionI
args.connection_timeout_ms = connection_info.connection_timeout_ms;
args.operation_timeout_ms = connection_info.operation_timeout_ms;
args.use_compression = connection_info.use_compression;
args.use_xid_64 = connection_info.use_xid_64;
return std::make_shared<Coordination::ZooKeeper>(nodes, args, nullptr);
}

View File

@ -84,6 +84,7 @@ private:
int32_t connection_timeout_ms = Coordination::DEFAULT_CONNECTION_TIMEOUT_MS;
int32_t operation_timeout_ms = Coordination::DEFAULT_OPERATION_TIMEOUT_MS;
bool use_compression = false;
bool use_xid_64 = false;
size_t sessions = 1;
};