Merge pull request #64132 from TTPO100AJIEX/refactor-protocol-server

Refactoring of Server.h: Isolate server management from other logic
This commit is contained in:
Robert Schulze 2024-05-26 18:41:35 +00:00 committed by GitHub
commit 79bcc54931
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1324 additions and 1032 deletions

File diff suppressed because it is too large Load Diff

View File

@ -1,15 +1,10 @@
#pragma once
#include <Server/IServer.h>
#include <Daemon/BaseDaemon.h>
#include <Server/HTTP/HTTPContext.h>
#include <Server/TCPProtocolStackFactory.h>
#include <Server/ServerType.h>
#include <Poco/Net/HTTPServerParams.h>
/** Server provides three interfaces:
* 1. HTTP - simple interface for any applications.
* 1. HTTP, GRPC - simple interfaces for any applications.
* 2. TCP - interface for native clickhouse-client and for server to server internal communications.
* More rich and efficient, but less compatible
* - data is transferred by columns;
@ -18,43 +13,21 @@
* 3. Interserver HTTP - for replication.
*/
namespace Poco
{
namespace Net
{
class ServerSocket;
}
}
namespace DB
{
class AsynchronousMetrics;
class ProtocolServerAdapter;
class Server : public BaseDaemon, public IServer
{
public:
using ServerApplication::run;
Poco::Util::LayeredConfiguration & config() const override
{
return BaseDaemon::config();
}
Poco::Util::LayeredConfiguration & config() const override { return BaseDaemon::config(); }
Poco::Logger & logger() const override
{
return BaseDaemon::logger();
}
Poco::Logger & logger() const override { return BaseDaemon::logger(); }
ContextMutablePtr context() const override
{
return global_context;
}
ContextMutablePtr context() const override { return global_context; }
bool isCancelled() const override
{
return BaseDaemon::isCancelled();
}
bool isCancelled() const override { return BaseDaemon::isCancelled(); }
void defineOptions(Poco::Util::OptionSet & _options) override;
@ -73,64 +46,6 @@ private:
ContextMutablePtr global_context;
/// Updated/recent config, to compare http_handlers
ConfigurationPtr latest_config;
HTTPContextPtr httpContext() const;
Poco::Net::SocketAddress socketBindListen(
const Poco::Util::AbstractConfiguration & config,
Poco::Net::ServerSocket & socket,
const std::string & host,
UInt16 port,
[[maybe_unused]] bool secure = false) const;
std::unique_ptr<TCPProtocolStackFactory> buildProtocolStackFromConfig(
const Poco::Util::AbstractConfiguration & config,
const std::string & protocol,
Poco::Net::HTTPServerParams::Ptr http_params,
AsynchronousMetrics & async_metrics,
bool & is_secure);
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
void createServer(
Poco::Util::AbstractConfiguration & config,
const std::string & listen_host,
const char * port_name,
bool listen_try,
bool start_server,
std::vector<ProtocolServerAdapter> & servers,
CreateServerFunc && func) const;
void createServers(
Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers = false,
const ServerType & server_type = ServerType(ServerType::Type::QUERIES_ALL));
void createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers = false,
const ServerType & server_type = ServerType(ServerType::Type::QUERIES_ALL));
void updateServers(
Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables);
void stopServers(
std::vector<ProtocolServerAdapter> & servers,
const ServerType & server_type
) const;
};
}

View File

@ -236,6 +236,7 @@ add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)
add_object_library(clickhouse_server_http Server/HTTP)
add_object_library(clickhouse_server_manager Server/ServersManager)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
add_object_library(clickhouse_processors_executors Processors/Executors)

View File

