Support compression for keeper protocol

This commit is contained in:
Smita Kulkarni 2023-09-24 11:38:08 +02:00
parent b4f9d8a517
commit ca07275143
9 changed files with 137 additions and 44 deletions

View File

@ -212,6 +212,10 @@ void ZooKeeperArgs::initFromKeeperSection(const Poco::Util::AbstractConfiguratio
.max_sec = config.getUInt(config_name + "." + key + ".max"), .max_sec = config.getUInt(config_name + "." + key + ".max"),
}; };
} }
else if (key == "compressed_protocol")
{
compressed_protocol = config.getBool(config_name + "." + key);
}
else else
throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key); throw KeeperException(Coordination::Error::ZBADARGUMENTS, "Unknown key {} in config file", key);
} }

View File

@ -44,6 +44,7 @@ struct ZooKeeperArgs
double recv_sleep_probability = 0.0; double recv_sleep_probability = 0.0;
UInt64 send_sleep_ms = 0; UInt64 send_sleep_ms = 0;
UInt64 recv_sleep_ms = 0; UInt64 recv_sleep_ms = 0;
bool compressed_protocol = false;
SessionLifetimeConfiguration fallback_session_lifetime = {}; SessionLifetimeConfiguration fallback_session_lifetime = {};
DB::GetPriorityForLoadBalancing get_priority_load_balancing; DB::GetPriorityForLoadBalancing get_priority_load_balancing;

View File

@ -45,6 +45,7 @@ enum class OpNum : int32_t
OpNum getOpNum(int32_t raw_op_num); OpNum getOpNum(int32_t raw_op_num);
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION = 0; static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION = 0;
static constexpr int32_t ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION = 1;
static constexpr int32_t KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT = 42; static constexpr int32_t KEEPER_PROTOCOL_VERSION_CONNECTION_REJECT = 42;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44; static constexpr int32_t CLIENT_HANDSHAKE_LENGTH = 44;
static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45; static constexpr int32_t CLIENT_HANDSHAKE_LENGTH_WITH_READONLY = 45;

View File

@ -8,6 +8,7 @@
#include <cstdint> #include <cstdint>
#include <vector> #include <vector>
#include <array> #include <array>
#include <Compression/CompressedReadBuffer.h>
namespace Coordination namespace Coordination

View File

