Add an adapter for protocol servers.

This commit is contained in:
Vitaly Baranov 2020-10-08 03:23:10 +03:00
parent a327f24e3c
commit ff62fd4967
6 changed files with 145 additions and 43 deletions

View File

@ -64,6 +64,7 @@
#include <Common/ThreadFuzzer.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProtocolServerAdapter.h>
#if !defined(ARCADIA_BUILD)
@ -811,7 +812,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
std::vector<std::unique_ptr<Poco::Net::TCPServer>> servers;
std::vector<ProtocolServerAdapter> servers;
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
@ -823,29 +824,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_try = true;
}
#if USE_GRPC
std::vector<std::unique_ptr<GRPCServer>> grpc_servers;
auto start_grpc_servers = [&]
{
for (auto & server : grpc_servers)
{
if (server)
server_pool.start(*server);
}
};
auto stop_grpc_servers = [&]
{
for (auto & server : grpc_servers)
{
if (server)
server->stop();
}
};
#else
auto start_grpc_servers = []{};
auto stop_grpc_servers = []{};
#endif
auto make_socket_address = [&](const std::string & host, UInt16 port)
{
Poco::Net::SocketAddress socket_address;
@ -1067,7 +1045,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
create_server("grpc_port", [&](UInt16 port)
{
Poco::Net::SocketAddress server_address(listen_host, port);
grpc_servers.emplace_back(new GRPCServer(server_address.toString(), *this));
servers.emplace_back(std::make_unique<GRPCServer>(*this, server_pool, make_socket_address(listen_host, port)));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
});
#endif
@ -1093,9 +1071,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->enableNamedSessions();
for (auto & server : servers)
server->start();
start_grpc_servers();
server.start();
{
String level_str = config().getString("text_log.level", "");
@ -1127,12 +1103,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
int current_connections = 0;
for (auto & server : servers)
{
server->stop();
current_connections += server->currentConnections();
server.stop();
current_connections += server.currentConnections();
}
stop_grpc_servers();
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
@ -1150,7 +1124,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
current_connections = 0;
for (auto & server : servers)
current_connections += server->currentConnections();
current_connections += server.currentConnections();
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;

View File

@ -510,20 +510,25 @@ namespace
}
GRPCServer::GRPCServer(std::string address_to_listen_, IServer & server_) : iserver(server_), log(&Poco::Logger::get("GRPCHandler"))
GRPCServer::GRPCServer(IServer & iserver_, Poco::ThreadPool & thread_pool_, const Poco::Net::SocketAddress & address_to_listen_)
: iserver(iserver_), thread_pool(thread_pool_), address_to_listen(address_to_listen_), log(&Poco::Logger::get("GRPCServer"))
{}
GRPCServer::~GRPCServer() = default;
void GRPCServer::start()
{
grpc::ServerBuilder builder;
builder.AddListeningPort(address_to_listen_, grpc::InsecureServerCredentials());
builder.AddListeningPort(address_to_listen.toString(), grpc::InsecureServerCredentials());
//keepalive pings default values
builder.RegisterService(&grpc_service);
builder.SetMaxReceiveMessageSize(INT_MAX);
notification_cq = builder.AddCompletionQueue();
new_call_cq = builder.AddCompletionQueue();
grpc_server = builder.BuildAndStart();
thread_pool.start(*this);
}
GRPCServer::~GRPCServer() = default;
void GRPCServer::stop()
{
grpc_server->Shutdown();
@ -531,6 +536,11 @@ void GRPCServer::stop()
new_call_cq->Shutdown();
}
size_t GRPCServer::currentConnections() const
{
return 0; //TODO
}
void GRPCServer::run()
{
HandleRpcs();

View File

@ -6,9 +6,14 @@
#if USE_GRPC
#include <Poco/Runnable.h>
#include <Poco/Net/SocketAddress.h>
#include "clickhouse_grpc.grpc.pb.h"
namespace Poco { class Logger; }
namespace Poco
{
class Logger;
class ThreadPool;
}
namespace grpc
{
@ -23,24 +28,31 @@ class IServer;
class GRPCServer final : public Poco::Runnable
{
public:
GRPCServer(const GRPCServer & handler) = delete;
GRPCServer(GRPCServer && handler) = delete;
GRPCServer(std::string address_to_listen_, IServer & server_);
GRPCServer(IServer & server_, Poco::ThreadPool & thread_pool_, const Poco::Net::SocketAddress & address_to_listen_);
~GRPCServer() override;
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
void start();
/// Stops the server. No new connections will be accepted.
void stop();
/// Returns the number of currently handled connections.
size_t currentConnections() const;
private:
virtual void run() override;
void HandleRpcs();
private:
using GRPCService = clickhouse::grpc::ClickHouse::AsyncService;
IServer & iserver;
Poco::ThreadPool & thread_pool;
Poco::Net::SocketAddress address_to_listen;
Poco::Logger * log;
std::unique_ptr<grpc::ServerCompletionQueue> notification_cq;
std::unique_ptr<grpc::ServerCompletionQueue> new_call_cq;
GRPCService grpc_service;
std::unique_ptr<grpc::Server> grpc_server;
std::string address_to_listen;
};
}
#endif

View File

@ -0,0 +1,50 @@
#include <Server/ProtocolServerAdapter.h>
#include <Poco/Net/TCPServer.h>
#if USE_GRPC
#include <Server/GRPCServer.h>
#endif
namespace DB
{
class ProtocolServerAdapter::TCPServerAdapterImpl : public Impl
{
public:
explicit TCPServerAdapterImpl(std::unique_ptr<Poco::Net::TCPServer> tcp_server_) : tcp_server(std::move(tcp_server_)) {}
~TCPServerAdapterImpl() override = default;
void start() override { tcp_server->start(); }
void stop() override { tcp_server->stop(); }
size_t currentConnections() const override { return tcp_server->currentConnections(); }
private:
std::unique_ptr<Poco::Net::TCPServer> tcp_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_)
{
impl = std::make_unique<TCPServerAdapterImpl>(std::move(tcp_server_));
}
#if USE_GRPC
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:
explicit GRPCServerAdapterImpl(std::unique_ptr<GRPCServer> grpc_server_) : grpc_server(std::move(grpc_server_)) {}
~GRPCServerAdapterImpl() override = default;
void start() override { grpc_server->start(); }
void stop() override { grpc_server->stop(); }
size_t currentConnections() const override { return grpc_server->currentConnections(); }
private:
std::unique_ptr<GRPCServer> grpc_server;
};
ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_)
{
impl = std::make_unique<GRPCServerAdapterImpl>(std::move(grpc_server_));
}
#endif
}

