From 73e0c35ab070905c325dbac1a3530efa67d36dbc Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Fri, 15 Jul 2022 10:13:39 +0000 Subject: [PATCH] Use 4LW for api version --- src/Common/ZooKeeper/ZooKeeperCommon.cpp | 7 ++ src/Common/ZooKeeper/ZooKeeperCommon.h | 2 + src/Common/ZooKeeper/ZooKeeperIO.cpp | 6 ++ src/Common/ZooKeeper/ZooKeeperIO.h | 1 + src/Common/ZooKeeper/ZooKeeperImpl.cpp | 88 +++++++++++++---------- src/Common/ZooKeeper/ZooKeeperImpl.h | 4 +- src/Coordination/CoordinationSettings.cpp | 2 +- src/Coordination/FourLetterCommand.cpp | 15 +++- src/Coordination/FourLetterCommand.h | 12 ++++ src/Coordination/KeeperStorage.cpp | 31 -------- 10 files changed, 93 insertions(+), 75 deletions(-) diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.cpp b/src/Common/ZooKeeper/ZooKeeperCommon.cpp index b15126f5701..2cce22daae4 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/Common/ZooKeeper/ZooKeeperCommon.cpp @@ -898,4 +898,11 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory() registerZooKeeperRequest(*this); } +int32_t fourLetterCommandNameToCode(std::string_view name) +{ + int32_t res = *reinterpret_cast(name.data()); + /// keep consistent with Coordination::read method by changing big endian to little endian. + return __builtin_bswap32(res); +} + } diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 53fabf651fa..36f33b0bb01 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -554,4 +554,6 @@ private: ZooKeeperRequestFactory(); }; +int32_t fourLetterCommandNameToCode(std::string_view name); + } diff --git a/src/Common/ZooKeeper/ZooKeeperIO.cpp b/src/Common/ZooKeeper/ZooKeeperIO.cpp index c84a8624d78..f796212ef0b 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.cpp +++ b/src/Common/ZooKeeper/ZooKeeperIO.cpp @@ -49,6 +49,12 @@ void write(const std::string & s, WriteBuffer & out) out.write(s.data(), s.size()); } +void write(std::string_view s, WriteBuffer & out) +{ + write(static_cast(s.size()), out); + out.write(s.data(), s.size()); +} + void write(const ACL & acl, WriteBuffer & out) { write(acl.permissions, out); diff --git a/src/Common/ZooKeeper/ZooKeeperIO.h b/src/Common/ZooKeeper/ZooKeeperIO.h index ec77b46f3d9..5e5503c504e 100644 --- a/src/Common/ZooKeeper/ZooKeeperIO.h +++ b/src/Common/ZooKeeper/ZooKeeperIO.h @@ -26,6 +26,7 @@ void write(uint8_t x, WriteBuffer & out); void write(OpNum x, WriteBuffer & out); void write(bool x, WriteBuffer & out); void write(const std::string & s, WriteBuffer & out); +void write(std::string_view s, WriteBuffer & out); void write(const ACL & acl, WriteBuffer & out); void write(const Stat & stat, WriteBuffer & out); void write(const Error & x, WriteBuffer & out); diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.cpp b/src/Common/ZooKeeper/ZooKeeperImpl.cpp index a0544935e25..4f9436947e9 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.cpp +++ b/src/Common/ZooKeeper/ZooKeeperImpl.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include @@ -354,12 +355,37 @@ ZooKeeper::ZooKeeper( send_thread = ThreadFromGlobalPool([this] { sendThread(); }); receive_thread = ThreadFromGlobalPool([this] { receiveThread(); }); - initApiVersion(); - ProfileEvents::increment(ProfileEvents::ZooKeeperInit); } +Poco::Net::StreamSocket ZooKeeper::connectToNode(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure) +{ + Poco::Net::StreamSocket result; + /// Reset the state of previous attempt. + if (is_secure) + { +#if USE_SSL + result = Poco::Net::SecureStreamSocket(); +#else + throw Poco::Exception( + "Communication with ZooKeeper over SSL is disabled because poco library was built without NetSSL support."); +#endif + } + else + { + result = Poco::Net::StreamSocket(); + } + + result.connect(node_address, connection_timeout); + + result.setReceiveTimeout(operation_timeout); + result.setSendTimeout(operation_timeout); + result.setNoDelay(true); + + return result; +} + void ZooKeeper::connect( const Nodes & nodes, Poco::Timespan connection_timeout) @@ -377,28 +403,9 @@ void ZooKeeper::connect( { try { - /// Reset the state of previous attempt. - if (node.secure) - { -#if USE_SSL - socket = Poco::Net::SecureStreamSocket(); -#else - throw Poco::Exception( - "Communication with ZooKeeper over SSL is disabled because poco library was built without NetSSL support."); -#endif - } - else - { - socket = Poco::Net::StreamSocket(); - } - - socket.connect(node.address, connection_timeout); + socket = connectToNode(node.address, connection_timeout, node.secure); socket_address = socket.peerAddress(); - socket.setReceiveTimeout(operation_timeout); - socket.setSendTimeout(operation_timeout); - socket.setNoDelay(true); - in.emplace(socket); out.emplace(socket); @@ -423,6 +430,9 @@ void ZooKeeper::connect( } connected = true; + + initApiVersion(node.address, connection_timeout, node.secure); + break; } catch (...) @@ -1066,29 +1076,29 @@ Coordination::KeeperApiVersion ZooKeeper::getApiVersion() return keeper_api_version; } -void ZooKeeper::initApiVersion() +void ZooKeeper::initApiVersion(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure) { - auto promise = std::make_shared>(); - auto future = promise->get_future(); - - auto callback = [promise](const Coordination::GetResponse & response) mutable + try { - promise->set_value(response); - }; + auto command_socket = connectToNode(node_address, connection_timeout, is_secure); - get(Coordination::keeper_api_version_path, std::move(callback), {}); - if (future.wait_for(std::chrono::milliseconds(operation_timeout.totalMilliseconds())) != std::future_status::ready) - return; + auto apiv_code = Coordination::fourLetterCommandNameToCode("apiv"); - auto response = future.get(); + WriteBufferFromPocoSocket command_out(command_socket); + Coordination::write(apiv_code, command_out); + command_out.next(); - if (response.error != Coordination::Error::ZOK) - return; + ReadBufferFromPocoSocket command_in(command_socket); + std::string result; + readStringUntilEOF(result, command_in); - uint8_t keeper_version{0}; - DB::ReadBufferFromOwnString buf(response.data); - DB::readIntText(keeper_version, buf); - keeper_api_version = static_cast(keeper_version); + auto read_version = parseFromString(result); + keeper_api_version = static_cast(read_version); + } + catch (const DB::Exception & e) + { + LOG_ERROR(&Poco::Logger::get("ZooKeeper"), "Failed to get version: {}", e.message()); + } } diff --git a/src/Common/ZooKeeper/ZooKeeperImpl.h b/src/Common/ZooKeeper/ZooKeeperImpl.h index 023e46f5017..ec7b43ec38f 100644 --- a/src/Common/ZooKeeper/ZooKeeperImpl.h +++ b/src/Common/ZooKeeper/ZooKeeperImpl.h @@ -254,6 +254,8 @@ private: const Nodes & node, Poco::Timespan connection_timeout); + Poco::Net::StreamSocket connectToNode(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure); + void sendHandshake(); void receiveHandshake(); @@ -277,7 +279,7 @@ private: void logOperationIfNeeded(const ZooKeeperRequestPtr & request, const ZooKeeperResponsePtr & response = nullptr, bool finalize = false); - void initApiVersion(); + void initApiVersion(const Poco::Net::SocketAddress & node_address, Poco::Timespan connection_timeout, bool is_secure); CurrentMetrics::Increment active_session_metric_increment{CurrentMetrics::ZooKeeperSession}; std::shared_ptr zk_log; diff --git a/src/Coordination/CoordinationSettings.cpp b/src/Coordination/CoordinationSettings.cpp index 34d69967828..046659af01e 100644 --- a/src/Coordination/CoordinationSettings.cpp +++ b/src/Coordination/CoordinationSettings.cpp @@ -37,7 +37,7 @@ void CoordinationSettings::loadFromConfig(const String & config_elem, const Poco } -const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr"; +const String KeeperConfigurationAndSettings::DEFAULT_FOUR_LETTER_WORD_CMD = "conf,cons,crst,envi,ruok,srst,srvr,stat,wchs,dirs,mntr,isro,rcvr,apiv"; KeeperConfigurationAndSettings::KeeperConfigurationAndSettings() : server_id(NOT_EXIST) diff --git a/src/Coordination/FourLetterCommand.cpp b/src/Coordination/FourLetterCommand.cpp index cec107806b7..ab86d1a3a0c 100644 --- a/src/Coordination/FourLetterCommand.cpp +++ b/src/Coordination/FourLetterCommand.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -39,9 +40,7 @@ String IFourLetterCommand::toName(int32_t code) int32_t IFourLetterCommand::toCode(const String & name) { - int32_t res = *reinterpret_cast(name.data()); - /// keep consistent with Coordination::read method by changing big endian to little endian. - return __builtin_bswap32(res); + return Coordination::fourLetterCommandNameToCode(name); } IFourLetterCommand::~IFourLetterCommand() = default; @@ -132,6 +131,9 @@ void FourLetterCommandFactory::registerCommands(KeeperDispatcher & keeper_dispat FourLetterCommandPtr recovery_command = std::make_shared(keeper_dispatcher); factory.registerCommand(recovery_command); + FourLetterCommandPtr api_version_command = std::make_shared(keeper_dispatcher); + factory.registerCommand(api_version_command); + factory.initializeAllowList(keeper_dispatcher); factory.setInitialize(true); } @@ -247,6 +249,8 @@ String MonitorCommand::run() print(ret, "synced_followers", keeper_info.synced_follower_count); } + print(ret, "api_version", static_cast(Coordination::current_keeper_api_version)); + return ret.str(); } @@ -463,4 +467,9 @@ String RecoveryCommand::run() return "ok"; } +String ApiVersionCommand::run() +{ + return toString(static_cast(Coordination::current_keeper_api_version)); +} + } diff --git a/src/Coordination/FourLetterCommand.h b/src/Coordination/FourLetterCommand.h index b5d08f4c250..8a98b94b33a 100644 --- a/src/Coordination/FourLetterCommand.h +++ b/src/Coordination/FourLetterCommand.h @@ -315,4 +315,16 @@ struct RecoveryCommand : public IFourLetterCommand String run() override; ~RecoveryCommand() override = default; }; + +struct ApiVersionCommand : public IFourLetterCommand +{ + explicit ApiVersionCommand(KeeperDispatcher & keeper_dispatcher_) + : IFourLetterCommand(keeper_dispatcher_) + { + } + + String name() override { return "apiv"; } + String run() override; + ~ApiVersionCommand() override = default; +}; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index d07caeaf496..2eec5756b35 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -843,9 +843,6 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce { Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); - if (request.path == Coordination::keeper_api_version_path) - return {}; - if (!storage.uncommitted_state.getNode(request.path)) return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}}; @@ -868,16 +865,6 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce } } - // We cannot store the node because the result should be connected to the binary itself - // this way we avoid incorrect results when we read a snapshot from older Keeper that can have - // lower API version - if (request.path == Coordination::keeper_api_version_path) - { - response.data = std::to_string(static_cast(Coordination::current_keeper_api_version)); - response.error = Coordination::Error::ZOK; - return response_ptr; - } - auto & container = storage.container; auto node_it = container.find(request.path); if (node_it == container.end()) @@ -924,12 +911,6 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr std::vector new_deltas; - if (request.path == Coordination::keeper_api_version_path) - { - LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to delete an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path); - return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; - } - const auto update_parent_pzxid = [&]() { auto parent_path = parentPath(request.path); @@ -1076,12 +1057,6 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce std::vector new_deltas; - if (request.path == Coordination::keeper_api_version_path) - { - LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path); - return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; - } - if (!storage.uncommitted_state.getNode(request.path)) return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}}; @@ -1343,12 +1318,6 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr { Coordination::ZooKeeperSetACLRequest & request = dynamic_cast(*zk_request); - if (request.path == Coordination::keeper_api_version_path) - { - LOG_ERROR(&Poco::Logger::get("KeeperStorage"), "Trying to update an internal Keeper path ({}) which is not allowed", Coordination::keeper_api_version_path); - return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADARGUMENTS}}; - } - auto & uncommitted_state = storage.uncommitted_state; if (!uncommitted_state.getNode(request.path)) return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};