@ -16,6 +16,9 @@
#include <Common/ZooKeeper/ZooKeeperIO.h> #include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressionFactory.h>
#include "Coordination/KeeperConstants.h" #include "Coordination/KeeperConstants.h"
#include "config.h" #include "config.h"
@ -274,13 +277,13 @@ using namespace DB;
template <typename T> template <typename T>
void ZooKeeper::write(const T & x) void ZooKeeper::write(const T & x)
{ {
Coordination::write(x, *out); Coordination::write(x, *maybe_compressed_out);
} }
template <typename T> template <typename T>
void ZooKeeper::read(T & x) void ZooKeeper::read(T & x)
{ {
Coordination::read(x, *in); Coordination::read(x, *maybe_compressed_in);
} }
static void removeRootPath(String & path, const String & chroot) static void removeRootPath(String & path, const String & chroot)
@ -345,7 +348,21 @@ ZooKeeper::ZooKeeper(
if (args.enable_fault_injections_during_startup) if (args.enable_fault_injections_during_startup)
setupFaultDistributions(); setupFaultDistributions();
try
{
use_compression = args.compressed_protocol;
connect(nodes, args.connection_timeout_ms * 1000); connect(nodes, args.connection_timeout_ms * 1000);
}
catch (...)
{
if (use_compression)
{
use_compression = false;
connect(nodes, args.connection_timeout_ms * 1000);
}
else
throw;
}
if (!args.auth_scheme.empty()) if (!args.auth_scheme.empty())
sendAuth(args.auth_scheme, args.identity); sendAuth(args.auth_scheme, args.identity);
@ -422,8 +439,10 @@ void ZooKeeper::connect(
socket.setSendTimeout(args.operation_timeout_ms * 1000); socket.setSendTimeout(args.operation_timeout_ms * 1000);
socket.setNoDelay(true); socket.setNoDelay(true);
in.emplace(socket); in = std::make_shared<ReadBufferFromPocoSocket>(socket);
out.emplace(socket); out = std::make_shared<WriteBufferFromPocoSocket>(socket);
maybe_compressed_in = in;
maybe_compressed_out = out;
try try
{ {
@ -444,7 +463,15 @@ void ZooKeeper::connect(
e.addMessage("while receiving handshake from ZooKeeper"); e.addMessage("while receiving handshake from ZooKeeper");
throw; throw;
} }
connected = true; connected = true;
if (use_compression)
{
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out,
CompressionCodecFactory::instance().get(
"ZSTD", {}));
}
if (connected_callback.has_value()) if (connected_callback.has_value())
(*connected_callback)(i, node); (*connected_callback)(i, node);
@ -513,16 +540,19 @@ void ZooKeeper::sendHandshake()
std::array<char, passwd_len> passwd {}; std::array<char, passwd_len> passwd {};
write(handshake_length); write(handshake_length);
if (use_compression)
write(ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION);
else
write(ZOOKEEPER_PROTOCOL_VERSION); write(ZOOKEEPER_PROTOCOL_VERSION);
write(last_zxid_seen); write(last_zxid_seen);
write(timeout); write(timeout);
write(previous_session_id); write(previous_session_id);
write(passwd); write(passwd);
maybe_compressed_out->next();
out->next(); out->next();
} }
void ZooKeeper::receiveHandshake() void ZooKeeper::receiveHandshake()
{ {
int32_t handshake_length; int32_t handshake_length;
@ -535,8 +565,7 @@ void ZooKeeper::receiveHandshake()
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length); throw Exception(Error::ZMARSHALLINGERROR, "Unexpected handshake length received: {}", handshake_length);
read(protocol_version_read); read(protocol_version_read);
if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION)
{
/// Special way to tell a client that server is not ready to serve it. /// Special way to tell a client that server is not ready to serve it.
/// It's better for faster failover than just connection drop. /// It's better for faster failover than just connection drop.
/// Implemented in clickhouse-keeper. /// Implemented in clickhouse-keeper.
@ -544,9 +573,14 @@ void ZooKeeper::receiveHandshake()
throw Exception::fromMessage(Error::ZCONNECTIONLOSS, throw Exception::fromMessage(Error::ZCONNECTIONLOSS,
"Keeper server rejected the connection during the handshake. " "Keeper server rejected the connection during the handshake. "
"Possibly it's overloaded, doesn't see leader or stale"); "Possibly it's overloaded, doesn't see leader or stale");
else
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version: {}", protocol_version_read); if (use_compression)
{
if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION)
throw Exception(Error::ZMARSHALLINGERROR,"Unexpected protocol version with compression: {}", protocol_version_read);
} }
else if (protocol_version_read != ZOOKEEPER_PROTOCOL_VERSION)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected protocol version: {}", protocol_version_read);
read(timeout); read(timeout);
if (timeout != args.session_timeout_ms) if (timeout != args.session_timeout_ms)
@ -564,7 +598,9 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
request.scheme = scheme; request.scheme = scheme;
request.data = data; request.data = data;
request.xid = AUTH_XID; request.xid = AUTH_XID;
request.write(*out); request.write(*maybe_compressed_out);
maybe_compressed_out->next();
out->next();
int32_t length; int32_t length;
XID read_xid; XID read_xid;
@ -580,10 +616,14 @@ void ZooKeeper::sendAuth(const String & scheme, const String & data)
if (read_xid != AUTH_XID) if (read_xid != AUTH_XID)
throw Exception(Error::ZMARSHALLINGERROR, "Unexpected event received in reply to auth request: {}", read_xid); throw Exception(Error::ZMARSHALLINGERROR, "Unexpected event received in reply to auth request: {}", read_xid);
if (!use_compression)
{
int32_t actual_length = static_cast<int32_t>(in->count() - count_before_event); int32_t actual_length = static_cast<int32_t>(in->count() - count_before_event);
if (length != actual_length) if (length != actual_length)
throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}", length, actual_length); throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}", length, actual_length);
}
if (err != Error::ZOK) if (err != Error::ZOK)
throw Exception(Error::ZMARSHALLINGERROR, "Error received in reply to auth request. Code: {}. Message: {}", throw Exception(Error::ZMARSHALLINGERROR, "Error received in reply to auth request. Code: {}. Message: {}",
static_cast<int32_t>(err), err); static_cast<int32_t>(err), err);
@ -639,7 +679,10 @@ void ZooKeeper::sendThread()
info.request->addRootPath(args.chroot); info.request->addRootPath(args.chroot);
info.request->probably_sent = true; info.request->probably_sent = true;
info.request->write(*out); info.request->write(*maybe_compressed_out);
maybe_compressed_out->next();
out->next();
logOperationIfNeeded(info.request); logOperationIfNeeded(info.request);
@ -655,7 +698,9 @@ void ZooKeeper::sendThread()
ZooKeeperHeartbeatRequest request; ZooKeeperHeartbeatRequest request;
request.xid = PING_XID; request.xid = PING_XID;
request.write(*out); request.write(*maybe_compressed_out);
maybe_compressed_out->next();
out->next();
} }
ProfileEvents::increment(ProfileEvents::ZooKeeperBytesSent, out->count() - prev_bytes_sent); ProfileEvents::increment(ProfileEvents::ZooKeeperBytesSent, out->count() - prev_bytes_sent);
@ -827,7 +872,7 @@ void ZooKeeper::receiveEvent()
} }
else else
{ {
response->readImpl(*in); response->readImpl(*maybe_compressed_in);
response->removeRootPath(args.chroot); response->removeRootPath(args.chroot);
} }
/// Instead of setting the watch in sendEvent, set it in receiveEvent because need to check the response. /// Instead of setting the watch in sendEvent, set it in receiveEvent because need to check the response.
@ -860,9 +905,14 @@ void ZooKeeper::receiveEvent()
} }
} }
if (!use_compression)
{
int32_t actual_length = static_cast<int32_t>(in->count() - count_before_event); int32_t actual_length = static_cast<int32_t>(in->count() - count_before_event);
if (length != actual_length) if (length != actual_length)
throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}", length, actual_length); throw Exception(Error::ZMARSHALLINGERROR, "Response length doesn't match. Expected: {}, actual: {}",
length, actual_length);
}
logOperationIfNeeded(request_info.request, response, /* finalize= */ false, elapsed_ms); logOperationIfNeeded(request_info.request, response, /* finalize= */ false, elapsed_ms);
} }