View File

@ -0,0 +1,55 @@
#pragma once
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <memory>
namespace Poco::Net { class TCPServer; }
namespace DB
{
class GRPCServer;
/// Provides an unified interface to access a protocol implementing server
/// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...).
class ProtocolServerAdapter
{
public:
ProtocolServerAdapter() {}
ProtocolServerAdapter(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
~ProtocolServerAdapter() {}
ProtocolServerAdapter(std::unique_ptr<Poco::Net::TCPServer> tcp_server_);
#if USE_GRPC
ProtocolServerAdapter(std::unique_ptr<GRPCServer> grpc_server_);
#endif
/// Starts the server. A new thread will be created that waits for and accepts incoming connections.
void start() { impl->start(); }
/// Stops the server. No new connections will be accepted.
void stop() { impl->stop(); }
/// Returns the number of currently handled connections.
size_t currentConnections() const { return impl->currentConnections(); }
private:
class Impl
{
public:
virtual ~Impl() {}
virtual void start() = 0;
virtual void stop() = 0;
virtual size_t currentConnections() const = 0;
};
class TCPServerAdapterImpl;
class GRPCServerAdapterImpl;
std::unique_ptr<Impl> impl;
};
}

View File

@ -21,6 +21,7 @@ SRCS(
PostgreSQLHandlerFactory.cpp
PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp
ProtocolServerAdapter.cpp
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
TCPHandler.cpp