@ -0,0 +1,268 @@
#include <Server/ServersManager/IServersManager.h>
#include <Interpreters/Context.h>
#include <Server/waitServersToFinish.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/logger_useful.h>
#include <Common/makeSocketAddress.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
}
IServersManager::IServersManager(ContextMutablePtr global_context_, Poco::Logger * logger_)
: global_context(global_context_), logger(logger_)
{
}
bool IServersManager::empty() const
{
return servers.empty();
}
std::vector<ProtocolServerMetrics> IServersManager::getMetrics() const
{
std::vector<ProtocolServerMetrics> metrics;
metrics.reserve(servers.size());
for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
return metrics;
}
void IServersManager::startServers()
{
for (auto & server : servers)
{
server.start();
LOG_INFO(logger, "Listening for {}", server.getDescription());
}
}
void IServersManager::stopServers(const ServerType & server_type)
{
/// Remove servers once all their connections are closed
auto check_server = [&](const char prefix[], auto & server)
{
if (!server.isStopping())
return false;
size_t current_connections = server.currentConnections();
LOG_DEBUG(
logger,
"Server {}{}: {} ({} connections)",
server.getDescription(),
prefix,
!current_connections ? "finished" : "waiting",
current_connections);
return !current_connections;
};
std::erase_if(servers, std::bind_front(check_server, " (from one of previous remove)"));
for (auto & server : servers)
{
if (!server.isStopping() && server_type.shouldStop(server.getPortName()))
server.stop();
}
std::erase_if(servers, std::bind_front(check_server, ""));
}
void IServersManager::updateServers(
const Poco::Util::AbstractConfiguration & config,
IServer & iserver,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
ConfigurationPtr latest_config)
{
stopServersForUpdate(config, latest_config);
createServers(config, iserver, servers_lock, server_pool, async_metrics, true, ServerType(ServerType::Type::QUERIES_ALL));
}
Poco::Net::SocketAddress IServersManager::socketBindListen(
const Poco::Util::AbstractConfiguration & config, Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port) const
{
auto address = makeSocketAddress(host, port, logger);
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ config.getBool("listen_reuse_port", false));
/// If caller requests any available port from the OS, discover it after binding.
if (port == 0)
{
address = socket.address();
LOG_DEBUG(logger, "Requested any available port (port == 0), actual port is {:d}", address.port());
}
socket.listen(/* backlog = */ config.getUInt("listen_backlog", 4096));
return address;
}
void IServersManager::createServer(
const Poco::Util::AbstractConfiguration & config,
const std::string & listen_host,
const char * port_name,
bool start_server,
CreateServerFunc && func)
{
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
if (config.getString(port_name, "").empty())
return;
/// If we already have an active server for this listen_host/port_name, don't create it again
for (const auto & server : servers)
{
if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name)
return;
}
auto port = config.getInt(port_name);
try
{
servers.push_back(func(port));
if (start_server)
{
servers.back().start();
LOG_INFO(logger, "Listening for {}", servers.back().getDescription());
}
global_context->registerServerPort(port_name, port);
}
catch (const Poco::Exception &)
{
if (!getListenTry(config))
{
throw Exception(ErrorCodes::NETWORK_ERROR, "Listen [{}]:{} failed: {}", listen_host, port, getCurrentExceptionMessage(false));
}
LOG_WARNING(
logger,
"Listen [{}]:{} failed: {}. If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, "
"then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>",
listen_host,
port,
getCurrentExceptionMessage(false));
}
}
void IServersManager::stopServersForUpdate(const Poco::Util::AbstractConfiguration & config, ConfigurationPtr latest_config)
{
/// Remove servers once all their connections are closed
auto check_server = [&](const char prefix[], auto & server)
{
if (!server.isStopping())
return false;
size_t current_connections = server.currentConnections();
LOG_DEBUG(
logger,
"Server {}{}: {} ({} connections)",
server.getDescription(),
prefix,
!current_connections ? "finished" : "waiting",
current_connections);
return !current_connections;
};
std::erase_if(servers, std::bind_front(check_server, " (from one of previous reload)"));
const auto listen_hosts = getListenHosts(config);
const Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : config;
for (auto & server : servers)
{
if (server.isStopping())
return;
std::string port_name = server.getPortName();
bool has_host = false;
bool is_http = false;
if (port_name.starts_with("protocols."))
{
std::string protocol = port_name.substr(0, port_name.find_last_of('.'));
has_host = config.has(protocol + ".host");
std::string conf_name = protocol;
std::string prefix = protocol + ".";
std::unordered_set<std::string> pset{conf_name};
while (true)
{
if (config.has(prefix + "type"))
{
std::string type = config.getString(prefix + "type");
if (type == "http")
{
is_http = true;
break;
}
}
if (!config.has(prefix + "impl"))
break;
conf_name = "protocols." + config.getString(prefix + "impl");
prefix = conf_name + ".";
if (!pset.insert(conf_name).second)
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
}
}
else
{
/// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications.
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
}
if (!has_host)
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(port_name, "").empty();
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
if (force_restart)
LOG_TRACE(logger, "<http_handlers> had been changed, will reload {}", server.getDescription());
if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber() || force_restart)
{
server.stop();
LOG_INFO(logger, "Stopped listening for {}", server.getDescription());
}
}
std::erase_if(servers, std::bind_front(check_server, ""));
}
Strings IServersManager::getListenHosts(const Poco::Util::AbstractConfiguration & config) const
{
auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host");
if (listen_hosts.empty())
{
listen_hosts.emplace_back("::1");
listen_hosts.emplace_back("127.0.0.1");
}
return listen_hosts;
}
bool IServersManager::getListenTry(const Poco::Util::AbstractConfiguration & config) const
{
bool listen_try = config.getBool("listen_try", false);
if (!listen_try)
{
Poco::Util::AbstractConfiguration::Keys protocols;
config.keys("protocols", protocols);
listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty()
&& std::none_of(
protocols.begin(),
protocols.end(),
[&](const auto & protocol)
{ return config.has("protocols." + protocol + ".host") && config.has("protocols." + protocol + ".port"); });
}
return listen_try;
}
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <mutex>
#include <Core/ServerSettings.h>
#include <Interpreters/Context_fwd.h>
#include <Server/IServer.h>
#include <Server/ProtocolServerAdapter.h>
#include <Server/ServerType.h>
#include <Poco/Logger.h>
#include <Poco/Net/ServerSocket.h>
#include <Poco/ThreadPool.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/AsynchronousMetrics.h>
#include <Common/Config/ConfigProcessor.h>
namespace DB
{
class IServersManager
{
public:
IServersManager(ContextMutablePtr global_context_, Poco::Logger * logger_);
virtual ~IServersManager() = default;
bool empty() const;
std::vector<ProtocolServerMetrics> getMetrics() const;
virtual void createServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type)
= 0;
void startServers();
void stopServers(const ServerType & server_type);
virtual size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) = 0;
virtual void updateServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
ConfigurationPtr latest_config);
protected:
ContextMutablePtr global_context;
Poco::Logger * logger;
std::vector<ProtocolServerAdapter> servers;
Poco::Net::SocketAddress socketBindListen(
const Poco::Util::AbstractConfiguration & config, Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port) const;
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
void createServer(
const Poco::Util::AbstractConfiguration & config,
const std::string & listen_host,
const char * port_name,
bool start_server,
CreateServerFunc && func);
void stopServersForUpdate(const Poco::Util::AbstractConfiguration & config, ConfigurationPtr latest_config);
Strings getListenHosts(const Poco::Util::AbstractConfiguration & config) const;
bool getListenTry(const Poco::Util::AbstractConfiguration & config) const;
};
}

