Use 4LW for api version

This commit is contained in:
Antonio Andelic 2022-07-15 10:13:39 +00:00
parent 0e25dbbbeb
commit 73e0c35ab0
10 changed files with 93 additions and 75 deletions

View File

@ -898,4 +898,11 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this); registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
} }
int32_t fourLetterCommandNameToCode(std::string_view name)
{
int32_t res = *reinterpret_cast<const int32_t *>(name.data());
/// keep consistent with Coordination::read method by changing big endian to little endian.
return __builtin_bswap32(res);
}
} }

View File

@ -554,4 +554,6 @@ private:
ZooKeeperRequestFactory(); ZooKeeperRequestFactory();
}; };
int32_t fourLetterCommandNameToCode(std::string_view name);
} }

View File

@ -49,6 +49,12 @@ void write(const std::string & s, WriteBuffer & out)
out.write(s.data(), s.size()); out.write(s.data(), s.size());
} }
void write(std::string_view s, WriteBuffer & out)
{
write(static_cast<int32_t>(s.size()), out);
out.write(s.data(), s.size());
}
void write(const ACL & acl, WriteBuffer & out) void write(const ACL & acl, WriteBuffer & out)
{ {
write(acl.permissions, out); write(acl.permissions, out);

View File

@ -26,6 +26,7 @@ void write(uint8_t x, WriteBuffer & out);
void write(OpNum x, WriteBuffer & out); void write(OpNum x, WriteBuffer & out);
void write(bool x, WriteBuffer & out); void write(bool x, WriteBuffer & out);
void write(const std::string & s, WriteBuffer & out); void write(const std::string & s, WriteBuffer & out);
void write(std::string_view s, WriteBuffer & out);
void write(const ACL & acl, WriteBuffer & out); void write(const ACL & acl, WriteBuffer & out);
void write(const Stat & stat, WriteBuffer & out); void write(const Stat & stat, WriteBuffer & out);
void write(const Error & x, WriteBuffer & out); void write(const Error & x, WriteBuffer & out);

View File

@ -12,6 +12,7 @@
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <IO/ReadHelpers.h>
#include <Common/config.h> #include <Common/config.h>
@ -354,12 +355,37 @@ ZooKeeper::ZooKeeper(
send_thread = ThreadFromGlobalPool([this] { sendThread(); }); send_thread = ThreadFromGlobalPool([this] { sendThread(); });
receive_thread = ThreadFromGlobalPool([this] { receiveThread(); }); receive_thread = ThreadFromGlobalPool([this] { receiveThread(); });
initApiVersion();
ProfileEvents::increment(ProfileEvents::ZooKeeperInit); ProfileEvents::increment(ProfileEvents::ZooKeeperInit);
} }
Poco::Net::StreamSocket ZooKeeper::connectToNode(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure)
{
Poco::Net::StreamSocket result;
/// Reset the state of previous attempt.
if (is_secure)
{
#if USE_SSL
result = Poco::Net::SecureStreamSocket();
#else
throw Poco::Exception(
"Communication with ZooKeeper over SSL is disabled because poco library was built without NetSSL support.");
#endif
}
else
{
result = Poco::Net::StreamSocket();
}
result.connect(node_address, connection_timeout);
result.setReceiveTimeout(operation_timeout);
result.setSendTimeout(operation_timeout);
result.setNoDelay(true);
return result;
}
void ZooKeeper::connect( void ZooKeeper::connect(
const Nodes & nodes, const Nodes & nodes,
Poco::Timespan connection_timeout) Poco::Timespan connection_timeout)
@ -377,28 +403,9 @@ void ZooKeeper::connect(
{ {
try try
{ {
/// Reset the state of previous attempt. socket = connectToNode(node.address, connection_timeout, node.secure);
if (node.secure)
{
#if USE_SSL
socket = Poco::Net::SecureStreamSocket();
#else
throw Poco::Exception(
"Communication with ZooKeeper over SSL is disabled because poco library was built without NetSSL support.");
#endif
}
else
{
socket = Poco::Net::StreamSocket();
}
socket.connect(node.address, connection_timeout);
socket_address = socket.peerAddress(); socket_address = socket.peerAddress();
socket.setReceiveTimeout(operation_timeout);
socket.setSendTimeout(operation_timeout);
socket.setNoDelay(true);
in.emplace(socket); in.emplace(socket);
out.emplace(socket); out.emplace(socket);
@ -423,6 +430,9 @@ void ZooKeeper::connect(
} }
connected = true; connected = true;
initApiVersion(node.address, connection_timeout, node.secure);
break; break;
} }
catch (...) catch (...)
@ -1066,29 +1076,29 @@ Coordination::KeeperApiVersion ZooKeeper::getApiVersion()
return keeper_api_version; return keeper_api_version;
} }
void ZooKeeper::initApiVersion() void ZooKeeper::initApiVersion(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure)
{ {
auto promise = std::make_shared<std::promise<Coordination::GetResponse>>(); try
auto future = promise->get_future();
auto callback = [promise](const Coordination::GetResponse & response) mutable
{ {
promise->set_value(response); auto command_socket = connectToNode(node_address, connection_timeout, is_secure);
};
get(Coordination::keeper_api_version_path, std::move(callback), {}); auto apiv_code = Coordination::fourLetterCommandNameToCode("apiv");
if (future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready)
return;
auto response = future.get(); WriteBufferFromPocoSocket command_out(command_socket);
Coordination::write(apiv_code, command_out);
command_out.next();
if (response.error != Coordination::Error::ZOK) ReadBufferFromPocoSocket command_in(command_socket);
return; std::string result;
readStringUntilEOF(result, command_in);
uint8_t keeper_version{0}; auto read_version = parseFromString<uint8_t>(result);
DB::ReadBufferFromOwnString buf(response.data); keeper_api_version = static_cast<KeeperApiVersion>(read_version);
DB::readIntText(keeper_version, buf); }
keeper_api_version = static_cast<Coordination::KeeperApiVersion>(keeper_version); catch (const DB::Exception & e)
{
LOG_ERROR(&Poco::Logger::get("ZooKeeper"), "Failed to get version: {}", e.message());
}
} }

View File

@ -254,6 +254,8 @@ private:
const Nodes & node, const Nodes & node,
Poco::Timespan connection_timeout); Poco::Timespan connection_timeout);
Poco::Net::StreamSocket connectToNode(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure);
void sendHandshake(); void sendHandshake();
void receiveHandshake(); void receiveHandshake();
@ -277,7 +279,7 @@ private:
void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false); void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false);
void initApiVersion(); void initApiVersion(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure);
CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession};
std::shared_ptr<ZooKeeperLog> zk_log; std::shared_ptr<ZooKeeperLog> zk_log;

