Server and client pinging each other

This commit is contained in:
alesapin 2020-10-30 17:16:47 +03:00
parent 5e2a3d12d7
commit c2525ef211
9 changed files with 510 additions and 3 deletions

View File

@ -949,6 +949,22 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString());
});
/// TCP TestKeeper
create_server("test_keeper_tcp_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, false, true),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections to fake zookeeper (tcp): {}", address.toString());
});
/// TCP with SSL
create_server("tcp_port_secure", [&](UInt16 port)
{

View File

@ -0,0 +1,6 @@
#include <Common/ZooKeeper/TestKeeperStorage.h>
namespace DB
{
}

View File

@ -0,0 +1,77 @@
#pragma once
#include <Common/ThreadPool.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
namespace zkutil
{
using namespace DB;
class TestKeeperStorage
{
struct TestKeeperRequest;
using TestKeeperRequestPtr = std::shared_ptr<TestKeeperRequest>;
std::atomic<int64_t> session_id_counter{0};
struct Node
{
String data;
Coordination::ACLs acls;
bool is_ephemeral = false;
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
};
using Container = std::map<std::string, Node>;
using WatchCallbacks = std::vector<Coordination::WatchCallback>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
Container container;
String root_path;
std::atomic<int64_t> zxid{0};
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
using clock = std::chrono::steady_clock;
struct RequestInfo
{
TestKeeperRequestPtr request;
Coordination::ResponseCallback callback;
Coordination::WatchCallback watch;
clock::time_point time;
};
using RequestsQueue = ConcurrentBoundedQueue<RequestInfo>;
RequestsQueue requests_queue{1};
void pushRequest(RequestInfo && request);
void finalize();
ThreadFromGlobalPool processing_thread;
void processingThread();
public:
void putRequest(const Coordination::ZooKeeperRequestPtr & request, std::shared_ptr<WriteBuffer> response_out);
int64_t getSessionID()
{
return session_id_counter.fetch_add(1);
}
int64_t getZXID()
{
return zxid.fetch_add(1);
}
};
}

View File

@ -25,7 +25,6 @@ namespace Coordination
using XID = int32_t;
using OpNum = int32_t;
struct ZooKeeperResponse : virtual Response
{
virtual ~ZooKeeperResponse() override = default;

View File

@ -12,6 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -300,6 +301,8 @@ struct ContextShared
mutable zkutil::ZooKeeperPtr zookeeper; /// Client for ZooKeeper.
mutable std::mutex test_keeper_storage_mutex;
mutable std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage;
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1492,6 +1495,15 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
return shared->zookeeper;
}
std::shared_ptr<zkutil::TestKeeperStorage> & Context::getTestKeeperStorage() const
{
std::lock_guard lock(shared->test_keeper_storage_mutex);
if (!shared->test_keeper_storage)
shared->test_keeper_storage = std::make_shared<zkutil::TestKeeperStorage>();
return shared->test_keeper_storage;
}
zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
{
std::lock_guard lock(shared->auxiliary_zookeepers_mutex);

View File

@ -40,6 +40,7 @@ namespace Poco
namespace zkutil
{
class ZooKeeper;
class TestKeeperStorage;
}
@ -483,6 +484,8 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getZooKeeper() const;
/// Same as above but return a zookeeper connection from auxiliary_zookeepers configuration entry.
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
std::shared_ptr<zkutil::TestKeeperStorage> & getTestKeeperStorage() const;
/// Has ready or expired ZooKeeper
bool hasZooKeeper() const;
/// Reset current zookeeper session. Do not create a new one.

View File

@ -5,6 +5,7 @@
#include <common/logger_useful.h>
#include <Server/IServer.h>
#include <Server/TCPHandler.h>
#include <Server/TestKeeperTCPHandler.h>
namespace Poco { class Logger; }
@ -16,6 +17,7 @@ class TCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
private:
IServer & server;
Poco::Logger * log;
bool test_keeper;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
@ -25,9 +27,10 @@ private:
};
public:
explicit TCPHandlerFactory(IServer & server_, bool secure_ = false)
explicit TCPHandlerFactory(IServer & server_, bool secure_ = false, bool test_keeper_ = false)
: server(server_)
, log(&Poco::Logger::get(std::string("TCP") + (secure_ ? "S" : "") + "HandlerFactory"))
, test_keeper(test_keeper_)
{
}
@ -36,7 +39,11 @@ public:
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, socket);
if (test_keeper)
return new TestKeeperTCPHandler(server, socket);
else
return new TCPHandler(server, socket);
}
catch (const Poco::Net::NetException &)
{

View File

@ -0,0 +1,339 @@
#include <Server/TestKeeperTCPHandler.h>
#include <Core/Types.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <Poco/Net/NetException.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/setThreadName.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-function"
#endif
/// ZooKeeper has 1 MB node size and serialization limit by default,
/// but it can be raised up, so we have a slightly larger limit on our side.
#define MAX_STRING_OR_ARRAY_SIZE (1 << 28) /// 256 MiB
/// Assuming we are at little endian.
static void write(int64_t x, WriteBuffer & out)
{
x = __builtin_bswap64(x);
writeBinary(x, out);
}
static void write(int32_t x, WriteBuffer & out)
{
x = __builtin_bswap32(x);
writeBinary(x, out);
}
static void write(bool x, WriteBuffer & out)
{
writeBinary(x, out);
}
static void write(const String & s, WriteBuffer & out)
{
write(int32_t(s.size()), out);
out.write(s.data(), s.size());
}
template <size_t N> void write(std::array<char, N> s, WriteBuffer & out)
{
write(int32_t(N), out);
out.write(s.data(), N);
}
template <typename T> void write(const std::vector<T> & arr, WriteBuffer & out)
{
write(int32_t(arr.size()), out);
for (const auto & elem : arr)
write(elem, out);
}
static void write(const Coordination::ACL & acl, WriteBuffer & out)
{
write(acl.permissions, out);
write(acl.scheme, out);
write(acl.id, out);
}
static void write(const Coordination::Stat & stat, WriteBuffer & out)
{
write(stat.czxid, out);
write(stat.mzxid, out);
write(stat.ctime, out);
write(stat.mtime, out);
write(stat.version, out);
write(stat.cversion, out);
write(stat.aversion, out);
write(stat.ephemeralOwner, out);
write(stat.dataLength, out);
write(stat.numChildren, out);
write(stat.pzxid, out);
}
static void write(const Coordination::Error & x, WriteBuffer & out)
{
write(static_cast<int32_t>(x), out);
}
static void read(int64_t & x, ReadBuffer & in)
{
readBinary(x, in);
x = __builtin_bswap64(x);
}
static void read(int32_t & x, ReadBuffer & in)
{
readBinary(x, in);
x = __builtin_bswap32(x);
}
static void read(Coordination::Error & x, ReadBuffer & in)
{
int32_t code;
read(code, in);
x = Coordination::Error(code);
}
static void read(bool & x, ReadBuffer & in)
{
readBinary(x, in);
}
static void read(String & s, ReadBuffer & in)
{
int32_t size = 0;
read(size, in);
if (size == -1)
{
/// It means that zookeeper node has NULL value. We will treat it like empty string.
s.clear();
return;
}
if (size < 0)
throw Exception("Negative size while reading string from ZooKeeper", ErrorCodes::LOGICAL_ERROR);
if (size > MAX_STRING_OR_ARRAY_SIZE)
throw Exception("Too large string size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR);
s.resize(size);
in.read(s.data(), size);
}
template <size_t N> void read(std::array<char, N> & s, ReadBuffer & in)
{
int32_t size = 0;
read(size, in);
if (size != N)
throw Exception("Unexpected array size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR);
in.read(s.data(), N);
}
static void read(Coordination::Stat & stat, ReadBuffer & in)
{
read(stat.czxid, in);
read(stat.mzxid, in);
read(stat.ctime, in);
read(stat.mtime, in);
read(stat.version, in);
read(stat.cversion, in);
read(stat.aversion, in);
read(stat.ephemeralOwner, in);
read(stat.dataLength, in);
read(stat.numChildren, in);
read(stat.pzxid, in);
}
template <typename T> void read(std::vector<T> & arr, ReadBuffer & in)
{
int32_t size = 0;
read(size, in);
if (size < 0)
throw Exception("Negative size while reading array from ZooKeeper", ErrorCodes::LOGICAL_ERROR);
if (size > MAX_STRING_OR_ARRAY_SIZE)
throw Exception("Too large array size while reading from ZooKeeper", ErrorCodes::LOGICAL_ERROR);
arr.resize(size);
for (auto & elem : arr)
read(elem, in);
}
static void read(Coordination::ACL & acl, ReadBuffer & in)
{
read(acl.permissions, in);
read(acl.scheme, in);
read(acl.id, in);
}
#ifdef __clang__
#pragma clang diagnostic pop
#endif
void TestKeeperTCPHandler::sendHandshake()
{
static constexpr int32_t handshake_length = 36;
static constexpr int32_t protocol_version = 0;
static constexpr int32_t DEFAULT_SESSION_TIMEOUT = 30000;
write(handshake_length, *out);
write(protocol_version, *out);
write(DEFAULT_SESSION_TIMEOUT, *out);
write(test_keeper_storage->getSessionID(), *out);
constexpr int32_t passwd_len = 16;
std::array<char, passwd_len> passwd{};
write(passwd, *out);
out->next();
}
void TestKeeperTCPHandler::run()
{
runImpl();
}
void TestKeeperTCPHandler::receiveHandshake()
{
int32_t handshake_length;
int32_t protocol_version;
int64_t last_zxid_seen;
int32_t timeout;
int64_t previous_session_id = 0; /// We don't support session restore. So previous session_id is always zero.
constexpr int32_t passwd_len = 16;
std::array<char, passwd_len> passwd {};
read(handshake_length, *in);
if (handshake_length != 44)
throw Exception("Unexpected handshake length received: " + toString(handshake_length), ErrorCodes::LOGICAL_ERROR);
read(protocol_version, *in);
if (protocol_version != 0)
throw Exception("Unexpected protocol version: " + toString(protocol_version), ErrorCodes::LOGICAL_ERROR);
read(last_zxid_seen, *in);
if (last_zxid_seen != 0)
throw Exception("Non zero last_zxid_seen is not supported", ErrorCodes::LOGICAL_ERROR);
read(timeout, *in);
read(previous_session_id, *in);
if (previous_session_id != 0)
throw Exception("Non zero previous session id is not supported", ErrorCodes::LOGICAL_ERROR);
read(passwd, *in);
}
void TestKeeperTCPHandler::runImpl()
{
setThreadName("TstKprHandler");
ThreadStatus thread_status;
auto global_receive_timeout = global_context.getSettingsRef().receive_timeout;
auto global_send_timeout = global_context.getSettingsRef().send_timeout;
socket().setReceiveTimeout(global_receive_timeout);
socket().setSendTimeout(global_send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
out = std::make_shared<WriteBufferFromPocoSocket>(socket());
if (in->eof())
{
LOG_WARNING(log, "Client has not sent any data.");
return;
}
try
{
receiveHandshake();
}
catch (const Exception & e) /// Typical for an incorrect username, password, or address.
{
LOG_DEBUG(log, "Cannot receive handshake {}", e.displayText());
}
sendHandshake();
while (true)
{
UInt64 max_wait = operation_timeout.totalMicroseconds();
if (in->poll(max_wait))
{
receiveHeartbeatRequest();
sendHeartbeatResponse();
}
}
}
void TestKeeperTCPHandler::receiveHeartbeatRequest()
{
LOG_DEBUG(log, "Receiving heartbeat event");
int32_t length;
read(length, *in);
int32_t total_count = in->count();
LOG_DEBUG(log, "RECEIVED LENGTH {}", length);
int32_t xid;
LOG_DEBUG(log, "READING XID");
read(xid, *in);
LOG_DEBUG(log, "Received xid {}", xid);
if (xid == -2)
{
int32_t opnum;
read(opnum, *in);
LOG_DEBUG(log, "RRECEIVED OP NUM {}", opnum);
auto request = std::make_shared<Coordination::ZooKeeperHeartbeatRequest>();
request->readImpl(*in);
int32_t readed = in->count() - total_count;
if (readed != length)
LOG_DEBUG(log, "EXPECTED TO READ {}, BUT GOT {}", length, readed);
}
else
{
LOG_INFO(log, "UNKNOWN EVENT xid:{}", xid);
}
LOG_DEBUG(log, "Event received");
}
void TestKeeperTCPHandler::sendHeartbeatResponse()
{
LOG_DEBUG(log, "Sending heartbeat event");
int32_t length = sizeof(int32_t) + sizeof(int64_t) + sizeof(Coordination::Error);
write(length, *out);
int64_t zxid = test_keeper_storage->getZXID();
int32_t xid = -2;
write(xid, *out);
write(zxid, *out);
write(Coordination::Error::ZOK, *out);
auto response = std::make_shared<Coordination::ZooKeeperHeartbeatResponse>();
response->writeImpl(*out);
out->next();
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include "IServer.h"
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/TestKeeperStorage.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
namespace DB
{
class TestKeeperTCPHandler : public Poco::Net::TCPServerConnection
{
public:
TestKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("TestKeeperTCPHandler"))
, global_context(server.context())
, test_keeper_storage(global_context.getTestKeeperStorage())
, operation_timeout(10000)
{
}
void run() override;
private:
IServer & server;
Poco::Logger * log;
Context global_context;
std::shared_ptr<zkutil::TestKeeperStorage> test_keeper_storage;
Poco::Timespan operation_timeout;
/// Streams for reading/writing from/to client connection socket.
std::shared_ptr<ReadBufferFromPocoSocket> in;
std::shared_ptr<WriteBufferFromPocoSocket> out;
void runImpl();
void sendHandshake();
void receiveHandshake();
void receiveHeartbeatRequest();
void sendHeartbeatResponse();
};
}