diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index afd6a36ea15..d144b4d332e 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -379,11 +380,11 @@ int Keeper::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); servers->emplace_back( + listen_host, port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket, new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString()); + "Keeper (tcp): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); }); const char * secure_port_name = "keeper_server.tcp_port_secure"; @@ -395,10 +396,11 @@ int Keeper::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); servers->emplace_back( + listen_host, secure_port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket, new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections to Keeper with secure protocol (tcp_secure): {}", address.toString()); + "Keeper with secure protocol (tcp_secure): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); #else UNUSED(port); throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", @@ -408,7 +410,10 @@ int Keeper::main(const std::vector & /*args*/) } for (auto & server : *servers) + { server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } zkutil::EventPtr unused_event = std::make_shared(); zkutil::ZooKeeperNodeCache unused_cache([] { return nullptr; }); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 14075f9fbf2..43d2b64c4f2 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -70,6 +71,7 @@ #include "MetricsTransmitter.h" #include #include +#include #include #include #include @@ -127,6 +129,11 @@ namespace CurrentMetrics extern const Metric MaxPushedDDLEntryID; } +namespace ProfileEvents +{ + extern const Event MainConfigLoads; +} + namespace fs = std::filesystem; #if USE_JEMALLOC @@ -344,16 +351,53 @@ Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & sock return address; } -void Server::createServer(const std::string & listen_host, const char * port_name, bool listen_try, CreateServerFunc && func) const +std::vector getListenHosts(const Poco::Util::AbstractConfiguration & config) +{ + auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host"); + if (listen_hosts.empty()) + { + listen_hosts.emplace_back("::1"); + listen_hosts.emplace_back("127.0.0.1"); + } + return listen_hosts; +} + +bool getListenTry(const Poco::Util::AbstractConfiguration & config) +{ + bool listen_try = config.getBool("listen_try", false); + if (!listen_try) + listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty(); + return listen_try; +} + + +void Server::createServer( + Poco::Util::AbstractConfiguration & config, + const std::string & listen_host, + const char * port_name, + bool listen_try, + bool start_server, + std::vector & servers, + CreateServerFunc && func) const { /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file. - if (!config().has(port_name)) + if (config.getString(port_name, "").empty()) return; - auto port = config().getInt(port_name); + /// If we already have an active server for this listen_host/port_name, don't create it again + for (const auto & server : servers) + if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name) + return; + + auto port = config.getInt(port_name); try { - func(port); + servers.push_back(func(port)); + if (start_server) + { + servers.back().start(); + LOG_INFO(&logger(), "Listening for {}", servers.back().getDescription()); + } global_context->registerServerPort(port_name, port); } catch (const Poco::Exception &) @@ -515,6 +559,25 @@ if (ThreadFuzzer::instance().isEffective()) config().getUInt("thread_pool_queue_size", 10000) ); + Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024)); + std::mutex servers_lock; + std::vector servers; + std::vector servers_to_start_before_tables; + /// This object will periodically calculate some metrics. + AsynchronousMetrics async_metrics( + global_context, config().getUInt("asynchronous_metrics_update_period_s", 1), + [&]() -> std::vector + { + std::vector metrics; + for (const auto & server : servers_to_start_before_tables) + metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); + std::lock_guard lock(servers_lock); + for (const auto & server : servers) + metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); + return metrics; + } + ); + ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10)); bool has_zookeeper = config().has("zookeeper"); @@ -870,12 +933,17 @@ if (ThreadFuzzer::instance().isEffective()) global_context->reloadZooKeeperIfChanged(config); global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config); + + std::lock_guard lock(servers_lock); + updateServers(*config, server_pool, async_metrics, servers); } global_context->updateStorageConfiguration(*config); global_context->updateInterserverCredentials(*config); CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "encryption_codecs"); + + ProfileEvents::increment(ProfileEvents::MainConfigLoads); }, /* already_loaded = */ false); /// Reload it right now (initial loading) @@ -987,24 +1055,8 @@ if (ThreadFuzzer::instance().isEffective()) /// try set up encryption. There are some errors in config, error will be printed and server wouldn't start. CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs"); - Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); - - Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024)); - Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; - http_params->setTimeout(settings.http_receive_timeout); - http_params->setKeepAliveTimeout(keep_alive_timeout); - - auto servers_to_start_before_tables = std::make_shared>(); - - std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); - - bool listen_try = config().getBool("listen_try", false); - if (listen_hosts.empty()) - { - listen_hosts.emplace_back("::1"); - listen_hosts.emplace_back("127.0.0.1"); - listen_try = true; - } + const auto listen_hosts = getListenHosts(config()); + const auto listen_try = getListenTry(config()); if (config().has("keeper_server")) { @@ -1027,39 +1079,46 @@ if (ThreadFuzzer::instance().isEffective()) { /// TCP Keeper const char * port_name = "keeper_server.tcp_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables->emplace_back( - port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket, new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString()); - }); + createServer( + config(), listen_host, port_name, listen_try, /* start_server: */ false, + servers_to_start_before_tables, + [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "Keeper (tcp): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); + }); const char * secure_port_name = "keeper_server.tcp_port_secure"; - createServer(listen_host, secure_port_name, listen_try, [&](UInt16 port) - { + createServer( + config(), listen_host, secure_port_name, listen_try, /* start_server: */ false, + servers_to_start_before_tables, + [&](UInt16 port) -> ProtocolServerAdapter + { #if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables->emplace_back( - secure_port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket, new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections to Keeper with secure protocol (tcp_secure): {}", address.toString()); + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + secure_port_name, + "Keeper with secure protocol (tcp_secure): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); #else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; #endif - }); + }); } #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination."); @@ -1067,14 +1126,19 @@ if (ThreadFuzzer::instance().isEffective()) } - for (auto & server : *servers_to_start_before_tables) + for (auto & server : servers_to_start_before_tables) + { server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } SCOPE_EXIT({ /// Stop reloading of the main config. This must be done before `global_context->shutdown()` because /// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart. main_config_reloader.reset(); + async_metrics.stop(); + /** Ask to cancel background jobs all table engines, * and also query_log. * It is important to do early, not in destructor of Context, because @@ -1086,11 +1150,11 @@ if (ThreadFuzzer::instance().isEffective()) LOG_DEBUG(log, "Shut down storages."); - if (!servers_to_start_before_tables->empty()) + if (!servers_to_start_before_tables.empty()) { LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); int current_connections = 0; - for (auto & server : *servers_to_start_before_tables) + for (auto & server : servers_to_start_before_tables) { server.stop(); current_connections += server.currentConnections(); @@ -1102,7 +1166,7 @@ if (ThreadFuzzer::instance().isEffective()) LOG_INFO(log, "Closed all listening sockets."); if (current_connections > 0) - current_connections = waitServersToFinish(*servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); @@ -1256,223 +1320,18 @@ if (ThreadFuzzer::instance().isEffective()) LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); #endif - auto servers = std::make_shared>(); { - /// This object will periodically calculate some metrics. - AsynchronousMetrics async_metrics( - global_context, config().getUInt("asynchronous_metrics_update_period_s", 1), servers_to_start_before_tables, servers); attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics); - for (const auto & listen_host : listen_hosts) { - /// HTTP - const char * port_name = "http_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - - servers->emplace_back( - port_name, - std::make_unique( - context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening for http://{}", address.toString()); - }); - - /// HTTPS - port_name = "https_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening for https://{}", address.toString()); -#else - UNUSED(port); - throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - /// TCP - port_name = "tcp_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString()); - }); - - /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt - port_name = "tcp_with_proxy_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections with native protocol (tcp) with PROXY: {}", address.toString()); - }); - - /// TCP with SSL - port_name = "tcp_port_secure"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): {}", address.toString()); -#else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - /// Interserver IO HTTP - port_name = "interserver_http_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); - }); - - port_name = "interserver_https_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); -#else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - port_name = "mysql_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(Poco::Timespan()); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new MySQLHandlerFactory(*this), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString()); - }); - - port_name = "postgresql_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(Poco::Timespan()); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new PostgreSQLHandlerFactory(*this), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString()); - }); - -#if USE_GRPC - port_name = "grpc_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::SocketAddress server_address(listen_host, port); - servers->emplace_back(port_name, std::make_unique(*this, makeSocketAddress(listen_host, port, log))); - LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); - }); -#endif - - /// Prometheus (if defined and not setup yet with http_port) - port_name = "prometheus.port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); - }); + std::lock_guard lock(servers_lock); + createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers); + if (servers.empty()) + throw Exception( + "No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", + ErrorCodes::NO_ELEMENTS_IN_CONFIG); } - if (servers->empty()) - throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", - ErrorCodes::NO_ELEMENTS_IN_CONFIG); - - /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread. async_metrics.start(); { @@ -1551,9 +1410,15 @@ if (ThreadFuzzer::instance().isEffective()) &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID)); } - for (auto & server : *servers) - server.start(); - LOG_INFO(log, "Ready for connections."); + { + std::lock_guard lock(servers_lock); + for (auto & server : servers) + { + server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } + LOG_INFO(log, "Ready for connections."); + } SCOPE_EXIT_SAFE({ LOG_DEBUG(log, "Received termination signal."); @@ -1562,10 +1427,13 @@ if (ThreadFuzzer::instance().isEffective()) is_cancelled = true; int current_connections = 0; - for (auto & server : *servers) { - server.stop(); - current_connections += server.currentConnections(); + std::lock_guard lock(servers_lock); + for (auto & server : servers) + { + server.stop(); + current_connections += server.currentConnections(); + } } if (current_connections) @@ -1578,7 +1446,7 @@ if (ThreadFuzzer::instance().isEffective()) global_context->getProcessList().killAllQueries(); if (current_connections) - current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections. But {} remain." @@ -1614,4 +1482,273 @@ if (ThreadFuzzer::instance().isEffective()) return Application::EXIT_OK; } + +void Server::createServers( + Poco::Util::AbstractConfiguration & config, + const std::vector & listen_hosts, + bool listen_try, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers, + bool start_servers) +{ + const Settings & settings = global_context->getSettingsRef(); + + Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0); + Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; + http_params->setTimeout(settings.http_receive_timeout); + http_params->setKeepAliveTimeout(keep_alive_timeout); + + for (const auto & listen_host : listen_hosts) + { + /// HTTP + const char * port_name = "http_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + + return ProtocolServerAdapter( + listen_host, + port_name, + "http://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); + }); + + /// HTTPS + port_name = "https_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "https://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); +#else + UNUSED(port); + throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + /// TCP + port_name = "tcp_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "native protocol (tcp): " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + }); + + /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt + port_name = "tcp_with_proxy_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "native protocol (tcp) with PROXY: " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + }); + + /// TCP with SSL + port_name = "tcp_port_secure"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "secure native protocol (tcp_secure): " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), + server_pool, + socket, + new Poco::Net::TCPServerParams)); +#else + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + /// Interserver IO HTTP + port_name = "interserver_http_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "replica communication (interserver): http://" + address.toString(), + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), + server_pool, + socket, + http_params)); + }); + + port_name = "interserver_https_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "secure replica communication (interserver): https://" + address.toString(), + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), + server_pool, + socket, + http_params)); +#else + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + port_name = "mysql_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(Poco::Timespan()); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "MySQL compatibility protocol: " + address.toString(), + std::make_unique(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); + }); + + port_name = "postgresql_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(Poco::Timespan()); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "PostgreSQL compatibility protocol: " + address.toString(), + std::make_unique(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); + }); + +#if USE_GRPC + port_name = "grpc_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::SocketAddress server_address(listen_host, port); + return ProtocolServerAdapter( + listen_host, + port_name, + "gRPC protocol: " + server_address.toString(), + std::make_unique(*this, makeSocketAddress(listen_host, port, &logger()))); + }); +#endif + + /// Prometheus (if defined and not setup yet with http_port) + port_name = "prometheus.port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "Prometheus: http://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); + }); + } + +} + +void Server::updateServers( + Poco::Util::AbstractConfiguration & config, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers) +{ + Poco::Logger * log = &logger(); + /// Gracefully shutdown servers when their port is removed from config + const auto listen_hosts = getListenHosts(config); + const auto listen_try = getListenTry(config); + + for (auto & server : servers) + if (!server.isStopping()) + { + bool has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end(); + bool has_port = !config.getString(server.getPortName(), "").empty(); + if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber()) + { + server.stop(); + LOG_INFO(log, "Stopped listening for {}", server.getDescription()); + } + } + + createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers: */ true); + + /// Remove servers once all their connections are closed + while (std::any_of(servers.begin(), servers.end(), [](const auto & server) { return server.isStopping(); })) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::erase_if(servers, [&log](auto & server) + { + if (!server.isStopping()) + return false; + auto is_finished = server.currentConnections() == 0; + if (is_finished) + LOG_DEBUG(log, "Server finished: {}", server.getDescription()); + else + LOG_TRACE(log, "Waiting server to finish: {}", server.getDescription()); + return is_finished; + }); + } +} + } diff --git a/programs/server/Server.h b/programs/server/Server.h index 45e5fccd51d..b4f2ea3bb79 100644 --- a/programs/server/Server.h +++ b/programs/server/Server.h @@ -24,6 +24,8 @@ namespace Poco namespace DB { +class AsynchronousMetrics; +class ProtocolServerAdapter; class Server : public BaseDaemon, public IServer { @@ -67,8 +69,30 @@ private: ContextMutablePtr global_context; Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const; - using CreateServerFunc = std::function; - void createServer(const std::string & listen_host, const char * port_name, bool listen_try, CreateServerFunc && func) const; + using CreateServerFunc = std::function; + void createServer( + Poco::Util::AbstractConfiguration & config, + const std::string & listen_host, + const char * port_name, + bool listen_try, + bool start_server, + std::vector & servers, + CreateServerFunc && func) const; + + void createServers( + Poco::Util::AbstractConfiguration & config, + const std::vector & listen_hosts, + bool listen_try, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers, + bool start_servers = false); + + void updateServers( + Poco::Util::AbstractConfiguration & config, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers); }; } diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 982523a3ef2..ce321d17b48 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -274,7 +274,8 @@ M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \ \ M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \ - + \ + M(MainConfigLoads, "Number of times the main configuration was reloaded.") \ namespace ProfileEvents { diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 121f7c4153f..d1c5fbebbc7 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -69,12 +69,10 @@ static std::unique_ptr openFileIfExists(const std::stri AsynchronousMetrics::AsynchronousMetrics( ContextPtr global_context_, int update_period_seconds, - std::shared_ptr> servers_to_start_before_tables_, - std::shared_ptr> servers_) + const ProtocolServerMetricsFunc & protocol_server_metrics_func_) : WithContext(global_context_) , update_period(update_period_seconds) - , servers_to_start_before_tables(servers_to_start_before_tables_) - , servers(servers_) + , protocol_server_metrics_func(protocol_server_metrics_func_) , log(&Poco::Logger::get("AsynchronousMetrics")) { #if defined(OS_LINUX) @@ -238,7 +236,7 @@ void AsynchronousMetrics::start() thread = std::make_unique([this] { run(); }); } -AsynchronousMetrics::~AsynchronousMetrics() +void AsynchronousMetrics::stop() { try { @@ -249,7 +247,10 @@ AsynchronousMetrics::~AsynchronousMetrics() wait_cond.notify_one(); if (thread) + { thread->join(); + thread.reset(); + } } catch (...) { @@ -257,6 +258,11 @@ AsynchronousMetrics::~AsynchronousMetrics() } } +AsynchronousMetrics::~AsynchronousMetrics() +{ + stop(); +} + AsynchronousMetricValues AsynchronousMetrics::getValues() const { @@ -1381,22 +1387,11 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti return it->second; }; - if (servers_to_start_before_tables) + const auto server_metrics = protocol_server_metrics_func(); + for (const auto & server_metric : server_metrics) { - for (const auto & server : *servers_to_start_before_tables) - { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); - } - } - - if (servers) - { - for (const auto & server : *servers) - { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); - } + if (const auto * name = get_metric_name(server_metric.port_name)) + new_values[name] = server_metric.current_threads; } } diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 7a5c2d638d7..3c7581ce1a3 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -30,6 +30,11 @@ class ReadBuffer; using AsynchronousMetricValue = double; using AsynchronousMetricValues = std::unordered_map; +struct ProtocolServerMetrics +{ + String port_name; + size_t current_threads; +}; /** Periodically (by default, each minute, starting at 30 seconds offset) * calculates and updates some metrics, @@ -41,24 +46,25 @@ using AsynchronousMetricValues = std::unordered_map()>; AsynchronousMetrics( ContextPtr global_context_, int update_period_seconds, - std::shared_ptr> servers_to_start_before_tables_, - std::shared_ptr> servers_); + const ProtocolServerMetricsFunc & protocol_server_metrics_func_); ~AsynchronousMetrics(); /// Separate method allows to initialize the `servers` variable beforehand. void start(); + void stop(); + /// Returns copy of all values. AsynchronousMetricValues getValues() const; private: const std::chrono::seconds update_period; - std::shared_ptr> servers_to_start_before_tables{nullptr}; - std::shared_ptr> servers{nullptr}; + ProtocolServerMetricsFunc protocol_server_metrics_func; mutable std::mutex mutex; std::condition_variable wait_cond; diff --git a/src/Server/GRPCServer.h b/src/Server/GRPCServer.h index 25c3813c11d..e2b48f1c16b 100644 --- a/src/Server/GRPCServer.h +++ b/src/Server/GRPCServer.h @@ -4,6 +4,7 @@ #if USE_GRPC #include +#include #include "clickhouse_grpc.grpc.pb.h" namespace Poco { class Logger; } @@ -30,6 +31,9 @@ public: /// Stops the server. No new connections will be accepted. void stop(); + /// Returns the port this server is listening to. + UInt16 portNumber() const { return address_to_listen.port(); } + /// Returns the number of currently handled connections. size_t currentConnections() const; diff --git a/src/Server/HTTP/HTTPServer.cpp b/src/Server/HTTP/HTTPServer.cpp index 42e6467d0af..2e91fad1c0f 100644 --- a/src/Server/HTTP/HTTPServer.cpp +++ b/src/Server/HTTP/HTTPServer.cpp @@ -5,31 +5,13 @@ namespace DB { -HTTPServer::HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory_, - UInt16 port_number, - Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), port_number, params), factory(factory_) -{ -} - -HTTPServer::HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory_, - const Poco::Net::ServerSocket & socket, - Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), socket, params), factory(factory_) -{ -} - HTTPServer::HTTPServer( ContextPtr context, HTTPRequestHandlerFactoryPtr factory_, Poco::ThreadPool & thread_pool, - const Poco::Net::ServerSocket & socket, + Poco::Net::ServerSocket & socket_, Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), thread_pool, socket, params), factory(factory_) + : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), thread_pool, socket_, params), factory(factory_) { } diff --git a/src/Server/HTTP/HTTPServer.h b/src/Server/HTTP/HTTPServer.h index 3518fd66d20..07ad54d267f 100644 --- a/src/Server/HTTP/HTTPServer.h +++ b/src/Server/HTTP/HTTPServer.h @@ -1,9 +1,9 @@ #pragma once #include +#include #include -#include #include @@ -13,26 +13,14 @@ namespace DB class Context; -class HTTPServer : public Poco::Net::TCPServer +class HTTPServer : public TCPServer { public: explicit HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory, - UInt16 port_number = 80, - Poco::Net::HTTPServerParams::Ptr params = new Poco::Net::HTTPServerParams); - - HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory, - const Poco::Net::ServerSocket & socket, - Poco::Net::HTTPServerParams::Ptr params); - - HTTPServer( ContextPtr context, HTTPRequestHandlerFactoryPtr factory, Poco::ThreadPool & thread_pool, - const Poco::Net::ServerSocket & socket, + Poco::Net::ServerSocket & socket, Poco::Net::HTTPServerParams::Ptr params); ~HTTPServer() override; diff --git a/src/Server/HTTP/HTTPServerConnection.cpp b/src/Server/HTTP/HTTPServerConnection.cpp index de81da20ead..7020b8e9a23 100644 --- a/src/Server/HTTP/HTTPServerConnection.cpp +++ b/src/Server/HTTP/HTTPServerConnection.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -7,10 +8,11 @@ namespace DB HTTPServerConnection::HTTPServerConnection( ContextPtr context_, + TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_) - : TCPServerConnection(socket), context(Context::createCopy(context_)), params(params_), factory(factory_), stopped(false) + : TCPServerConnection(socket), context(Context::createCopy(context_)), tcp_server(tcp_server_), params(params_), factory(factory_), stopped(false) { poco_check_ptr(factory); } @@ -20,12 +22,12 @@ void HTTPServerConnection::run() std::string server = params->getSoftwareVersion(); Poco::Net::HTTPServerSession session(socket(), params); - while (!stopped && session.hasMoreRequests()) + while (!stopped && tcp_server.isOpen() && session.hasMoreRequests()) { try { std::unique_lock lock(mutex); - if (!stopped) + if (!stopped && tcp_server.isOpen()) { HTTPServerResponse response(session); HTTPServerRequest request(context, response, session); @@ -48,6 +50,11 @@ void HTTPServerConnection::run() response.set("Server", server); try { + if (!tcp_server.isOpen()) + { + sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + break; + } std::unique_ptr handler(factory->createRequestHandler(request)); if (handler) diff --git a/src/Server/HTTP/HTTPServerConnection.h b/src/Server/HTTP/HTTPServerConnection.h index 1c7ae6cd2b7..db3969f6ffb 100644 --- a/src/Server/HTTP/HTTPServerConnection.h +++ b/src/Server/HTTP/HTTPServerConnection.h @@ -9,12 +9,14 @@ namespace DB { +class TCPServer; class HTTPServerConnection : public Poco::Net::TCPServerConnection { public: HTTPServerConnection( ContextPtr context, + TCPServer & tcp_server, const Poco::Net::StreamSocket & socket, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory); @@ -26,6 +28,7 @@ protected: private: ContextPtr context; + TCPServer & tcp_server; Poco::Net::HTTPServerParams::Ptr params; HTTPRequestHandlerFactoryPtr factory; bool stopped; diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.cpp b/src/Server/HTTP/HTTPServerConnectionFactory.cpp index 0e4fb6cfcec..008da222c79 100644 --- a/src/Server/HTTP/HTTPServerConnectionFactory.cpp +++ b/src/Server/HTTP/HTTPServerConnectionFactory.cpp @@ -11,9 +11,9 @@ HTTPServerConnectionFactory::HTTPServerConnectionFactory( poco_check_ptr(factory); } -Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { - return new HTTPServerConnection(context, socket, params, factory); + return new HTTPServerConnection(context, tcp_server, socket, params, factory); } } diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.h b/src/Server/HTTP/HTTPServerConnectionFactory.h index 3f11eca0f69..a19dc6d4d5c 100644 --- a/src/Server/HTTP/HTTPServerConnectionFactory.h +++ b/src/Server/HTTP/HTTPServerConnectionFactory.h @@ -2,19 +2,19 @@ #include #include +#include #include -#include namespace DB { -class HTTPServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory +class HTTPServerConnectionFactory : public TCPServerConnectionFactory { public: HTTPServerConnectionFactory(ContextPtr context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override; private: ContextPtr context; diff --git a/src/Server/KeeperTCPHandlerFactory.h b/src/Server/KeeperTCPHandlerFactory.h index 67bb3dab268..58dc73d7c27 100644 --- a/src/Server/KeeperTCPHandlerFactory.h +++ b/src/Server/KeeperTCPHandlerFactory.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -10,7 +10,7 @@ namespace DB { -class KeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class KeeperTCPHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -29,7 +29,7 @@ public: { } - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer &) override { try { diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index deebc073ad5..2836ee05c30 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -62,10 +63,11 @@ static String showTableStatusReplacementQuery(const String & query); static String killConnectionIdReplacementQuery(const String & query); static String selectLimitReplacementQuery(const String & query); -MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, +MySQLHandler::MySQLHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , log(&Poco::Logger::get("MySQLHandler")) , connection_id(connection_id_) , auth_plugin(new MySQLProtocol::Authentication::Native41()) @@ -138,11 +140,14 @@ void MySQLHandler::run() OKPacket ok_packet(0, handshake_response.capability_flags, 0, 0, 0); packet_endpoint->sendPacket(ok_packet, true); - while (true) + while (tcp_server.isOpen()) { packet_endpoint->resetSequenceId(); MySQLPacketPayloadReadBuffer payload = packet_endpoint->getPayload(); + while (!in->poll(1000000)) + if (!tcp_server.isOpen()) + return; char command = 0; payload.readStrict(command); @@ -152,6 +157,8 @@ void MySQLHandler::run() LOG_DEBUG(log, "Received command: {}. Connection id: {}.", static_cast(static_cast(command)), connection_id); + if (!tcp_server.isOpen()) + return; try { switch (command) @@ -369,8 +376,8 @@ void MySQLHandler::finishHandshakeSSL( } #if USE_SSL -MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) - : MySQLHandler(server_, socket_, ssl_enabled, connection_id_) +MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) + : MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_) , public_key(public_key_) , private_key(private_key_) {} diff --git a/src/Server/MySQLHandler.h b/src/Server/MySQLHandler.h index 7ef212bf36e..3af5f7a0eb2 100644 --- a/src/Server/MySQLHandler.h +++ b/src/Server/MySQLHandler.h @@ -24,11 +24,14 @@ namespace CurrentMetrics namespace DB { +class ReadBufferFromPocoSocket; +class TCPServer; + /// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client. class MySQLHandler : public Poco::Net::TCPServerConnection { public: - MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_); + MySQLHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_); void run() final; @@ -52,6 +55,7 @@ protected: virtual void finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function read_bytes, MySQLProtocol::ConnectionPhase::HandshakeResponse & packet); IServer & server; + TCPServer & tcp_server; Poco::Logger * log; UInt64 connection_id = 0; @@ -68,7 +72,7 @@ protected: Replacements replacements; std::unique_ptr auth_plugin; - std::shared_ptr in; + std::shared_ptr in; std::shared_ptr out; bool secure_connection = false; }; @@ -77,7 +81,7 @@ protected: class MySQLHandlerSSL : public MySQLHandler { public: - MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_); + MySQLHandlerSSL(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_); private: void authPluginSSL() override; diff --git a/src/Server/MySQLHandlerFactory.cpp b/src/Server/MySQLHandlerFactory.cpp index 7a0bfd8ab09..f7bb073e275 100644 --- a/src/Server/MySQLHandlerFactory.cpp +++ b/src/Server/MySQLHandlerFactory.cpp @@ -118,14 +118,14 @@ void MySQLHandlerFactory::generateRSAKeys() } #endif -Poco::Net::TCPServerConnection * MySQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * MySQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { size_t connection_id = last_connection_id++; LOG_TRACE(log, "MySQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString()); #if USE_SSL - return new MySQLHandlerSSL(server, socket, ssl_enabled, connection_id, *public_key, *private_key); + return new MySQLHandlerSSL(server, tcp_server, socket, ssl_enabled, connection_id, *public_key, *private_key); #else - return new MySQLHandler(server, socket, ssl_enabled, connection_id); + return new MySQLHandler(server, tcp_server, socket, ssl_enabled, connection_id); #endif } diff --git a/src/Server/MySQLHandlerFactory.h b/src/Server/MySQLHandlerFactory.h index 106fdfdf341..25f1af85273 100644 --- a/src/Server/MySQLHandlerFactory.h +++ b/src/Server/MySQLHandlerFactory.h @@ -1,9 +1,9 @@ #pragma once -#include #include #include #include +#include #include @@ -13,8 +13,9 @@ namespace DB { +class TCPServer; -class MySQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class MySQLHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -43,7 +44,7 @@ public: void generateRSAKeys(); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override; }; } diff --git a/src/Server/PostgreSQLHandler.cpp b/src/Server/PostgreSQLHandler.cpp index fee4ace3452..9808b538280 100644 --- a/src/Server/PostgreSQLHandler.cpp +++ b/src/Server/PostgreSQLHandler.cpp @@ -6,6 +6,7 @@ #include #include "PostgreSQLHandler.h" #include +#include #include #include #include @@ -28,11 +29,13 @@ namespace ErrorCodes PostgreSQLHandler::PostgreSQLHandler( const Poco::Net::StreamSocket & socket_, IServer & server_, + TCPServer & tcp_server_, bool ssl_enabled_, Int32 connection_id_, std::vector> & auth_methods_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , ssl_enabled(ssl_enabled_) , connection_id(connection_id_) , authentication_manager(auth_methods_) @@ -60,11 +63,18 @@ void PostgreSQLHandler::run() if (!startup()) return; - while (true) + while (tcp_server.isOpen()) { message_transport->send(PostgreSQLProtocol::Messaging::ReadyForQuery(), true); + + constexpr size_t connection_check_timeout = 1; // 1 second + while (!in->poll(1000000 * connection_check_timeout)) + if (!tcp_server.isOpen()) + return; PostgreSQLProtocol::Messaging::FrontMessageType message_type = message_transport->receiveMessageType(); + if (!tcp_server.isOpen()) + return; switch (message_type) { case PostgreSQLProtocol::Messaging::FrontMessageType::QUERY: diff --git a/src/Server/PostgreSQLHandler.h b/src/Server/PostgreSQLHandler.h index 1d33f41f255..4fd08cc2606 100644 --- a/src/Server/PostgreSQLHandler.h +++ b/src/Server/PostgreSQLHandler.h @@ -18,8 +18,9 @@ namespace CurrentMetrics namespace DB { - +class ReadBufferFromPocoSocket; class Session; +class TCPServer; /** PostgreSQL wire protocol implementation. * For more info see https://www.postgresql.org/docs/current/protocol.html @@ -30,6 +31,7 @@ public: PostgreSQLHandler( const Poco::Net::StreamSocket & socket_, IServer & server_, + TCPServer & tcp_server_, bool ssl_enabled_, Int32 connection_id_, std::vector> & auth_methods_); @@ -40,12 +42,13 @@ private: Poco::Logger * log = &Poco::Logger::get("PostgreSQLHandler"); IServer & server; + TCPServer & tcp_server; std::unique_ptr session; bool ssl_enabled = false; Int32 connection_id = 0; Int32 secret_key = 0; - std::shared_ptr in; + std::shared_ptr in; std::shared_ptr out; std::shared_ptr message_transport; diff --git a/src/Server/PostgreSQLHandlerFactory.cpp b/src/Server/PostgreSQLHandlerFactory.cpp index 1158cf5835e..6f2124861e7 100644 --- a/src/Server/PostgreSQLHandlerFactory.cpp +++ b/src/Server/PostgreSQLHandlerFactory.cpp @@ -1,5 +1,4 @@ #include "PostgreSQLHandlerFactory.h" -#include #include #include @@ -17,11 +16,11 @@ PostgreSQLHandlerFactory::PostgreSQLHandlerFactory(IServer & server_) }; } -Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { Int32 connection_id = last_connection_id++; LOG_TRACE(log, "PostgreSQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString()); - return new PostgreSQLHandler(socket, server, ssl_enabled, connection_id, auth_methods); + return new PostgreSQLHandler(socket, server, tcp_server, ssl_enabled, connection_id, auth_methods); } } diff --git a/src/Server/PostgreSQLHandlerFactory.h b/src/Server/PostgreSQLHandlerFactory.h index dc3d4047d2a..e9241da6f0e 100644 --- a/src/Server/PostgreSQLHandlerFactory.h +++ b/src/Server/PostgreSQLHandlerFactory.h @@ -1,16 +1,16 @@ #pragma once -#include #include #include #include +#include #include #include namespace DB { -class PostgreSQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class PostgreSQLHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -28,6 +28,6 @@ private: public: explicit PostgreSQLHandlerFactory(IServer & server_); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & server) override; }; } diff --git a/src/Server/ProtocolServerAdapter.cpp b/src/Server/ProtocolServerAdapter.cpp index 6ec1ec572f7..b41ad2376f1 100644 --- a/src/Server/ProtocolServerAdapter.cpp +++ b/src/Server/ProtocolServerAdapter.cpp @@ -1,5 +1,5 @@ #include -#include +#include #if USE_GRPC #include @@ -11,20 +11,29 @@ namespace DB class ProtocolServerAdapter::TCPServerAdapterImpl : public Impl { public: - explicit TCPServerAdapterImpl(std::unique_ptr tcp_server_) : tcp_server(std::move(tcp_server_)) {} + explicit TCPServerAdapterImpl(std::unique_ptr tcp_server_) : tcp_server(std::move(tcp_server_)) {} ~TCPServerAdapterImpl() override = default; void start() override { tcp_server->start(); } void stop() override { tcp_server->stop(); } + bool isStopping() const override { return !tcp_server->isOpen(); } + UInt16 portNumber() const override { return tcp_server->portNumber(); } size_t currentConnections() const override { return tcp_server->currentConnections(); } size_t currentThreads() const override { return tcp_server->currentThreads(); } private: - std::unique_ptr tcp_server; + std::unique_ptr tcp_server; }; -ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_) - : port_name(port_name_), impl(std::make_unique(std::move(tcp_server_))) +ProtocolServerAdapter::ProtocolServerAdapter( + const std::string & listen_host_, + const char * port_name_, + const std::string & description_, + std::unique_ptr tcp_server_) + : listen_host(listen_host_) + , port_name(port_name_) + , description(description_) + , impl(std::make_unique(std::move(tcp_server_))) { } @@ -36,16 +45,30 @@ public: ~GRPCServerAdapterImpl() override = default; void start() override { grpc_server->start(); } - void stop() override { grpc_server->stop(); } + void stop() override + { + is_stopping = true; + grpc_server->stop(); + } + bool isStopping() const override { return is_stopping; } + UInt16 portNumber() const override { return grpc_server->portNumber(); } size_t currentConnections() const override { return grpc_server->currentConnections(); } size_t currentThreads() const override { return grpc_server->currentThreads(); } private: std::unique_ptr grpc_server; + bool is_stopping = false; }; -ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_) - : port_name(port_name_), impl(std::make_unique(std::move(grpc_server_))) +ProtocolServerAdapter::ProtocolServerAdapter( + const std::string & listen_host_, + const char * port_name_, + const std::string & description_, + std::unique_ptr grpc_server_) + : listen_host(listen_host_) + , port_name(port_name_) + , description(description_) + , impl(std::make_unique(std::move(grpc_server_))) { } #endif diff --git a/src/Server/ProtocolServerAdapter.h b/src/Server/ProtocolServerAdapter.h index 04c46b53356..9b3b1af0301 100644 --- a/src/Server/ProtocolServerAdapter.h +++ b/src/Server/ProtocolServerAdapter.h @@ -2,14 +2,14 @@ #include +#include #include #include -namespace Poco::Net { class TCPServer; } - namespace DB { class GRPCServer; +class TCPServer; /// Provides an unified interface to access a protocol implementing server /// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...). @@ -19,10 +19,10 @@ class ProtocolServerAdapter public: ProtocolServerAdapter(ProtocolServerAdapter && src) = default; ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default; - ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_); + ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr tcp_server_); #if USE_GRPC - ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_); + ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr grpc_server_); #endif /// Starts the server. A new thread will be created that waits for and accepts incoming connections. @@ -31,14 +31,23 @@ public: /// Stops the server. No new connections will be accepted. void stop() { impl->stop(); } + bool isStopping() const { return impl->isStopping(); } + /// Returns the number of currently handled connections. size_t currentConnections() const { return impl->currentConnections(); } /// Returns the number of current threads. size_t currentThreads() const { return impl->currentThreads(); } + /// Returns the port this server is listening to. + UInt16 portNumber() const { return impl->portNumber(); } + + const std::string & getListenHost() const { return listen_host; } + const std::string & getPortName() const { return port_name; } + const std::string & getDescription() const { return description; } + private: class Impl { @@ -46,13 +55,17 @@ private: virtual ~Impl() {} virtual void start() = 0; virtual void stop() = 0; + virtual bool isStopping() const = 0; + virtual UInt16 portNumber() const = 0; virtual size_t currentConnections() const = 0; virtual size_t currentThreads() const = 0; }; class TCPServerAdapterImpl; class GRPCServerAdapterImpl; + std::string listen_host; std::string port_name; + std::string description; std::unique_ptr impl; }; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index cdf1838c06b..c2dcd5d7222 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -81,9 +82,10 @@ namespace ErrorCodes extern const int UNKNOWN_PROTOCOL; } -TCPHandler::TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_) +TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , parse_proxy_protocol(parse_proxy_protocol_) , log(&Poco::Logger::get("TCPHandler")) , server_display_name(std::move(server_display_name_)) @@ -172,13 +174,13 @@ void TCPHandler::runImpl() throw; } - while (true) + while (tcp_server.isOpen()) { /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down. { Stopwatch idle_time; UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000; - while (!server.isCancelled() && !static_cast(*in).poll(timeout_ms)) + while (tcp_server.isOpen() && !server.isCancelled() && !static_cast(*in).poll(timeout_ms)) { if (idle_time.elapsedSeconds() > idle_connection_timeout) { @@ -189,7 +191,7 @@ void TCPHandler::runImpl() } /// If we need to shut down, or client disconnects. - if (server.isCancelled() || in->eof()) + if (!tcp_server.isOpen() || server.isCancelled() || in->eof()) break; Stopwatch watch; diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 4a340e328ed..791222dd0dc 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -35,6 +35,7 @@ class Session; struct Settings; class ColumnsDescription; struct ProfileInfo; +class TCPServer; /// State of query processing. struct QueryState @@ -127,7 +128,7 @@ public: * because it allows to check the IP ranges of the trusted proxy. * Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP. */ - TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_); + TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_); ~TCPHandler() override; void run() override; @@ -137,6 +138,7 @@ public: private: IServer & server; + TCPServer & tcp_server; bool parse_proxy_protocol = false; Poco::Logger * log; diff --git a/src/Server/TCPHandlerFactory.h b/src/Server/TCPHandlerFactory.h index e610bea330c..03b2592198d 100644 --- a/src/Server/TCPHandlerFactory.h +++ b/src/Server/TCPHandlerFactory.h @@ -1,17 +1,17 @@ #pragma once -#include #include #include #include #include +#include namespace Poco { class Logger; } namespace DB { -class TCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class TCPHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -38,13 +38,13 @@ public: server_display_name = server.config().getString("display_name", getFQDNOrHostName()); } - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override { try { LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString()); - return new TCPHandler(server, socket, parse_proxy_protocol, server_display_name); + return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name); } catch (const Poco::Net::NetException &) { diff --git a/src/Server/TCPServer.cpp b/src/Server/TCPServer.cpp new file mode 100644 index 00000000000..380c4ef9924 --- /dev/null +++ b/src/Server/TCPServer.cpp @@ -0,0 +1,36 @@ +#include +#include + +namespace DB +{ + +class TCPServerConnectionFactoryImpl : public Poco::Net::TCPServerConnectionFactory +{ +public: + TCPServerConnectionFactoryImpl(TCPServer & tcp_server_, DB::TCPServerConnectionFactory::Ptr factory_) + : tcp_server(tcp_server_) + , factory(factory_) + {} + + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + { + return factory->createConnection(socket, tcp_server); + } +private: + TCPServer & tcp_server; + DB::TCPServerConnectionFactory::Ptr factory; +}; + +TCPServer::TCPServer( + TCPServerConnectionFactory::Ptr factory_, + Poco::ThreadPool & thread_pool, + Poco::Net::ServerSocket & socket_, + Poco::Net::TCPServerParams::Ptr params) + : Poco::Net::TCPServer(new TCPServerConnectionFactoryImpl(*this, factory_), thread_pool, socket_, params) + , factory(factory_) + , socket(socket_) + , is_open(true) + , port_number(socket.address().port()) +{} + +} diff --git a/src/Server/TCPServer.h b/src/Server/TCPServer.h new file mode 100644 index 00000000000..219fed5342b --- /dev/null +++ b/src/Server/TCPServer.h @@ -0,0 +1,47 @@ +#pragma once + +#include + +#include +#include + + +namespace DB +{ +class Context; + +class TCPServer : public Poco::Net::TCPServer +{ +public: + explicit TCPServer( + TCPServerConnectionFactory::Ptr factory, + Poco::ThreadPool & thread_pool, + Poco::Net::ServerSocket & socket, + Poco::Net::TCPServerParams::Ptr params = new Poco::Net::TCPServerParams); + + /// Close the socket and ask existing connections to stop serving queries + void stop() + { + Poco::Net::TCPServer::stop(); + // This notifies already established connections that they should stop serving + // queries and close their socket as soon as they can. + is_open = false; + // Poco's stop() stops listening on the socket but leaves it open. + // To be able to hand over control of the listening port to a new server, and + // to get fast connection refusal instead of timeouts, we also need to close + // the listening socket. + socket.close(); + } + + bool isOpen() const { return is_open; } + + UInt16 portNumber() const { return port_number; } + +private: + TCPServerConnectionFactory::Ptr factory; + Poco::Net::ServerSocket socket; + std::atomic is_open; + UInt16 port_number; +}; + +} diff --git a/src/Server/TCPServerConnectionFactory.h b/src/Server/TCPServerConnectionFactory.h new file mode 100644 index 00000000000..613f98352bd --- /dev/null +++ b/src/Server/TCPServerConnectionFactory.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace Poco +{ +namespace Net +{ + class StreamSocket; + class TCPServerConnection; +} +} +namespace DB +{ +class TCPServer; + +class TCPServerConnectionFactory +{ +public: + using Ptr = Poco::SharedPtr; + + virtual ~TCPServerConnectionFactory() = default; + + /// Same as Poco::Net::TCPServerConnectionFactory except we can pass the TCPServer + virtual Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) = 0; +}; +} diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index ff945068732..d113c825205 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -2045,7 +2045,8 @@ class ClickHouseInstance: user=user, password=password, database=database) # Connects to the instance via HTTP interface, sends a query and returns the answer - def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False): + def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False, + port=8123, timeout=None, retry_strategy=None): logging.debug(f"Executing query {sql} on {self.name} via HTTP interface") if params is None: params = {} @@ -2059,12 +2060,19 @@ class ClickHouseInstance: auth = requests.auth.HTTPBasicAuth(user, password) elif user: auth = requests.auth.HTTPBasicAuth(user, '') - url = "http://" + self.ip_address + ":8123/?" + urllib.parse.urlencode(params) + url = f"http://{self.ip_address}:{port}/?" + urllib.parse.urlencode(params) - if data: - r = requests.post(url, data, auth=auth) + if retry_strategy is None: + requester = requests else: - r = requests.get(url, auth=auth) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + requester = requests.Session() + requester.mount("https://", adapter) + requester.mount("http://", adapter) + if data: + r = requester.post(url, data, auth=auth, timeout=timeout) + else: + r = requester.get(url, auth=auth, timeout=timeout) def http_code_and_message(): code = r.status_code diff --git a/tests/integration/test_server_reload/.gitignore b/tests/integration/test_server_reload/.gitignore new file mode 100644 index 00000000000..edf565ec632 --- /dev/null +++ b/tests/integration/test_server_reload/.gitignore @@ -0,0 +1 @@ +_gen diff --git a/tests/integration/test_server_reload/__init__.py b/tests/integration/test_server_reload/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_server_reload/configs/default_passwd.xml b/tests/integration/test_server_reload/configs/default_passwd.xml new file mode 100644 index 00000000000..5c23be0dcb0 --- /dev/null +++ b/tests/integration/test_server_reload/configs/default_passwd.xml @@ -0,0 +1,13 @@ + + + + + + + + + + 123 + + + diff --git a/tests/integration/test_server_reload/configs/dhparam.pem b/tests/integration/test_server_reload/configs/dhparam.pem new file mode 100644 index 00000000000..fb935b9c898 --- /dev/null +++ b/tests/integration/test_server_reload/configs/dhparam.pem @@ -0,0 +1,8 @@ +-----BEGIN DH PARAMETERS----- +MIIBCAKCAQEAkPGhfLY5nppeQkFBKYRpiisxzrRQfyyTUu6aabZP2CbAMAuoYzaC +Z+iqeWSQZKRYeA21SZXkC9xE1e5FJsc5IWzCRiMNZeLuj4ApUNysMu89DpX8/b91 ++Ka6wRJnaO43ZqHj/9FpU4JiYtxoIpXDC9HeiSAnwLwJc3L+nkYfnSGgvzWIxhGV +gCoVmVBoTe7wrqCyVlM5nrNZSjhlSugvXmu2bSK3MwYF08QLKvlF68eedbs0PMWh +WC0bFM/X7gMBEqL4DiINufAShbZPKxD6eL2APiHPUo6xun3ed/Po/5j8QBmiku0c +5Jb12ZhOTRTQjaRg2aFF8LPdW2tDE7HmewIBAg== +-----END DH PARAMETERS----- diff --git a/tests/integration/test_server_reload/configs/ports_from_zk.xml b/tests/integration/test_server_reload/configs/ports_from_zk.xml new file mode 100644 index 00000000000..ae3435a3d3c --- /dev/null +++ b/tests/integration/test_server_reload/configs/ports_from_zk.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/tests/integration/test_server_reload/configs/server.crt b/tests/integration/test_server_reload/configs/server.crt new file mode 100644 index 00000000000..6f4deca038f --- /dev/null +++ b/tests/integration/test_server_reload/configs/server.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC+zCCAeOgAwIBAgIJAIhI9ozZJ+TWMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV +BAMMCWxvY2FsaG9zdDAeFw0xOTA0MjIwNDMyNTJaFw0yMDA0MjEwNDMyNTJaMBQx +EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAK+wVUEdqF2uXvN0MJBgnAHyXi6JTi4p/F6igsrCjSNjJWzHH0vQmK8ujfcF +CkifW88i+W5eHctuEtQqNHK+t9x9YiZtXrj6m/XkOXs20mYgENSmbbbHbriTPnZB +zZrq6UqMlwIHNNAa+I3NMORQxVRaI0ybXnGVO5elr70xHpk03xL0JWKHpEqYp4db +2aBQgF6y3Ww4khxjIYqpUYXWXGFnVIRU7FKVEAM1xyKqvQzXjQ5sVM/wyHknveEF +3b/X4ggN+KNl5KOc0cWDh1/XaatJAPaUUPqZcq76tynLbP64Xm3dxHcj+gtRkO67 +ef6MSg6l63m3XQP6Qb+MIkd06OsCAwEAAaNQME4wHQYDVR0OBBYEFDmODTO8QLDN +ykR3x0LIOnjNhrKhMB8GA1UdIwQYMBaAFDmODTO8QLDNykR3x0LIOnjNhrKhMAwG +A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAwaiJc7uqEpnH3aukbftDwX +m8GfEnj1HVdgg+9GGNq+9rvUYBF6gdPmjRCX9dO0cclLFx8jc2org0rTSq9WoOhX +E6qL4Eqrmc5SE3Y9jZM0h6GRD4oXK014FmtZ3T6ddZU3dQLj3BS2r1XrvmubTvGN +ZuTJNY8nx8Hh6H5XINmsEjUF9E5hog+PwCE03xt2adIdYL+gsbxASeNYyeUFpZv5 +zcXR3VoakBWnAaOVgCHq2qh96QAnL7ZKzFkGf/MdwV10KU3dmb+ICbQUUdf9Gc17 +aaDCIRws312F433FdXBkGs2UkB7ZZme9dfn6O1QbeTNvex2VLMqYx/CTkfFbOQA= +-----END CERTIFICATE----- diff --git a/tests/integration/test_server_reload/configs/server.key b/tests/integration/test_server_reload/configs/server.key new file mode 100644 index 00000000000..6eddb3295db --- /dev/null +++ b/tests/integration/test_server_reload/configs/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvsFVBHahdrl7z +dDCQYJwB8l4uiU4uKfxeooLKwo0jYyVsxx9L0JivLo33BQpIn1vPIvluXh3LbhLU +KjRyvrfcfWImbV64+pv15Dl7NtJmIBDUpm22x264kz52Qc2a6ulKjJcCBzTQGviN +zTDkUMVUWiNMm15xlTuXpa+9MR6ZNN8S9CVih6RKmKeHW9mgUIBest1sOJIcYyGK +qVGF1lxhZ1SEVOxSlRADNcciqr0M140ObFTP8Mh5J73hBd2/1+IIDfijZeSjnNHF +g4df12mrSQD2lFD6mXKu+rcpy2z+uF5t3cR3I/oLUZDuu3n+jEoOpet5t10D+kG/ +jCJHdOjrAgMBAAECggEARF66zrxb6RkSmmt8+rKeA6PuQu3sHsr4C1vyyjUr97l9 +tvdGlpp20LWtSZQMjHZ3pARYTTsTHTeY3DgQcRcHNicVKx8k3ZepWeeW9vw+pL+V +zSt3RsoVrH6gsCSrfr4sS3aqzX9AbjwQvh48CJ3mLQ1m70kHV+xbZIh1+4pB/hyP +1wKyUE18ZkOptXvO/TtoHzLQCecpkXtWzmry1Eh2isvXA+NMrAtLibGsyM1mtm7i +5ozevzHabvvCDBEe+KgZdONgVhhhvm2eOd+/s4w3rw4ETud4fI/ZAJyWXhiIKFnA +VJbElWruSAoVBW7p2bsF5PbmVzvo8vXL+VylxYD+AQKBgQDhLoRKTVhNkn/QjKxq +sdOh+QZra0LzjVpAmkQzu7wZMSHEz9qePQciDQQrYKrmRF1vNcIRCVUTqWYheJ/1 +lKRrCGa0ab6k96zkWMqLHD5u+UeJV7r1dJIx08ME9kNJ+x/XtB8klRIji16NiQUS +qc6p8z0M2AnbJzsRfWZRH8FeYwKBgQDHu8dzdtVGI7MtxfPOE/bfajiopDg8BdTC +pdug2T8XofRHRq7Q+0vYjTAZFT/slib91Pk6VvvPdo9VBZiL4omv4dAq6mOOdX/c +U14mJe1X5GCrr8ExZ8BfNJ3t/6sV1fcxyJwAw7iBguqxA2JqdM/wFk10K8XqvzVn +CD6O9yGt2QKBgFX1BMi8N538809vs41S7l9hCQNOQZNo/O+2M5yv6ECRkbtoQKKw +1x03bMUGNJaLuELweXE5Z8GGo5bZTe5X3F+DKHlr+DtO1C+ieUaa9HY2MAmMdLCn +2/qrREGLo+oEs4YKmuzC/taUp/ZNPKOAMISNdluFyFVg51pozPrgrVbTAoGBAKkE +LBl3O67o0t0vH8sJdeVFG8EJhlS0koBMnfgVHqC++dm+5HwPyvTrNQJkyv1HaqNt +r6FArkG3ED9gRuBIyT6+lctbIPgSUip9mbQqcBfqOCvQxGksZMur2ODncz09HLtS +CUFUXjOqNzOnq4ZuZu/Bz7U4vXiSaXxQq6+LTUKxAoGAFZU/qrI06XxnrE9A1X0W +l7DSkpZaDcu11NrZ473yONih/xOZNh4SSBpX8a7F6Pmh9BdtGqphML8NFPvQKcfP +b9H2iid2tc292uyrUEb5uTMmv61zoTwtitqLzO0+tS6PT3fXobX+eyeEWKzPBljL +HFtxG5CCXpkdnWRmaJnhTzA= +-----END PRIVATE KEY----- diff --git a/tests/integration/test_server_reload/configs/ssl_conf.xml b/tests/integration/test_server_reload/configs/ssl_conf.xml new file mode 100644 index 00000000000..43b25032059 --- /dev/null +++ b/tests/integration/test_server_reload/configs/ssl_conf.xml @@ -0,0 +1,18 @@ + + + + + + + /etc/clickhouse-server/config.d/server.crt + /etc/clickhouse-server/config.d/server.key + + /etc/clickhouse-server/config.d/dhparam.pem + none + true + true + sslv2,sslv3 + true + + + diff --git a/tests/integration/test_server_reload/protos/clickhouse_grpc.proto b/tests/integration/test_server_reload/protos/clickhouse_grpc.proto new file mode 100644 index 00000000000..c6cafaf6e40 --- /dev/null +++ b/tests/integration/test_server_reload/protos/clickhouse_grpc.proto @@ -0,0 +1,174 @@ +/* This file describes gRPC protocol supported in ClickHouse. + * + * To use this protocol a client should send one or more messages of the QueryInfo type + * and then receive one or more messages of the Result type. + * According to that the service provides four methods for that: + * ExecuteQuery(QueryInfo) returns (Result) + * ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) + * ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) + * ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) + * It's up to the client to choose which method to use. + * For example, ExecuteQueryWithStreamInput() allows the client to add data multiple times + * while executing a query, which is suitable for inserting many rows. + */ + +syntax = "proto3"; + +package clickhouse.grpc; + +message NameAndType { + string name = 1; + string type = 2; +} + +// Describes an external table - a table which will exists only while a query is executing. +message ExternalTable { + // Name of the table. If omitted, "_data" is used. + string name = 1; + + // Columns of the table. Types are required, names can be omitted. If the names are omitted, "_1", "_2", ... is used. + repeated NameAndType columns = 2; + + // Data to insert to the external table. + // If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used, + // then data for insertion to the same external table can be split between multiple QueryInfos. + bytes data = 3; + + // Format of the data to insert to the external table. + string format = 4; + + // Settings for executing that insertion, applied after QueryInfo.settings. + map settings = 5; +} + +enum CompressionAlgorithm { + NO_COMPRESSION = 0; + DEFLATE = 1; + GZIP = 2; + STREAM_GZIP = 3; +} + +enum CompressionLevel { + COMPRESSION_NONE = 0; + COMPRESSION_LOW = 1; + COMPRESSION_MEDIUM = 2; + COMPRESSION_HIGH = 3; +} + +message Compression { + CompressionAlgorithm algorithm = 1; + CompressionLevel level = 2; +} + +// Information about a query which a client sends to a ClickHouse server. +// The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data. +// In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set. +message QueryInfo { + string query = 1; + string query_id = 2; + map settings = 3; + + // Default database. + string database = 4; + + // Input data, used both as data for INSERT query and as data for the input() function. + bytes input_data = 5; + + // Delimiter for input_data, inserted between input_data from adjacent QueryInfos. + bytes input_data_delimiter = 6; + + // Default output format. If not specified, 'TabSeparated' is used. + string output_format = 7; + + repeated ExternalTable external_tables = 8; + + string user_name = 9; + string password = 10; + string quota = 11; + + // Works exactly like sessions in the HTTP protocol. + string session_id = 12; + bool session_check = 13; + uint32 session_timeout = 14; + + // Set `cancel` to true to stop executing the query. + bool cancel = 15; + + // If true there will be at least one more QueryInfo in the input stream. + // `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used. + bool next_query_info = 16; + + /// Controls how a ClickHouse server will compress query execution results before sending back to the client. + /// If not set the compression settings from the configuration file will be used. + Compression result_compression = 17; +} + +enum LogsLevel { + LOG_NONE = 0; + LOG_FATAL = 1; + LOG_CRITICAL = 2; + LOG_ERROR = 3; + LOG_WARNING = 4; + LOG_NOTICE = 5; + LOG_INFORMATION = 6; + LOG_DEBUG = 7; + LOG_TRACE = 8; +} + +message LogEntry { + uint32 time = 1; + uint32 time_microseconds = 2; + uint64 thread_id = 3; + string query_id = 4; + LogsLevel level = 5; + string source = 6; + string text = 7; +} + +message Progress { + uint64 read_rows = 1; + uint64 read_bytes = 2; + uint64 total_rows_to_read = 3; + uint64 written_rows = 4; + uint64 written_bytes = 5; +} + +message Stats { + uint64 rows = 1; + uint64 blocks = 2; + uint64 allocated_bytes = 3; + bool applied_limit = 4; + uint64 rows_before_limit = 5; +} + +message Exception { + int32 code = 1; + string name = 2; + string display_text = 3; + string stack_trace = 4; +} + +// Result of execution of a query which is sent back by the ClickHouse server to the client. +message Result { + // Output of the query, represented in the `output_format` or in a format specified in `query`. + bytes output = 1; + bytes totals = 2; + bytes extremes = 3; + + repeated LogEntry logs = 4; + Progress progress = 5; + Stats stats = 6; + + // Set by the ClickHouse server if there was an exception thrown while executing. + Exception exception = 7; + + // Set by the ClickHouse server if executing was cancelled by the `cancel` field in QueryInfo. + bool cancelled = 8; +} + +service ClickHouse { + rpc ExecuteQuery(QueryInfo) returns (Result) {} + rpc ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) {} + rpc ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) {} + rpc ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) {} +} diff --git a/tests/integration/test_server_reload/test.py b/tests/integration/test_server_reload/test.py new file mode 100644 index 00000000000..3c22b476f64 --- /dev/null +++ b/tests/integration/test_server_reload/test.py @@ -0,0 +1,284 @@ +import contextlib +import grpc +import psycopg2 +import pymysql.connections +import pymysql.err +import pytest +import sys +import time +from helpers.cluster import ClickHouseCluster, run_and_check +from helpers.client import Client, QueryRuntimeException +from kazoo.exceptions import NodeExistsError +from pathlib import Path +from requests.exceptions import ConnectionError +from urllib3.util.retry import Retry + +cluster = ClickHouseCluster(__file__) +instance = cluster.add_instance( + "instance", + main_configs=[ + "configs/ports_from_zk.xml", "configs/ssl_conf.xml", "configs/dhparam.pem", "configs/server.crt", "configs/server.key" + ], + user_configs=["configs/default_passwd.xml"], + with_zookeeper=True) + + +LOADS_QUERY = "SELECT value FROM system.events WHERE event = 'MainConfigLoads'" + + +# Use grpcio-tools to generate *pb2.py files from *.proto. + +proto_dir = Path(__file__).parent / "protos" +gen_dir = Path(__file__).parent / "_gen" +gen_dir.mkdir(exist_ok=True) +run_and_check( + f"python3 -m grpc_tools.protoc -I{proto_dir!s} --python_out={gen_dir!s} --grpc_python_out={gen_dir!s} \ + {proto_dir!s}/clickhouse_grpc.proto", shell=True) + +sys.path.append(str(gen_dir)) +import clickhouse_grpc_pb2 +import clickhouse_grpc_pb2_grpc + + +@pytest.fixture(name="cluster", scope="module") +def fixture_cluster(): + try: + cluster.add_zookeeper_startup_command(configure_ports_from_zk) + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(name="zk", scope="module") +def fixture_zk(cluster): + return cluster.get_kazoo_client("zoo1") + + +def get_client(cluster, port): + return Client(host=cluster.get_instance_ip("instance"), port=port, command=cluster.client_bin_path) + + +def get_mysql_client(cluster, port): + start_time = time.monotonic() + while True: + try: + return pymysql.connections.Connection( + host=cluster.get_instance_ip("instance"), user="default", password="", database="default", port=port) + except pymysql.err.OperationalError: + if time.monotonic() - start_time > 10: + raise + time.sleep(0.1) + + +def get_pgsql_client(cluster, port): + start_time = time.monotonic() + while True: + try: + return psycopg2.connect( + host=cluster.get_instance_ip("instance"), user="postgresql", password="123", database="default", port=port) + except psycopg2.OperationalError: + if time.monotonic() - start_time > 10: + raise + time.sleep(0.1) + + +def get_grpc_channel(cluster, port): + host_port = cluster.get_instance_ip("instance") + f":{port}" + channel = grpc.insecure_channel(host_port) + grpc.channel_ready_future(channel).result(timeout=10) + return channel + + +def grpc_query(channel, query_text): + query_info = clickhouse_grpc_pb2.QueryInfo(query=query_text) + stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel) + result = stub.ExecuteQuery(query_info) + if result and result.HasField("exception"): + raise Exception(result.exception.display_text) + return result.output.decode() + + +def configure_ports_from_zk(zk, querier=None): + default_config = [ + ("/clickhouse/listen_hosts", b"0.0.0.0"), + ("/clickhouse/ports/tcp", b"9000"), + ("/clickhouse/ports/http", b"8123"), + ("/clickhouse/ports/mysql", b"9004"), + ("/clickhouse/ports/postgresql", b"9005"), + ("/clickhouse/ports/grpc", b"9100"), + ] + for path, value in default_config: + if querier is not None: + loads_before = querier(LOADS_QUERY) + has_changed = False + try: + zk.create(path=path, value=value, makepath=True) + has_changed = True + except NodeExistsError: + if zk.get(path) != value: + zk.set(path=path, value=value) + has_changed = True + if has_changed and querier is not None: + wait_loaded_config_changed(loads_before, querier) + + +@contextlib.contextmanager +def sync_loaded_config(querier): + # Depending on whether we test a change on tcp or http + # we monitor canges using the other, untouched, protocol + loads_before = querier(LOADS_QUERY) + yield + wait_loaded_config_changed(loads_before, querier) + + +def wait_loaded_config_changed(loads_before, querier): + loads_after = None + start_time = time.monotonic() + while time.monotonic() - start_time < 10: + try: + loads_after = querier(LOADS_QUERY) + if loads_after != loads_before: + return + except (QueryRuntimeException, ConnectionError): + pass + time.sleep(0.1) + assert loads_after is not None and loads_after != loads_before + + +@contextlib.contextmanager +def default_client(cluster, zk, restore_via_http=False): + client = get_client(cluster, port=9000) + try: + yield client + finally: + querier = instance.http_query if restore_via_http else client.query + configure_ports_from_zk(zk, querier) + + +def test_change_tcp_port(cluster, zk): + with default_client(cluster, zk, restore_via_http=True) as client: + assert client.query("SELECT 1") == "1\n" + with sync_loaded_config(instance.http_query): + zk.set("/clickhouse/ports/tcp", b"9090") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + client_on_new_port = get_client(cluster, port=9090) + assert client_on_new_port.query("SELECT 1") == "1\n" + + +def test_change_http_port(cluster, zk): + with default_client(cluster, zk) as client: + retry_strategy = Retry(total=10, backoff_factor=0.1) + assert instance.http_query("SELECT 1", retry_strategy=retry_strategy) == "1\n" + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/http", b"9090") + with pytest.raises(ConnectionError, match="Connection refused"): + instance.http_query("SELECT 1") + instance.http_query("SELECT 1", port=9090) == "1\n" + + +def test_change_mysql_port(cluster, zk): + with default_client(cluster, zk) as client: + mysql_client = get_mysql_client(cluster, port=9004) + assert mysql_client.query("SELECT 1") == 1 + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/mysql", b"9090") + with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): + mysql_client.query("SELECT 1") + mysql_client_on_new_port = get_mysql_client(cluster, port=9090) + assert mysql_client_on_new_port.query("SELECT 1") == 1 + + +def test_change_postgresql_port(cluster, zk): + with default_client(cluster, zk) as client: + pgsql_client = get_pgsql_client(cluster, port=9005) + cursor = pgsql_client.cursor() + cursor.execute("SELECT 1") + assert cursor.fetchall() == [(1,)] + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/postgresql", b"9090") + with pytest.raises(psycopg2.OperationalError, match="closed"): + cursor.execute("SELECT 1") + pgsql_client_on_new_port = get_pgsql_client(cluster, port=9090) + cursor = pgsql_client_on_new_port.cursor() + cursor.execute("SELECT 1") + cursor.fetchall() == [(1,)] + + +def test_change_grpc_port(cluster, zk): + with default_client(cluster, zk) as client: + grpc_channel = get_grpc_channel(cluster, port=9100) + assert grpc_query(grpc_channel, "SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/grpc", b"9090") + with pytest.raises(grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"): + grpc_query(grpc_channel, "SELECT 1") + grpc_channel_on_new_port = get_grpc_channel(cluster, port=9090) + assert grpc_query(grpc_channel_on_new_port, "SELECT 1") == "1\n" + + +def test_remove_tcp_port(cluster, zk): + with default_client(cluster, zk, restore_via_http=True) as client: + assert client.query("SELECT 1") == "1\n" + with sync_loaded_config(instance.http_query): + zk.delete("/clickhouse/ports/tcp") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + + +def test_remove_http_port(cluster, zk): + with default_client(cluster, zk) as client: + assert instance.http_query("SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/http") + with pytest.raises(ConnectionError, match="Connection refused"): + instance.http_query("SELECT 1") + + +def test_remove_mysql_port(cluster, zk): + with default_client(cluster, zk) as client: + mysql_client = get_mysql_client(cluster, port=9004) + assert mysql_client.query("SELECT 1") == 1 + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/mysql") + with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): + mysql_client.query("SELECT 1") + + +def test_remove_postgresql_port(cluster, zk): + with default_client(cluster, zk) as client: + pgsql_client = get_pgsql_client(cluster, port=9005) + cursor = pgsql_client.cursor() + cursor.execute("SELECT 1") + assert cursor.fetchall() == [(1,)] + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/postgresql") + with pytest.raises(psycopg2.OperationalError, match="closed"): + cursor.execute("SELECT 1") + + +def test_remove_grpc_port(cluster, zk): + with default_client(cluster, zk) as client: + grpc_channel = get_grpc_channel(cluster, port=9100) + assert grpc_query(grpc_channel, "SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/grpc") + with pytest.raises(grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"): + grpc_query(grpc_channel, "SELECT 1") + + +def test_change_listen_host(cluster, zk): + localhost_client = Client(host="127.0.0.1", port=9000, command="/usr/bin/clickhouse") + localhost_client.command = ["docker", "exec", "-i", instance.docker_id] + localhost_client.command + try: + client = get_client(cluster, port=9000) + with sync_loaded_config(localhost_client.query): + zk.set("/clickhouse/listen_hosts", b"127.0.0.1") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + assert localhost_client.query("SELECT 1") == "1\n" + finally: + with sync_loaded_config(localhost_client.query): + configure_ports_from_zk(zk) +