View File

@ -37,7 +37,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco
} }
const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr"; const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv";
KeeperConfigurationAndSettings::KeeperConfigurationAndSettings() KeeperConfigurationAndSettings::KeeperConfigurationAndSettings()
: server_id(NOT_EXIST) : server_id(NOT_EXIST)

View File

@ -2,6 +2,7 @@
#include <Coordination/KeeperDispatcher.h> #include <Coordination/KeeperDispatcher.h>
#include <Server/KeeperTCPHandler.h> #include <Server/KeeperTCPHandler.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/logger_useful.h> #include <Common/logger_useful.h>
#include <Poco/Environment.h> #include <Poco/Environment.h>
#include <Poco/Path.h> #include <Poco/Path.h>
@ -39,9 +40,7 @@ String IFourLetterCommand::toName(int32_t code)
int32_t IFourLetterCommand::toCode(const String & name) int32_t IFourLetterCommand::toCode(const String & name)
{ {
int32_t res = *reinterpret_cast<const int32_t *>(name.data()); return Coordination::fourLetterCommandNameToCode(name);
/// keep consistent with Coordination::read method by changing big endian to little endian.
return __builtin_bswap32(res);
} }
IFourLetterCommand::~IFourLetterCommand() = default; IFourLetterCommand::~IFourLetterCommand() = default;
@ -132,6 +131,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat
FourLetterCommandPtr recovery_command = std::make_shared<RecoveryCommand>(keeper_dispatcher); FourLetterCommandPtr recovery_command = std::make_shared<RecoveryCommand>(keeper_dispatcher);
factory.registerCommand(recovery_command); factory.registerCommand(recovery_command);
FourLetterCommandPtr api_version_command = std::make_shared<ApiVersionCommand>(keeper_dispatcher);
factory.registerCommand(api_version_command);
factory.initializeAllowList(keeper_dispatcher); factory.initializeAllowList(keeper_dispatcher);
factory.setInitialize(true); factory.setInitialize(true);
} }
@ -247,6 +249,8 @@ String MonitorCommand::run()
print(ret, "synced_followers", keeper_info.synced_follower_count); print(ret, "synced_followers", keeper_info.synced_follower_count);
} }
print(ret, "api_version", static_cast<uint64_t>(Coordination::current_keeper_api_version));
return ret.str(); return ret.str();
} }
@ -463,4 +467,9 @@ String RecoveryCommand::run()
return "ok"; return "ok";
} }
String ApiVersionCommand::run()
{
return toString(static_cast<uint8_t>(Coordination::current_keeper_api_version));
}
} }