View File

@ -15,6 +15,8 @@
#include <IO/WriteBuffer.h> #include <IO/WriteBuffer.h>
#include <IO/ReadBufferFromPocoSocket.h> #include <IO/ReadBufferFromPocoSocket.h>
#include <IO/WriteBufferFromPocoSocket.h> #include <IO/WriteBufferFromPocoSocket.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Poco/Net/StreamSocket.h> #include <Poco/Net/StreamSocket.h>
#include <Poco/Net/SocketAddress.h> #include <Poco/Net/SocketAddress.h>
@ -236,8 +238,13 @@ private:
Poco::Net::StreamSocket socket; Poco::Net::StreamSocket socket;
/// To avoid excessive getpeername(2) calls. /// To avoid excessive getpeername(2) calls.
Poco::Net::SocketAddress socket_address; Poco::Net::SocketAddress socket_address;
std::optional<ReadBufferFromPocoSocket> in;
std::optional<WriteBufferFromPocoSocket> out; std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out;
std::shared_ptr<ReadBuffer> maybe_compressed_in;
std::shared_ptr<WriteBuffer> maybe_compressed_out;
bool use_compression = false;
int64_t session_id = 0; int64_t session_id = 0;

View File

@ -20,6 +20,7 @@
#include <queue> #include <queue>
#include <mutex> #include <mutex>
#include <Coordination/FourLetterCommand.h> #include <Coordination/FourLetterCommand.h>
#include <IO/CompressionMethod.h>
#include <base/hex.h> #include <base/hex.h>
@ -242,11 +243,14 @@ KeeperTCPHandler::KeeperTCPHandler(
KeeperTCPHandler::registerConnection(this); KeeperTCPHandler::registerConnection(this);
} }
void KeeperTCPHandler::sendHandshake(bool has_leader) void KeeperTCPHandler::sendHandshake(bool has_leader, bool & use_compression)
{ {
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out); Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
if (has_leader) if (has_leader)
{ {
if (use_compression)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION, *out);
else
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out); Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *out);
} }
else else
@ -269,7 +273,7 @@ void KeeperTCPHandler::run()
runImpl(); runImpl();
} }
Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length) Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length, bool & use_compression)
{ {
int32_t protocol_version; int32_t protocol_version;
int64_t last_zxid_seen; int64_t last_zxid_seen;
@ -282,9 +286,11 @@ Poco::Timespan KeeperTCPHandler::receiveHandshake(int32_t handshake_length)
Coordination::read(protocol_version, *in); Coordination::read(protocol_version, *in);
if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION) if (protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION && protocol_version != Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION)
throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected protocol version: {}", toString(protocol_version)); throw Exception(ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT, "Unexpected protocol version: {}", toString(protocol_version));
use_compression = (protocol_version == Coordination::ZOOKEEPER_PROTOCOL_VERSION_WITH_COMPRESSION);
Coordination::read(last_zxid_seen, *in); Coordination::read(last_zxid_seen, *in);
Coordination::read(timeout_ms, *in); Coordination::read(timeout_ms, *in);
@ -312,6 +318,8 @@ void KeeperTCPHandler::runImpl()
in = std::make_shared<ReadBufferFromPocoSocket>(socket()); in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket()); out = std::make_shared<WriteBufferFromPocoSocket>(socket());
bool use_compression = false;
if (in->eof()) if (in->eof())
{ {
LOG_WARNING(log, "Client has not sent any data."); LOG_WARNING(log, "Client has not sent any data.");
@ -343,7 +351,7 @@ void KeeperTCPHandler::runImpl()
try try
{ {
int32_t handshake_length = header; int32_t handshake_length = header;
auto client_timeout = receiveHandshake(handshake_length); auto client_timeout = receiveHandshake(handshake_length, use_compression);
if (client_timeout.totalMilliseconds() == 0) if (client_timeout.totalMilliseconds() == 0)
client_timeout = Poco::Timespan(Coordination::DEFAULT_SESSION_TIMEOUT_MS * Poco::Timespan::MILLISECONDS); client_timeout = Poco::Timespan(Coordination::DEFAULT_SESSION_TIMEOUT_MS * Poco::Timespan::MILLISECONDS);
@ -367,20 +375,33 @@ void KeeperTCPHandler::runImpl()
catch (const Exception & e) catch (const Exception & e)
{ {
LOG_WARNING(log, "Cannot receive session id {}", e.displayText()); LOG_WARNING(log, "Cannot receive session id {}", e.displayText());
sendHandshake(false); sendHandshake(/* has_leader */ false, use_compression);
return; return;
} }
sendHandshake(true); sendHandshake(/* has_leader */ true, use_compression);
} }
else else
{ {
LOG_WARNING(log, "Ignoring user request, because the server is not active yet"); LOG_WARNING(log, "Ignoring user request, because the server is not active yet");
sendHandshake(false); sendHandshake(/* has_leader */ false, use_compression);
return; return;
} }
if (use_compression)
{
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out,
CompressionCodecFactory::instance().get("ZSTD",
{}));
}
else
{
maybe_compressed_in = in;
maybe_compressed_out = out;
}
auto response_fd = poll_wrapper->getResponseFD(); auto response_fd = poll_wrapper->getResponseFD();
auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response) auto response_callback = [this, response_fd] (const Coordination::ZooKeeperResponsePtr & response)
{ {
@ -467,7 +488,9 @@ void KeeperTCPHandler::runImpl()
updateStats(response); updateStats(response);
packageSent(); packageSent();
response->write(*out); response->write(*maybe_compressed_out);
maybe_compressed_out->next();
out->next();
log_long_operation("Sending response"); log_long_operation("Sending response");
if (response->error == Coordination::Error::ZSESSIONEXPIRED) if (response->error == Coordination::Error::ZSESSIONEXPIRED)
{ {
@ -525,7 +548,8 @@ bool KeeperTCPHandler::tryExecuteFourLetterWordCmd(int32_t command)
try try
{ {
String res = command_ptr->run(); String res = command_ptr->run();
out->write(res.data(), res.size()); maybe_compressed_out->write(res.data(),res.size());
maybe_compressed_out->next();
out->next(); out->next();
} }
catch (...) catch (...)
@ -540,16 +564,16 @@ bool KeeperTCPHandler::tryExecuteFourLetterWordCmd(int32_t command)
std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveRequest() std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveRequest()
{ {
int32_t length; int32_t length;
Coordination::read(length, *in); Coordination::read(length, *maybe_compressed_in);
int32_t xid; int32_t xid;
Coordination::read(xid, *in); Coordination::read(xid, *maybe_compressed_in);
Coordination::OpNum opnum; Coordination::OpNum opnum;
Coordination::read(opnum, *in); Coordination::read(opnum, *maybe_compressed_in);
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request->xid = xid; request->xid = xid;
request->readImpl(*in); request->readImpl(*maybe_compressed_in);
if (!keeper_dispatcher->putRequest(request, session_id)) if (!keeper_dispatcher->putRequest(request, session_id))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id); throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Session {} already disconnected", session_id);

View File

@ -17,6 +17,8 @@
#include <unordered_map> #include <unordered_map>
#include <Coordination/KeeperConnectionStats.h> #include <Coordination/KeeperConnectionStats.h>
#include <Poco/Timestamp.h> #include <Poco/Timestamp.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
namespace DB namespace DB
{ {
@ -80,13 +82,15 @@ private:
/// Streams for reading/writing from/to client connection socket. /// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBufferFromPocoSocket> in; std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out; std::shared_ptr<WriteBufferFromPocoSocket> out;
std::shared_ptr<ReadBuffer> maybe_compressed_in;
std::shared_ptr<WriteBuffer> maybe_compressed_out;
std::atomic<bool> connected{false}; std::atomic<bool> connected{false};
void runImpl(); void runImpl();
void sendHandshake(bool has_leader); void sendHandshake(bool has_leader, bool & use_compression);
Poco::Timespan receiveHandshake(int32_t handshake_length); Poco::Timespan receiveHandshake(int32_t handshake_length, bool & use_compression);
static bool isHandShake(int32_t handshake_length); static bool isHandShake(int32_t handshake_length);
bool tryExecuteFourLetterWordCmd(int32_t command); bool tryExecuteFourLetterWordCmd(int32_t command);

View File

@ -2,6 +2,7 @@
<zookeeper> <zookeeper>
<!--<zookeeper_load_balancing>random / in_order / nearest_hostname / first_or_random / round_robin</zookeeper_load_balancing>--> <!--<zookeeper_load_balancing>random / in_order / nearest_hostname / first_or_random / round_robin</zookeeper_load_balancing>-->
<zookeeper_load_balancing>random</zookeeper_load_balancing> <zookeeper_load_balancing>random</zookeeper_load_balancing>
<compressed_protocol>true</compressed_protocol>
<node index="1"> <node index="1">
<host>localhost</host> <host>localhost</host>
<port>9181</port> <port>9181</port>