View File

@ -0,0 +1,327 @@
#include <Server/ServersManager/InterServersManager.h>
#include <Interpreters/Context.h>
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/KeeperReadinessHandler.h>
#include <Server/waitServersToFinish.h>
#include <Poco/Net/HTTPServerParams.h>
#include <Common/Config/AbstractConfigurationComparison.h>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/logger_useful.h>
#if USE_SSL
# include <Poco/Net/SecureServerSocket.h>
#endif
#if USE_NURAFT
# include <Coordination/FourLetterCommand.h>
# include <Server/KeeperTCPHandlerFactory.h>
#endif
namespace ProfileEvents
{
extern const Event InterfaceInterserverSendBytes;
extern const Event InterfaceInterserverReceiveBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
void InterServersManager::createServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type)
{
if (config.has("keeper_server.server_id"))
{
#if USE_NURAFT
//// If we don't have configured connection probably someone trying to use clickhouse-server instead
//// of clickhouse-keeper, so start synchronously.
bool can_initialize_keeper_async = false;
if (zkutil::hasZooKeeperConfig(config)) /// We have configured connection to some zookeeper cluster
{
/// If we cannot connect to some other node from our cluster then we have to wait our Keeper start
/// synchronously.
can_initialize_keeper_async = global_context->tryCheckClientConnectionToMyKeeperCluster();
}
/// Initialize keeper RAFT.
global_context->initializeKeeperDispatcher(can_initialize_keeper_async);
FourLetterCommandFactory::registerCommands(*global_context->getKeeperDispatcher());
auto config_getter = [this]() -> const Poco::Util::AbstractConfiguration & { return global_context->getConfigRef(); };
for (const auto & listen_host : getListenHosts(config))
{
/// TCP Keeper
constexpr auto port_name = "keeper_server.tcp_port";
createServer(
config,
listen_host,
port_name,
/* start_server = */ false,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(
Poco::Timespan(config.getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
socket.setSendTimeout(
Poco::Timespan(config.getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
return ProtocolServerAdapter(
listen_host,
port_name,
"Keeper (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(
config_getter,
global_context->getKeeperDispatcher(),
global_context->getSettingsRef().receive_timeout.totalSeconds(),
global_context->getSettingsRef().send_timeout.totalSeconds(),
false),
server_pool,
socket));
});
constexpr auto secure_port_name = "keeper_server.tcp_port_secure";
createServer(
config,
listen_host,
secure_port_name,
/* start_server = */ false,
[&](UInt16 port) -> ProtocolServerAdapter
{
# if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(
Poco::Timespan(config.getUInt64("keeper_server.socket_receive_timeout_sec", DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC), 0));
socket.setSendTimeout(
Poco::Timespan(config.getUInt64("keeper_server.socket_send_timeout_sec", DBMS_DEFAULT_SEND_TIMEOUT_SEC), 0));
return ProtocolServerAdapter(
listen_host,
secure_port_name,
"Keeper with secure protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new KeeperTCPHandlerFactory(
config_getter,
global_context->getKeeperDispatcher(),
global_context->getSettingsRef().receive_timeout.totalSeconds(),
global_context->getSettingsRef().send_timeout.totalSeconds(),
true),
server_pool,
socket));
# else
UNUSED(port);
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
# endif
});
/// HTTP control endpoints
createServer(
config,
listen_host,
/* port_name = */ "keeper_server.http_control.port",
/* start_server = */ false,
[&](UInt16 port) -> ProtocolServerAdapter
{
auto http_context = std::make_shared<HTTPContext>(global_context);
Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_context->getReceiveTimeout());
http_params->setKeepAliveTimeout(keep_alive_timeout);
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(http_context->getReceiveTimeout());
socket.setSendTimeout(http_context->getSendTimeout());
return ProtocolServerAdapter(
listen_host,
port_name,
"HTTP Control: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::move(http_context),
createKeeperHTTPControlMainHandlerFactory(
config_getter(), global_context->getKeeperDispatcher(), "KeeperHTTPControlHandler-factory"),
server_pool,
socket,
http_params));
});
}
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination.");
#endif
}
{
std::lock_guard lock(servers_lock);
/// We should start interserver communications before (and more important shutdown after) tables.
/// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down.
/// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can
/// communicate with zookeeper, execute merges, etc.
createInterserverServers(config, server, server_pool, async_metrics, start_servers, server_type);
startServers();
}
}
size_t InterServersManager::stopServers(const ServerSettings & server_settings, std::mutex & servers_lock)
{
if (servers.empty())
{
return 0;
}
LOG_DEBUG(logger, "Waiting for current connections to servers for tables to finish.");
size_t current_connections = 0;
{
std::lock_guard lock(servers_lock);
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
}
if (current_connections)
LOG_INFO(logger, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(logger, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished);
if (current_connections)
LOG_INFO(
logger,
"Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections "
"after context shutdown.",
current_connections);
else
LOG_INFO(logger, "Closed connections to servers for tables.");
return current_connections;
}
void InterServersManager::updateServers(
const Poco::Util::AbstractConfiguration & config,
IServer & iserver,
std::mutex & /*servers_lock*/,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
ConfigurationPtr latest_config)
{
stopServersForUpdate(config, latest_config);
createInterserverServers(config, iserver, server_pool, async_metrics, true, ServerType(ServerType::Type::QUERIES_ALL));
}
Strings InterServersManager::getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) const
{
auto interserver_listen_hosts = DB::getMultipleValuesFromConfig(config, "", "interserver_listen_host");
if (!interserver_listen_hosts.empty())
return interserver_listen_hosts;
/// Use more general restriction in case of emptiness
return getListenHosts(config);
}
void InterServersManager::createInterserverServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type)
{
const Settings & settings = global_context->getSettingsRef();
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(global_context->getServerSettings().keep_alive_timeout);
/// Now iterate over interserver_listen_hosts
for (const auto & interserver_listen_host : getInterserverListenHosts(config))
{
if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTP))
{
/// Interserver IO HTTP
constexpr auto port_name = "interserver_http_port";
createServer(
config,
interserver_listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, interserver_listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
interserver_listen_host,
port_name,
"replica communication (interserver): http://" + address.toString(),
std::make_unique<HTTPServer>(
std::make_shared<HTTPContext>(global_context),
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
});
}
if (server_type.shouldStart(ServerType::Type::INTERSERVER_HTTPS))
{
constexpr auto port_name = "interserver_https_port";
createServer(
config,
interserver_listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(config, socket, interserver_listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
interserver_listen_host,
port_name,
"secure replica communication (interserver): https://" + address.toString(),
std::make_unique<HTTPServer>(
std::make_shared<HTTPContext>(global_context),
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPSHandler-factory"),
server_pool,
socket,
http_params,
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
#else
UNUSED(port);
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
#endif
});
}
}
}
}