View File

@ -315,4 +315,16 @@ struct RecoveryCommand : public IFourLetterCommand
String run() override; String run() override;
~RecoveryCommand() override = default; ~RecoveryCommand() override = default;
}; };
struct ApiVersionCommand : public IFourLetterCommand
{
explicit ApiVersionCommand(KeeperDispatcher & keeper_dispatcher_)
: IFourLetterCommand(keeper_dispatcher_)
{
}
String name() override { return "apiv"; }
String run() override;
~ApiVersionCommand() override = default;
};
} }

View File

@ -843,9 +843,6 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
{ {
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request); Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_version_path)
return {};
if (!storage.uncommitted_state.getNode(request.path)) if (!storage.uncommitted_state.getNode(request.path))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}}; return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
@ -868,16 +865,6 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
} }
} }
// We cannot store the node because the result should be connected to the binary itself
// this way we avoid incorrect results when we read a snapshot from older Keeper that can have
// lower API version
if (request.path == Coordination::keeper_api_version_path)
{
response.data = std::to_string(static_cast<uint8_t>(Coordination::current_keeper_api_version));
response.error = Coordination::Error::ZOK;
return response_ptr;
}
auto & container = storage.container; auto & container = storage.container;
auto node_it = container.find(request.path); auto node_it = container.find(request.path);
if (node_it == container.end()) if (node_it == container.end())
@ -924,12 +911,6 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
std::vector<KeeperStorage::Delta> new_deltas; std::vector<KeeperStorage::Delta> new_deltas;
if (request.path == Coordination::keeper_api_version_path)
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to delete an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
const auto update_parent_pzxid = [&]() const auto update_parent_pzxid = [&]()
{ {
auto parent_path = parentPath(request.path); auto parent_path = parentPath(request.path);
@ -1076,12 +1057,6 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta> new_deltas; std::vector<KeeperStorage::Delta> new_deltas;
if (request.path == Coordination::keeper_api_version_path)
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
if (!storage.uncommitted_state.getNode(request.path)) if (!storage.uncommitted_state.getNode(request.path))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}}; return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
@ -1343,12 +1318,6 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
{ {
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request); Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_version_path)
{
LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path);
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}};
}
auto & uncommitted_state = storage.uncommitted_state; auto & uncommitted_state = storage.uncommitted_state;
if (!uncommitted_state.getNode(request.path)) if (!uncommitted_state.getNode(request.path))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}}; return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};