diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index dea82d949bd..8228a1fcc74 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -64,6 +64,7 @@ #include #include #include +#include #if !defined(ARCADIA_BUILD) @@ -811,7 +812,7 @@ int Server::main(const std::vector & /*args*/) http_params->setTimeout(settings.http_receive_timeout); http_params->setKeepAliveTimeout(keep_alive_timeout); - std::vector> servers; + std::vector servers; std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); @@ -823,29 +824,6 @@ int Server::main(const std::vector & /*args*/) listen_try = true; } -#if USE_GRPC - std::vector> grpc_servers; - auto start_grpc_servers = [&] - { - for (auto & server : grpc_servers) - { - if (server) - server_pool.start(*server); - } - }; - auto stop_grpc_servers = [&] - { - for (auto & server : grpc_servers) - { - if (server) - server->stop(); - } - }; -#else - auto start_grpc_servers = []{}; - auto stop_grpc_servers = []{}; -#endif - auto make_socket_address = [&](const std::string & host, UInt16 port) { Poco::Net::SocketAddress socket_address; @@ -1067,7 +1045,7 @@ int Server::main(const std::vector & /*args*/) create_server("grpc_port", [&](UInt16 port) { Poco::Net::SocketAddress server_address(listen_host, port); - grpc_servers.emplace_back(new GRPCServer(server_address.toString(), *this)); + servers.emplace_back(std::make_unique(*this, server_pool, make_socket_address(listen_host, port))); LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); }); #endif @@ -1093,9 +1071,7 @@ int Server::main(const std::vector & /*args*/) global_context->enableNamedSessions(); for (auto & server : servers) - server->start(); - - start_grpc_servers(); + server.start(); { String level_str = config().getString("text_log.level", ""); @@ -1127,12 +1103,10 @@ int Server::main(const std::vector & /*args*/) int current_connections = 0; for (auto & server : servers) { - server->stop(); - current_connections += server->currentConnections(); + server.stop(); + current_connections += server.currentConnections(); } - stop_grpc_servers(); - if (current_connections) LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections); else @@ -1150,7 +1124,7 @@ int Server::main(const std::vector & /*args*/) { current_connections = 0; for (auto & server : servers) - current_connections += server->currentConnections(); + current_connections += server.currentConnections(); if (!current_connections) break; sleep_current_ms += sleep_one_ms; diff --git a/src/Server/GRPCServer.cpp b/src/Server/GRPCServer.cpp index 46232c8565f..33af9791369 100644 --- a/src/Server/GRPCServer.cpp +++ b/src/Server/GRPCServer.cpp @@ -510,20 +510,25 @@ namespace } -GRPCServer::GRPCServer(std::string address_to_listen_, IServer & server_) : iserver(server_), log(&Poco::Logger::get("GRPCHandler")) +GRPCServer::GRPCServer(IServer & iserver_, Poco::ThreadPool & thread_pool_, const Poco::Net::SocketAddress & address_to_listen_) + : iserver(iserver_), thread_pool(thread_pool_), address_to_listen(address_to_listen_), log(&Poco::Logger::get("GRPCServer")) +{} + +GRPCServer::~GRPCServer() = default; + +void GRPCServer::start() { grpc::ServerBuilder builder; - builder.AddListeningPort(address_to_listen_, grpc::InsecureServerCredentials()); + builder.AddListeningPort(address_to_listen.toString(), grpc::InsecureServerCredentials()); //keepalive pings default values builder.RegisterService(&grpc_service); builder.SetMaxReceiveMessageSize(INT_MAX); notification_cq = builder.AddCompletionQueue(); new_call_cq = builder.AddCompletionQueue(); grpc_server = builder.BuildAndStart(); + thread_pool.start(*this); } -GRPCServer::~GRPCServer() = default; - void GRPCServer::stop() { grpc_server->Shutdown(); @@ -531,6 +536,11 @@ void GRPCServer::stop() new_call_cq->Shutdown(); } +size_t GRPCServer::currentConnections() const +{ + return 0; //TODO +} + void GRPCServer::run() { HandleRpcs(); diff --git a/src/Server/GRPCServer.h b/src/Server/GRPCServer.h index aa8aa4f3f52..0e477329fea 100644 --- a/src/Server/GRPCServer.h +++ b/src/Server/GRPCServer.h @@ -6,9 +6,14 @@ #if USE_GRPC #include +#include #include "clickhouse_grpc.grpc.pb.h" -namespace Poco { class Logger; } +namespace Poco +{ + class Logger; + class ThreadPool; +} namespace grpc { @@ -23,24 +28,31 @@ class IServer; class GRPCServer final : public Poco::Runnable { public: - GRPCServer(const GRPCServer & handler) = delete; - GRPCServer(GRPCServer && handler) = delete; - GRPCServer(std::string address_to_listen_, IServer & server_); + GRPCServer(IServer & server_, Poco::ThreadPool & thread_pool_, const Poco::Net::SocketAddress & address_to_listen_); ~GRPCServer() override; + /// Starts the server. A new thread will be created that waits for and accepts incoming connections. + void start(); + + /// Stops the server. No new connections will be accepted. void stop(); + + /// Returns the number of currently handled connections. + size_t currentConnections() const; + +private: virtual void run() override; void HandleRpcs(); -private: using GRPCService = clickhouse::grpc::ClickHouse::AsyncService; IServer & iserver; + Poco::ThreadPool & thread_pool; + Poco::Net::SocketAddress address_to_listen; Poco::Logger * log; std::unique_ptr notification_cq; std::unique_ptr new_call_cq; GRPCService grpc_service; std::unique_ptr grpc_server; - std::string address_to_listen; }; } #endif diff --git a/src/Server/ProtocolServerAdapter.cpp b/src/Server/ProtocolServerAdapter.cpp new file mode 100644 index 00000000000..7f57687f259 --- /dev/null +++ b/src/Server/ProtocolServerAdapter.cpp @@ -0,0 +1,50 @@ +#include +#include + +#if USE_GRPC +#include +#endif + + +namespace DB +{ +class ProtocolServerAdapter::TCPServerAdapterImpl : public Impl +{ +public: + explicit TCPServerAdapterImpl(std::unique_ptr tcp_server_) : tcp_server(std::move(tcp_server_)) {} + ~TCPServerAdapterImpl() override = default; + + void start() override { tcp_server->start(); } + void stop() override { tcp_server->stop(); } + size_t currentConnections() const override { return tcp_server->currentConnections(); } + +private: + std::unique_ptr tcp_server; +}; + +ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr tcp_server_) +{ + impl = std::make_unique(std::move(tcp_server_)); +} + +#if USE_GRPC +class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl +{ +public: + explicit GRPCServerAdapterImpl(std::unique_ptr grpc_server_) : grpc_server(std::move(grpc_server_)) {} + ~GRPCServerAdapterImpl() override = default; + + void start() override { grpc_server->start(); } + void stop() override { grpc_server->stop(); } + size_t currentConnections() const override { return grpc_server->currentConnections(); } + +private: + std::unique_ptr grpc_server; +}; + +ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr grpc_server_) +{ + impl = std::make_unique(std::move(grpc_server_)); +} +#endif +} diff --git a/src/Server/ProtocolServerAdapter.h b/src/Server/ProtocolServerAdapter.h new file mode 100644 index 00000000000..c0f82dbfde0 --- /dev/null +++ b/src/Server/ProtocolServerAdapter.h @@ -0,0 +1,55 @@ +#pragma once + +#if !defined(ARCADIA_BUILD) +#include +#endif + +#include + +namespace Poco::Net { class TCPServer; } + +namespace DB +{ +class GRPCServer; + +/// Provides an unified interface to access a protocol implementing server +/// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...). +class ProtocolServerAdapter +{ +public: + ProtocolServerAdapter() {} + ProtocolServerAdapter(ProtocolServerAdapter && src) = default; + ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default; + ~ProtocolServerAdapter() {} + + ProtocolServerAdapter(std::unique_ptr tcp_server_); + +#if USE_GRPC + ProtocolServerAdapter(std::unique_ptr grpc_server_); +#endif + + /// Starts the server. A new thread will be created that waits for and accepts incoming connections. + void start() { impl->start(); } + + /// Stops the server. No new connections will be accepted. + void stop() { impl->stop(); } + + /// Returns the number of currently handled connections. + size_t currentConnections() const { return impl->currentConnections(); } + +private: + class Impl + { + public: + virtual ~Impl() {} + virtual void start() = 0; + virtual void stop() = 0; + virtual size_t currentConnections() const = 0; + }; + class TCPServerAdapterImpl; + class GRPCServerAdapterImpl; + + std::unique_ptr impl; +}; + +} diff --git a/src/Server/ya.make b/src/Server/ya.make index 335bc8c4a2a..9a728b39641 100644 --- a/src/Server/ya.make +++ b/src/Server/ya.make @@ -21,6 +21,7 @@ SRCS( PostgreSQLHandlerFactory.cpp PrometheusMetricsWriter.cpp PrometheusRequestHandler.cpp + ProtocolServerAdapter.cpp ReplicasStatusHandler.cpp StaticRequestHandler.cpp TCPHandler.cpp