View File

@ -0,0 +1,44 @@
#pragma once
#include <Server/ServersManager/IServersManager.h>
namespace DB
{
class InterServersManager : public IServersManager
{
public:
using IServersManager::IServersManager;
void createServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type) override;
size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) override;
void updateServers(
const Poco::Util::AbstractConfiguration & config,
IServer & iserver,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
ConfigurationPtr latest_config) override;
private:
Strings getInterserverListenHosts(const Poco::Util::AbstractConfiguration & config) const;
void createInterserverServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type);
};
}

View File

@ -0,0 +1,523 @@
#include <Server/ServersManager/ProtocolServersManager.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Server/HTTP/HTTPServer.h>
#include <Server/HTTP/HTTPServerConnectionFactory.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProxyV1HandlerFactory.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/TLSHandlerFactory.h>
#include <Server/waitServersToFinish.h>
#include <Common/ProfileEvents.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/makeSocketAddress.h>
#if USE_SSL
# include <Poco/Net/SecureServerSocket.h>
#endif
#if USE_GRPC
# include <Server/GRPCServer.h>
#endif
namespace ProfileEvents
{
extern const Event InterfaceNativeSendBytes;
extern const Event InterfaceNativeReceiveBytes;
extern const Event InterfaceHTTPSendBytes;
extern const Event InterfaceHTTPReceiveBytes;
extern const Event InterfacePrometheusSendBytes;
extern const Event InterfacePrometheusReceiveBytes;
extern const Event InterfaceMySQLSendBytes;
extern const Event InterfaceMySQLReceiveBytes;
extern const Event InterfacePostgreSQLSendBytes;
extern const Event InterfacePostgreSQLReceiveBytes;
extern const Event InterfaceInterserverSendBytes;
extern const Event InterfaceInterserverReceiveBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int INVALID_CONFIG_PARAMETER;
}
void ProtocolServersManager::createServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & /*servers_lock*/,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type)
{
auto listen_hosts = getListenHosts(config);
const Settings & settings = global_context->getSettingsRef();
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(global_context->getServerSettings().keep_alive_timeout);
Poco::Util::AbstractConfiguration::Keys protocols;
config.keys("protocols", protocols);
for (const auto & protocol : protocols)
{
if (!server_type.shouldStart(ServerType::Type::CUSTOM, protocol))
continue;
std::string prefix = "protocols." + protocol + ".";
std::string port_name = prefix + "port";
std::string description{"<undefined> protocol"};
if (config.has(prefix + "description"))
description = config.getString(prefix + "description");
if (!config.has(prefix + "port"))
continue;
std::vector<std::string> hosts;
if (config.has(prefix + "host"))
hosts.push_back(config.getString(prefix + "host"));
else
hosts = listen_hosts;
for (const auto & host : hosts)
{
bool is_secure = false;
auto stack = buildProtocolStackFromConfig(config, server, protocol, http_params, async_metrics, is_secure);
if (stack->empty())
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' stack empty", protocol);
createServer(
config,
host,
port_name.c_str(),
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
host,
port_name.c_str(),
description + ": " + address.toString(),
std::make_unique<TCPServer>(stack.release(), server_pool, socket, new Poco::Net::TCPServerParams));
});
}
}
for (const auto & listen_host : listen_hosts)
{
if (server_type.shouldStart(ServerType::Type::HTTP))
{
/// HTTP
constexpr auto port_name = "http_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"http://" + address.toString(),
std::make_unique<HTTPServer>(
std::make_shared<HTTPContext>(global_context),
createHandlerFactory(server, config, async_metrics, "HTTPHandler-factory"),
server_pool,
socket,
http_params,
ProfileEvents::InterfaceHTTPReceiveBytes,
ProfileEvents::InterfaceHTTPSendBytes));
});
}
if (server_type.shouldStart(ServerType::Type::HTTPS))
{
/// HTTPS
constexpr auto port_name = "https_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"https://" + address.toString(),
std::make_unique<HTTPServer>(
std::make_shared<HTTPContext>(global_context),
createHandlerFactory(server, config, async_metrics, "HTTPSHandler-factory"),
server_pool,
socket,
http_params,
ProfileEvents::InterfaceHTTPReceiveBytes,
ProfileEvents::InterfaceHTTPSendBytes));
#else
UNUSED(port);
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"HTTPS protocol is disabled because Poco library was built without NetSSL support.");
#endif
});
}
if (server_type.shouldStart(ServerType::Type::TCP))
{
/// TCP
constexpr auto port_name = "tcp_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"native protocol (tcp): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(
server, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
}
if (server_type.shouldStart(ServerType::Type::TCP_WITH_PROXY))
{
/// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt
constexpr auto port_name = "tcp_with_proxy_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"native protocol (tcp) with PROXY: " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(
server, false, true, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
}
if (server_type.shouldStart(ServerType::Type::TCP_SECURE))
{
/// TCP with SSL
constexpr auto port_name = "tcp_port_secure";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
#if USE_SSL
Poco::Net::SecureServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"secure native protocol (tcp_secure): " + address.toString(),
std::make_unique<TCPServer>(
new TCPHandlerFactory(
server, true, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
#else
UNUSED(port);
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
#endif
});
}
if (server_type.shouldStart(ServerType::Type::MYSQL))
{
constexpr auto port_name = "mysql_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"MySQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(
new MySQLHandlerFactory(
server, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
}
if (server_type.shouldStart(ServerType::Type::POSTGRESQL))
{
constexpr auto port_name = "postgresql_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"PostgreSQL compatibility protocol: " + address.toString(),
std::make_unique<TCPServer>(
new PostgreSQLHandlerFactory(
server, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
}
#if USE_GRPC
if (server_type.shouldStart(ServerType::Type::GRPC))
{
constexpr auto port_name = "grpc_port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::SocketAddress server_address(listen_host, port);
return ProtocolServerAdapter(
listen_host,
port_name,
"gRPC protocol: " + server_address.toString(),
std::make_unique<GRPCServer>(server, makeSocketAddress(listen_host, port, logger)));
});
}
#endif
if (server_type.shouldStart(ServerType::Type::PROMETHEUS))
{
/// Prometheus (if defined and not setup yet with http_port)
constexpr auto port_name = "prometheus.port";
createServer(
config,
listen_host,
port_name,
start_servers,
[&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
return ProtocolServerAdapter(
listen_host,
port_name,
"Prometheus: http://" + address.toString(),
std::make_unique<HTTPServer>(
std::make_shared<HTTPContext>(global_context),
createHandlerFactory(server, config, async_metrics, "PrometheusHandler-factory"),
server_pool,
socket,
http_params,
ProfileEvents::InterfacePrometheusReceiveBytes,
ProfileEvents::InterfacePrometheusSendBytes));
});
}
}
}
size_t ProtocolServersManager::stopServers(const ServerSettings & server_settings, std::mutex & servers_lock)
{
if (servers.empty())
{
return 0;
}
LOG_DEBUG(logger, "Waiting for current connections to close.");
size_t current_connections = 0;
{
std::lock_guard lock(servers_lock);
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
}
if (current_connections)
LOG_WARNING(logger, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(logger, "Closed all listening sockets.");
/// Wait for unfinished backups and restores.
/// This must be done after closing listening sockets (no more backups/restores) but before ProcessList::killAllQueries
/// (because killAllQueries() will cancel all running backups/restores).
if (server_settings.shutdown_wait_backups_and_restores)
global_context->waitAllBackupsAndRestores();
/// Killing remaining queries.
if (!server_settings.shutdown_wait_unfinished_queries)
global_context->getProcessList().killAllQueries();
if (current_connections)
current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished);
if (current_connections)
LOG_WARNING(
logger,
"Closed connections. But {} remain."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>",
current_connections);
else
LOG_INFO(logger, "Closed connections.");
return current_connections;
}
std::unique_ptr<TCPProtocolStackFactory> ProtocolServersManager::buildProtocolStackFromConfig(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
const std::string & protocol,
Poco::Net::HTTPServerParams::Ptr http_params,
AsynchronousMetrics & async_metrics,
bool & is_secure) const
{
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
{
if (type == "tcp")
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(
server, false, false, ProfileEvents::InterfaceNativeReceiveBytes, ProfileEvents::InterfaceNativeSendBytes));
if (type == "tls")
#if USE_SSL
return TCPServerConnectionFactory::Ptr(new TLSHandlerFactory(server, conf_name));
#else
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.");
#endif
if (type == "proxy1")
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(server, conf_name));
if (type == "mysql")
return TCPServerConnectionFactory::Ptr(
new MySQLHandlerFactory(server, ProfileEvents::InterfaceMySQLReceiveBytes, ProfileEvents::InterfaceMySQLSendBytes));
if (type == "postgres")
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(
server, ProfileEvents::InterfacePostgreSQLReceiveBytes, ProfileEvents::InterfacePostgreSQLSendBytes));
if (type == "http")
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
std::make_shared<HTTPContext>(global_context),
http_params,
createHandlerFactory(server, config, async_metrics, "HTTPHandler-factory"),
ProfileEvents::InterfaceHTTPReceiveBytes,
ProfileEvents::InterfaceHTTPSendBytes));
if (type == "prometheus")
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
std::make_shared<HTTPContext>(global_context),
http_params,
createHandlerFactory(server, config, async_metrics, "PrometheusHandler-factory"),
ProfileEvents::InterfacePrometheusReceiveBytes,
ProfileEvents::InterfacePrometheusSendBytes));
if (type == "interserver")
return TCPServerConnectionFactory::Ptr(new HTTPServerConnectionFactory(
std::make_shared<HTTPContext>(global_context),
http_params,
createHandlerFactory(server, config, async_metrics, "InterserverIOHTTPHandler-factory"),
ProfileEvents::InterfaceInterserverReceiveBytes,
ProfileEvents::InterfaceInterserverSendBytes));
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
};
std::string conf_name = "protocols." + protocol;
std::string prefix = conf_name + ".";
std::unordered_set<std::string> pset{conf_name};
auto stack = std::make_unique<TCPProtocolStackFactory>(server, conf_name);
while (true)
{
// if there is no "type" - it's a reference to another protocol and this is just an endpoint
if (config.has(prefix + "type"))
{
std::string type = config.getString(prefix + "type");
if (type == "tls")
{
if (is_secure)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' contains more than one TLS layer", protocol);
is_secure = true;
}
stack->append(create_factory(type, conf_name));
}
if (!config.has(prefix + "impl"))
break;
conf_name = "protocols." + config.getString(prefix + "impl");
prefix = conf_name + ".";
if (!pset.insert(conf_name).second)
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
}
return stack;
}
}

View File

@ -0,0 +1,37 @@
#pragma once
#include <Server/ServersManager/IServersManager.h>
#include <Server/TCPProtocolStackFactory.h>
#include <Poco/Net/HTTPServerParams.h>
namespace DB
{
class ProtocolServersManager : public IServersManager
{
public:
using IServersManager::IServersManager;
void createServers(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
std::mutex & servers_lock,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
bool start_servers,
const ServerType & server_type) override;
using IServersManager::stopServers;
size_t stopServers(const ServerSettings & server_settings, std::mutex & servers_lock) override;
private:
std::unique_ptr<TCPProtocolStackFactory> buildProtocolStackFromConfig(
const Poco::Util::AbstractConfiguration & config,
IServer & server,
const std::string & protocol,
Poco::Net::HTTPServerParams::Ptr http_params,
AsynchronousMetrics & async_metrics,
bool & is_secure) const;
};
}