From c2525ef211a5cc1b93914cc67a7a6a36a2f2df5e Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 30 Oct 2020 17:16:47 +0300 Subject: [PATCH] Server and client pinging each other --- programs/server/Server.cpp | 16 + src/Common/ZooKeeper/TestKeeperStorage.cpp | 6 + src/Common/ZooKeeper/TestKeeperStorage.h | 77 +++++ src/Common/ZooKeeper/ZooKeeperCommon.h | 1 - src/Interpreters/Context.cpp | 12 + src/Interpreters/Context.h | 3 + src/Server/TCPHandlerFactory.h | 11 +- src/Server/TestKeeperTCPHandler.cpp | 339 +++++++++++++++++++++ src/Server/TestKeeperTCPHandler.h | 48 +++ 9 files changed, 510 insertions(+), 3 deletions(-) create mode 100644 src/Common/ZooKeeper/TestKeeperStorage.cpp create mode 100644 src/Common/ZooKeeper/TestKeeperStorage.h create mode 100644 src/Server/TestKeeperTCPHandler.cpp create mode 100644 src/Server/TestKeeperTCPHandler.h diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index b85cb5e75f2..f855a76e362 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -949,6 +949,22 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString()); }); + /// TCP TestKeeper + create_server("test_keeper_tcp_port", [&](UInt16 port) + { + Poco::Net::ServerSocket socket; + auto address = socket_bind_listen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + servers.emplace_back(std::make_unique( + new TCPHandlerFactory(*this, false, true), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + + LOG_INFO(log, "Listening for connections to fake zookeeper (tcp): {}", address.toString()); + }); + /// TCP with SSL create_server("tcp_port_secure", [&](UInt16 port) { diff --git a/src/Common/ZooKeeper/TestKeeperStorage.cpp b/src/Common/ZooKeeper/TestKeeperStorage.cpp new file mode 100644 index 00000000000..c238fa2620f --- /dev/null +++ b/src/Common/ZooKeeper/TestKeeperStorage.cpp @@ -0,0 +1,6 @@ +#include + +namespace DB +{ + +} diff --git a/src/Common/ZooKeeper/TestKeeperStorage.h b/src/Common/ZooKeeper/TestKeeperStorage.h new file mode 100644 index 00000000000..f0c8a942dff --- /dev/null +++ b/src/Common/ZooKeeper/TestKeeperStorage.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include +#include +#include + +namespace zkutil +{ + +using namespace DB; + +class TestKeeperStorage +{ + struct TestKeeperRequest; + using TestKeeperRequestPtr = std::shared_ptr; + + std::atomic session_id_counter{0}; + + struct Node + { + String data; + Coordination::ACLs acls; + bool is_ephemeral = false; + bool is_sequental = false; + Coordination::Stat stat{}; + int32_t seq_num = 0; + }; + + using Container = std::map; + + using WatchCallbacks = std::vector; + using Watches = std::map; + + Container container; + + String root_path; + + std::atomic zxid{0}; + + Watches watches; + Watches list_watches; /// Watches for 'list' request (watches on children). + + using clock = std::chrono::steady_clock; + + struct RequestInfo + { + TestKeeperRequestPtr request; + Coordination::ResponseCallback callback; + Coordination::WatchCallback watch; + clock::time_point time; + }; + using RequestsQueue = ConcurrentBoundedQueue; + RequestsQueue requests_queue{1}; + + void pushRequest(RequestInfo && request); + + void finalize(); + + ThreadFromGlobalPool processing_thread; + + void processingThread(); + +public: + void putRequest(const Coordination::ZooKeeperRequestPtr & request, std::shared_ptr response_out); + + int64_t getSessionID() + { + return session_id_counter.fetch_add(1); + } + int64_t getZXID() + { + return zxid.fetch_add(1); + } +}; + +} diff --git a/src/Common/ZooKeeper/ZooKeeperCommon.h b/src/Common/ZooKeeper/ZooKeeperCommon.h index 0b19869dd5a..6293cbb09fe 100644 --- a/src/Common/ZooKeeper/ZooKeeperCommon.h +++ b/src/Common/ZooKeeper/ZooKeeperCommon.h @@ -25,7 +25,6 @@ namespace Coordination using XID = int32_t; using OpNum = int32_t; - struct ZooKeeperResponse : virtual Response { virtual ~ZooKeeperResponse() override = default; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index b75e9ae9d58..863b179df90 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -300,6 +301,8 @@ struct ContextShared mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper. + mutable std::mutex test_keeper_storage_mutex; + mutable std::shared_ptr test_keeper_storage; mutable std::mutex auxiliary_zookeepers_mutex; mutable std::map auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. @@ -1492,6 +1495,15 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const return shared->zookeeper; } +std::shared_ptr & Context::getTestKeeperStorage() const +{ + std::lock_guard lock(shared->test_keeper_storage_mutex); + if (!shared->test_keeper_storage) + shared->test_keeper_storage = std::make_shared(); + + return shared->test_keeper_storage; +} + zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const { std::lock_guard lock(shared->auxiliary_zookeepers_mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index fbf578494ed..22e90d8eb5a 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -40,6 +40,7 @@ namespace Poco namespace zkutil { class ZooKeeper; + class TestKeeperStorage; } @@ -483,6 +484,8 @@ public: std::shared_ptr getZooKeeper() const; /// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry. std::shared_ptr getAuxiliaryZooKeeper(const String & name) const; + + std::shared_ptr & getTestKeeperStorage() const; /// Has ready or expired ZooKeeper bool hasZooKeeper() const; /// Reset current zookeeper session. Do not create a new one. diff --git a/src/Server/TCPHandlerFactory.h b/src/Server/TCPHandlerFactory.h index 5ecd427bf8b..945a2350508 100644 --- a/src/Server/TCPHandlerFactory.h +++ b/src/Server/TCPHandlerFactory.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace Poco { class Logger; } @@ -16,6 +17,7 @@ class TCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory private: IServer & server; Poco::Logger * log; + bool test_keeper; class DummyTCPHandler : public Poco::Net::TCPServerConnection { @@ -25,9 +27,10 @@ private: }; public: - explicit TCPHandlerFactory(IServer & server_, bool secure_ = false) + explicit TCPHandlerFactory(IServer & server_, bool secure_ = false, bool test_keeper_ = false) : server(server_) , log(&Poco::Logger::get(std::string("TCP") + (secure_ ? "S" : "") + "HandlerFactory")) + , test_keeper(test_keeper_) { } @@ -36,7 +39,11 @@ public: try { LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString()); - return new TCPHandler(server, socket); + + if (test_keeper) + return new TestKeeperTCPHandler(server, socket); + else + return new TCPHandler(server, socket); } catch (const Poco::Net::NetException &) { diff --git a/src/Server/TestKeeperTCPHandler.cpp b/src/Server/TestKeeperTCPHandler.cpp new file mode 100644 index 00000000000..a0679554f64 --- /dev/null +++ b/src/Server/TestKeeperTCPHandler.cpp @@ -0,0 +1,339 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + +} + +#ifdef __clang__ +#pragma clang diagnostic push +#pragma clang diagnostic ignored "-Wunused-function" +#endif + +/// ZooKeeper has 1 MB node size and serialization limit by default, +/// but it can be raised up, so we have a slightly larger limit on our side. +#define MAX_STRING_OR_ARRAY_SIZE (1 << 28) /// 256 MiB + +/// Assuming we are at little endian. + +static void write(int64_t x, WriteBuffer & out) +{ + x = __builtin_bswap64(x); + writeBinary(x, out); +} + +static void write(int32_t x, WriteBuffer & out) +{ + x = __builtin_bswap32(x); + writeBinary(x, out); +} + +static void write(bool x, WriteBuffer & out) +{ + writeBinary(x, out); +} + +static void write(const String & s, WriteBuffer & out) +{ + write(int32_t(s.size()), out); + out.write(s.data(), s.size()); +} + +template void write(std::array s, WriteBuffer & out) +{ + write(int32_t(N), out); + out.write(s.data(), N); +} + +template void write(const std::vector & arr, WriteBuffer & out) +{ + write(int32_t(arr.size()), out); + for (const auto & elem : arr) + write(elem, out); +} + +static void write(const Coordination::ACL & acl, WriteBuffer & out) +{ + write(acl.permissions, out); + write(acl.scheme, out); + write(acl.id, out); +} + +static void write(const Coordination::Stat & stat, WriteBuffer & out) +{ + write(stat.czxid, out); + write(stat.mzxid, out); + write(stat.ctime, out); + write(stat.mtime, out); + write(stat.version, out); + write(stat.cversion, out); + write(stat.aversion, out); + write(stat.ephemeralOwner, out); + write(stat.dataLength, out); + write(stat.numChildren, out); + write(stat.pzxid, out); +} + +static void write(const Coordination::Error & x, WriteBuffer & out) +{ + write(static_cast(x), out); +} + +static void read(int64_t & x, ReadBuffer & in) +{ + readBinary(x, in); + x = __builtin_bswap64(x); +} + +static void read(int32_t & x, ReadBuffer & in) +{ + readBinary(x, in); + x = __builtin_bswap32(x); +} + +static void read(Coordination::Error & x, ReadBuffer & in) +{ + int32_t code; + read(code, in); + x = Coordination::Error(code); +} + +static void read(bool & x, ReadBuffer & in) +{ + readBinary(x, in); +} + +static void read(String & s, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + + if (size == -1) + { + /// It means that zookeeper node has NULL value. We will treat it like empty string. + s.clear(); + return; + } + + if (size < 0) + throw Exception("Negative size while reading string from ZooKeeper", ErrorCodes::LOGICAL_ERROR); + + if (size > MAX_STRING_OR_ARRAY_SIZE) + throw Exception("Too large string size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR); + + s.resize(size); + in.read(s.data(), size); +} + +template void read(std::array & s, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + if (size != N) + throw Exception("Unexpected array size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR); + in.read(s.data(), N); +} + +static void read(Coordination::Stat & stat, ReadBuffer & in) +{ + read(stat.czxid, in); + read(stat.mzxid, in); + read(stat.ctime, in); + read(stat.mtime, in); + read(stat.version, in); + read(stat.cversion, in); + read(stat.aversion, in); + read(stat.ephemeralOwner, in); + read(stat.dataLength, in); + read(stat.numChildren, in); + read(stat.pzxid, in); +} + +template void read(std::vector & arr, ReadBuffer & in) +{ + int32_t size = 0; + read(size, in); + if (size < 0) + throw Exception("Negative size while reading array from ZooKeeper", ErrorCodes::LOGICAL_ERROR); + if (size > MAX_STRING_OR_ARRAY_SIZE) + throw Exception("Too large array size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR); + arr.resize(size); + for (auto & elem : arr) + read(elem, in); +} + +static void read(Coordination::ACL & acl, ReadBuffer & in) +{ + read(acl.permissions, in); + read(acl.scheme, in); + read(acl.id, in); +} + +#ifdef __clang__ +#pragma clang diagnostic pop +#endif + +void TestKeeperTCPHandler::sendHandshake() +{ + static constexpr int32_t handshake_length = 36; + static constexpr int32_t protocol_version = 0; + static constexpr int32_t DEFAULT_SESSION_TIMEOUT = 30000; + + write(handshake_length, *out); + write(protocol_version, *out); + write(DEFAULT_SESSION_TIMEOUT, *out); + write(test_keeper_storage->getSessionID(), *out); + constexpr int32_t passwd_len = 16; + std::array passwd{}; + write(passwd, *out); + out->next(); +} + +void TestKeeperTCPHandler::run() +{ + runImpl(); +} + +void TestKeeperTCPHandler::receiveHandshake() +{ + int32_t handshake_length; + int32_t protocol_version; + int64_t last_zxid_seen; + int32_t timeout; + int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero. + constexpr int32_t passwd_len = 16; + std::array passwd {}; + + read(handshake_length, *in); + if (handshake_length != 44) + throw Exception("Unexpected handshake length received: " + toString(handshake_length), ErrorCodes::LOGICAL_ERROR); + + read(protocol_version, *in); + + if (protocol_version != 0) + throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::LOGICAL_ERROR); + + read(last_zxid_seen, *in); + + if (last_zxid_seen != 0) + throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::LOGICAL_ERROR); + + read(timeout, *in); + read(previous_session_id, *in); + + if (previous_session_id != 0) + throw Exception("Non zero previous session id is not supported", ErrorCodes::LOGICAL_ERROR); + + read(passwd, *in); +} + + +void TestKeeperTCPHandler::runImpl() +{ + setThreadName("TstKprHandler"); + ThreadStatus thread_status; + auto global_receive_timeout = global_context.getSettingsRef().receive_timeout; + auto global_send_timeout = global_context.getSettingsRef().send_timeout; + + socket().setReceiveTimeout(global_receive_timeout); + socket().setSendTimeout(global_send_timeout); + socket().setNoDelay(true); + + in = std::make_shared(socket()); + out = std::make_shared(socket()); + + if (in->eof()) + { + LOG_WARNING(log, "Client has not sent any data."); + return; + } + + try + { + receiveHandshake(); + } + catch (const Exception & e) /// Typical for an incorrect username, password, or address. + { + LOG_DEBUG(log, "Cannot receive handshake {}", e.displayText()); + } + + sendHandshake(); + + while (true) + { + UInt64 max_wait = operation_timeout.totalMicroseconds(); + if (in->poll(max_wait)) + { + receiveHeartbeatRequest(); + sendHeartbeatResponse(); + } + } +} + + +void TestKeeperTCPHandler::receiveHeartbeatRequest() +{ + LOG_DEBUG(log, "Receiving heartbeat event"); + int32_t length; + read(length, *in); + int32_t total_count = in->count(); + LOG_DEBUG(log, "RECEIVED LENGTH {}", length); + int32_t xid; + LOG_DEBUG(log, "READING XID"); + read(xid, *in); + + LOG_DEBUG(log, "Received xid {}", xid); + + if (xid == -2) + { + int32_t opnum; + read(opnum, *in); + LOG_DEBUG(log, "RRECEIVED OP NUM {}", opnum); + auto request = std::make_shared(); + request->readImpl(*in); + int32_t readed = in->count() - total_count; + if (readed != length) + LOG_DEBUG(log, "EXPECTED TO READ {}, BUT GOT {}", length, readed); + } + else + { + LOG_INFO(log, "UNKNOWN EVENT xid:{}", xid); + } + + LOG_DEBUG(log, "Event received"); +} + + +void TestKeeperTCPHandler::sendHeartbeatResponse() +{ + LOG_DEBUG(log, "Sending heartbeat event"); + int32_t length = sizeof(int32_t) + sizeof(int64_t) + sizeof(Coordination::Error); + write(length, *out); + int64_t zxid = test_keeper_storage->getZXID(); + int32_t xid = -2; + write(xid, *out); + write(zxid, *out); + write(Coordination::Error::ZOK, *out); + auto response = std::make_shared(); + response->writeImpl(*out); + out->next(); +} + +} diff --git a/src/Server/TestKeeperTCPHandler.h b/src/Server/TestKeeperTCPHandler.h new file mode 100644 index 00000000000..967ea1d29e7 --- /dev/null +++ b/src/Server/TestKeeperTCPHandler.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include "IServer.h" +#include +#include +#include +#include +#include + +namespace DB +{ + +class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection +{ +public: + TestKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_) + : Poco::Net::TCPServerConnection(socket_) + , server(server_) + , log(&Poco::Logger::get("TestKeeperTCPHandler")) + , global_context(server.context()) + , test_keeper_storage(global_context.getTestKeeperStorage()) + , operation_timeout(10000) + { + } + + void run() override; +private: + IServer & server; + Poco::Logger * log; + Context global_context; + std::shared_ptr test_keeper_storage; + Poco::Timespan operation_timeout; + + /// Streams for reading/writing from/to client connection socket. + std::shared_ptr in; + std::shared_ptr out; + + void runImpl(); + + void sendHandshake(); + void receiveHandshake(); + + void receiveHeartbeatRequest(); + void sendHeartbeatResponse(); +}; + +}