From ffc1fca29609251157a6d64e1801f7a34ba9e70b Mon Sep 17 00:00:00 2001 From: Kevin Michel Date: Fri, 22 Oct 2021 09:15:34 +0200 Subject: [PATCH] Start/stop servers when listen_host/*_port changes This allows starting and stopping separately each protocol server without restarting ClickHouse. This also allows adding or removing `listen_host` entries, which start and stops servers for all enabled ports. When stopping a server, the listening socket is immediately closed (and available for another server). Protocols with persistent connections try to wait for any currently running query to finish before closing the connection, but idle connection are closed quickly (depending on how often the protocol is polled). An extra ProfileEvent is added, `MainConfigLoads`, it is incremented every time the configuration is reloaded. This helps when trying to assess whether the new configuration was applied. --- programs/keeper/Keeper.cpp | 19 +- programs/server/Server.cpp | 681 +++++++++++------- programs/server/Server.h | 28 +- src/Common/ProfileEvents.cpp | 3 +- src/Interpreters/AsynchronousMetrics.cpp | 35 +- src/Interpreters/AsynchronousMetrics.h | 14 +- src/Server/GRPCServer.h | 4 + src/Server/HTTP/HTTPServer.cpp | 22 +- src/Server/HTTP/HTTPServer.h | 18 +- src/Server/HTTP/HTTPServerConnection.cpp | 13 +- src/Server/HTTP/HTTPServerConnection.h | 3 + .../HTTP/HTTPServerConnectionFactory.cpp | 4 +- src/Server/HTTP/HTTPServerConnectionFactory.h | 6 +- src/Server/KeeperTCPHandlerFactory.h | 6 +- src/Server/MySQLHandler.cpp | 15 +- src/Server/MySQLHandler.h | 10 +- src/Server/MySQLHandlerFactory.cpp | 6 +- src/Server/MySQLHandlerFactory.h | 7 +- src/Server/PostgreSQLHandler.cpp | 12 +- src/Server/PostgreSQLHandler.h | 7 +- src/Server/PostgreSQLHandlerFactory.cpp | 5 +- src/Server/PostgreSQLHandlerFactory.h | 6 +- src/Server/ProtocolServerAdapter.cpp | 39 +- src/Server/ProtocolServerAdapter.h | 21 +- src/Server/TCPHandler.cpp | 10 +- src/Server/TCPHandler.h | 4 +- src/Server/TCPHandlerFactory.h | 8 +- src/Server/TCPServer.cpp | 36 + src/Server/TCPServer.h | 47 ++ src/Server/TCPServerConnectionFactory.h | 27 + tests/integration/helpers/cluster.py | 18 +- .../integration/test_server_reload/.gitignore | 1 + .../test_server_reload/__init__.py | 0 .../configs/default_passwd.xml | 13 + .../test_server_reload/configs/dhparam.pem | 8 + .../configs/ports_from_zk.xml | 9 + .../test_server_reload/configs/server.crt | 18 + .../test_server_reload/configs/server.key | 28 + .../test_server_reload/configs/ssl_conf.xml | 18 + .../protos/clickhouse_grpc.proto | 174 +++++ tests/integration/test_server_reload/test.py | 284 ++++++++ 41 files changed, 1287 insertions(+), 400 deletions(-) create mode 100644 src/Server/TCPServer.cpp create mode 100644 src/Server/TCPServer.h create mode 100644 src/Server/TCPServerConnectionFactory.h create mode 100644 tests/integration/test_server_reload/.gitignore create mode 100644 tests/integration/test_server_reload/__init__.py create mode 100644 tests/integration/test_server_reload/configs/default_passwd.xml create mode 100644 tests/integration/test_server_reload/configs/dhparam.pem create mode 100644 tests/integration/test_server_reload/configs/ports_from_zk.xml create mode 100644 tests/integration/test_server_reload/configs/server.crt create mode 100644 tests/integration/test_server_reload/configs/server.key create mode 100644 tests/integration/test_server_reload/configs/ssl_conf.xml create mode 100644 tests/integration/test_server_reload/protos/clickhouse_grpc.proto create mode 100644 tests/integration/test_server_reload/test.py diff --git a/programs/keeper/Keeper.cpp b/programs/keeper/Keeper.cpp index afd6a36ea15..d144b4d332e 100644 --- a/programs/keeper/Keeper.cpp +++ b/programs/keeper/Keeper.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -379,11 +380,11 @@ int Keeper::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); servers->emplace_back( + listen_host, port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket, new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString()); + "Keeper (tcp): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); }); const char * secure_port_name = "keeper_server.tcp_port_secure"; @@ -395,10 +396,11 @@ int Keeper::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); servers->emplace_back( + listen_host, secure_port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket, new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections to Keeper with secure protocol (tcp_secure): {}", address.toString()); + "Keeper with secure protocol (tcp_secure): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); #else UNUSED(port); throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", @@ -408,7 +410,10 @@ int Keeper::main(const std::vector & /*args*/) } for (auto & server : *servers) + { server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } zkutil::EventPtr unused_event = std::make_shared(); zkutil::ZooKeeperNodeCache unused_cache([] { return nullptr; }); diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 14075f9fbf2..43d2b64c4f2 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -34,6 +34,7 @@ #include #include #include +#include #include #include #include @@ -70,6 +71,7 @@ #include "MetricsTransmitter.h" #include #include +#include #include #include #include @@ -127,6 +129,11 @@ namespace CurrentMetrics extern const Metric MaxPushedDDLEntryID; } +namespace ProfileEvents +{ + extern const Event MainConfigLoads; +} + namespace fs = std::filesystem; #if USE_JEMALLOC @@ -344,16 +351,53 @@ Poco::Net::SocketAddress Server::socketBindListen(Poco::Net::ServerSocket & sock return address; } -void Server::createServer(const std::string & listen_host, const char * port_name, bool listen_try, CreateServerFunc && func) const +std::vector getListenHosts(const Poco::Util::AbstractConfiguration & config) +{ + auto listen_hosts = DB::getMultipleValuesFromConfig(config, "", "listen_host"); + if (listen_hosts.empty()) + { + listen_hosts.emplace_back("::1"); + listen_hosts.emplace_back("127.0.0.1"); + } + return listen_hosts; +} + +bool getListenTry(const Poco::Util::AbstractConfiguration & config) +{ + bool listen_try = config.getBool("listen_try", false); + if (!listen_try) + listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty(); + return listen_try; +} + + +void Server::createServer( + Poco::Util::AbstractConfiguration & config, + const std::string & listen_host, + const char * port_name, + bool listen_try, + bool start_server, + std::vector & servers, + CreateServerFunc && func) const { /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file. - if (!config().has(port_name)) + if (config.getString(port_name, "").empty()) return; - auto port = config().getInt(port_name); + /// If we already have an active server for this listen_host/port_name, don't create it again + for (const auto & server : servers) + if (!server.isStopping() && server.getListenHost() == listen_host && server.getPortName() == port_name) + return; + + auto port = config.getInt(port_name); try { - func(port); + servers.push_back(func(port)); + if (start_server) + { + servers.back().start(); + LOG_INFO(&logger(), "Listening for {}", servers.back().getDescription()); + } global_context->registerServerPort(port_name, port); } catch (const Poco::Exception &) @@ -515,6 +559,25 @@ if (ThreadFuzzer::instance().isEffective()) config().getUInt("thread_pool_queue_size", 10000) ); + Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024)); + std::mutex servers_lock; + std::vector servers; + std::vector servers_to_start_before_tables; + /// This object will periodically calculate some metrics. + AsynchronousMetrics async_metrics( + global_context, config().getUInt("asynchronous_metrics_update_period_s", 1), + [&]() -> std::vector + { + std::vector metrics; + for (const auto & server : servers_to_start_before_tables) + metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); + std::lock_guard lock(servers_lock); + for (const auto & server : servers) + metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); + return metrics; + } + ); + ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10)); bool has_zookeeper = config().has("zookeeper"); @@ -870,12 +933,17 @@ if (ThreadFuzzer::instance().isEffective()) global_context->reloadZooKeeperIfChanged(config); global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config); + + std::lock_guard lock(servers_lock); + updateServers(*config, server_pool, async_metrics, servers); } global_context->updateStorageConfiguration(*config); global_context->updateInterserverCredentials(*config); CompressionCodecEncrypted::Configuration::instance().tryLoad(*config, "encryption_codecs"); + + ProfileEvents::increment(ProfileEvents::MainConfigLoads); }, /* already_loaded = */ false); /// Reload it right now (initial loading) @@ -987,24 +1055,8 @@ if (ThreadFuzzer::instance().isEffective()) /// try set up encryption. There are some errors in config, error will be printed and server wouldn't start. CompressionCodecEncrypted::Configuration::instance().load(config(), "encryption_codecs"); - Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0); - - Poco::ThreadPool server_pool(3, config().getUInt("max_connections", 1024)); - Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; - http_params->setTimeout(settings.http_receive_timeout); - http_params->setKeepAliveTimeout(keep_alive_timeout); - - auto servers_to_start_before_tables = std::make_shared>(); - - std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); - - bool listen_try = config().getBool("listen_try", false); - if (listen_hosts.empty()) - { - listen_hosts.emplace_back("::1"); - listen_hosts.emplace_back("127.0.0.1"); - listen_try = true; - } + const auto listen_hosts = getListenHosts(config()); + const auto listen_try = getListenTry(config()); if (config().has("keeper_server")) { @@ -1027,39 +1079,46 @@ if (ThreadFuzzer::instance().isEffective()) { /// TCP Keeper const char * port_name = "keeper_server.tcp_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables->emplace_back( - port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, false), server_pool, socket, new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString()); - }); + createServer( + config(), listen_host, port_name, listen_try, /* start_server: */ false, + servers_to_start_before_tables, + [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "Keeper (tcp): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, false), server_pool, socket)); + }); const char * secure_port_name = "keeper_server.tcp_port_secure"; - createServer(listen_host, secure_port_name, listen_try, [&](UInt16 port) - { + createServer( + config(), listen_host, secure_port_name, listen_try, /* start_server: */ false, + servers_to_start_before_tables, + [&](UInt16 port) -> ProtocolServerAdapter + { #if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables->emplace_back( - secure_port_name, - std::make_unique( - new KeeperTCPHandlerFactory(*this, true), server_pool, socket, new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections to Keeper with secure protocol (tcp_secure): {}", address.toString()); + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + secure_port_name, + "Keeper with secure protocol (tcp_secure): " + address.toString(), + std::make_unique( + new KeeperTCPHandlerFactory(*this, true), server_pool, socket)); #else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; #endif - }); + }); } #else throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "ClickHouse server built without NuRaft library. Cannot use internal coordination."); @@ -1067,14 +1126,19 @@ if (ThreadFuzzer::instance().isEffective()) } - for (auto & server : *servers_to_start_before_tables) + for (auto & server : servers_to_start_before_tables) + { server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } SCOPE_EXIT({ /// Stop reloading of the main config. This must be done before `global_context->shutdown()` because /// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart. main_config_reloader.reset(); + async_metrics.stop(); + /** Ask to cancel background jobs all table engines, * and also query_log. * It is important to do early, not in destructor of Context, because @@ -1086,11 +1150,11 @@ if (ThreadFuzzer::instance().isEffective()) LOG_DEBUG(log, "Shut down storages."); - if (!servers_to_start_before_tables->empty()) + if (!servers_to_start_before_tables.empty()) { LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); int current_connections = 0; - for (auto & server : *servers_to_start_before_tables) + for (auto & server : servers_to_start_before_tables) { server.stop(); current_connections += server.currentConnections(); @@ -1102,7 +1166,7 @@ if (ThreadFuzzer::instance().isEffective()) LOG_INFO(log, "Closed all listening sockets."); if (current_connections > 0) - current_connections = waitServersToFinish(*servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); @@ -1256,223 +1320,18 @@ if (ThreadFuzzer::instance().isEffective()) LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); #endif - auto servers = std::make_shared>(); { - /// This object will periodically calculate some metrics. - AsynchronousMetrics async_metrics( - global_context, config().getUInt("asynchronous_metrics_update_period_s", 1), servers_to_start_before_tables, servers); attachSystemTablesAsync(global_context, *DatabaseCatalog::instance().getSystemDatabase(), async_metrics); - for (const auto & listen_host : listen_hosts) { - /// HTTP - const char * port_name = "http_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - - servers->emplace_back( - port_name, - std::make_unique( - context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening for http://{}", address.toString()); - }); - - /// HTTPS - port_name = "https_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); - - LOG_INFO(log, "Listening for https://{}", address.toString()); -#else - UNUSED(port); - throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - /// TCP - port_name = "tcp_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections with native protocol (tcp): {}", address.toString()); - }); - - /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt - port_name = "tcp_with_proxy_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for connections with native protocol (tcp) with PROXY: {}", address.toString()); - }); - - /// TCP with SSL - port_name = "tcp_port_secure"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.receive_timeout); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): {}", address.toString()); -#else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - /// Interserver IO HTTP - port_name = "interserver_http_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); - }); - - port_name = "interserver_https_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { -#if USE_SSL - Poco::Net::SecureServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); -#else - UNUSED(port); - throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", - ErrorCodes::SUPPORT_IS_DISABLED}; -#endif - }); - - port_name = "mysql_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(Poco::Timespan()); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new MySQLHandlerFactory(*this), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString()); - }); - - port_name = "postgresql_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); - socket.setReceiveTimeout(Poco::Timespan()); - socket.setSendTimeout(settings.send_timeout); - servers->emplace_back(port_name, std::make_unique( - new PostgreSQLHandlerFactory(*this), - server_pool, - socket, - new Poco::Net::TCPServerParams)); - - LOG_INFO(log, "Listening for PostgreSQL compatibility protocol: " + address.toString()); - }); - -#if USE_GRPC - port_name = "grpc_port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::SocketAddress server_address(listen_host, port); - servers->emplace_back(port_name, std::make_unique(*this, makeSocketAddress(listen_host, port, log))); - LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); - }); -#endif - - /// Prometheus (if defined and not setup yet with http_port) - port_name = "prometheus.port"; - createServer(listen_host, port_name, listen_try, [&](UInt16 port) - { - Poco::Net::ServerSocket socket; - auto address = socketBindListen(socket, listen_host, port); - socket.setReceiveTimeout(settings.http_receive_timeout); - socket.setSendTimeout(settings.http_send_timeout); - servers->emplace_back( - port_name, - std::make_unique( - context(), - createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), - server_pool, - socket, - http_params)); - - LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); - }); + std::lock_guard lock(servers_lock); + createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers); + if (servers.empty()) + throw Exception( + "No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", + ErrorCodes::NO_ELEMENTS_IN_CONFIG); } - if (servers->empty()) - throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", - ErrorCodes::NO_ELEMENTS_IN_CONFIG); - - /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread. async_metrics.start(); { @@ -1551,9 +1410,15 @@ if (ThreadFuzzer::instance().isEffective()) &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID)); } - for (auto & server : *servers) - server.start(); - LOG_INFO(log, "Ready for connections."); + { + std::lock_guard lock(servers_lock); + for (auto & server : servers) + { + server.start(); + LOG_INFO(log, "Listening for {}", server.getDescription()); + } + LOG_INFO(log, "Ready for connections."); + } SCOPE_EXIT_SAFE({ LOG_DEBUG(log, "Received termination signal."); @@ -1562,10 +1427,13 @@ if (ThreadFuzzer::instance().isEffective()) is_cancelled = true; int current_connections = 0; - for (auto & server : *servers) { - server.stop(); - current_connections += server.currentConnections(); + std::lock_guard lock(servers_lock); + for (auto & server : servers) + { + server.stop(); + current_connections += server.currentConnections(); + } } if (current_connections) @@ -1578,7 +1446,7 @@ if (ThreadFuzzer::instance().isEffective()) global_context->getProcessList().killAllQueries(); if (current_connections) - current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections. But {} remain." @@ -1614,4 +1482,273 @@ if (ThreadFuzzer::instance().isEffective()) return Application::EXIT_OK; } + +void Server::createServers( + Poco::Util::AbstractConfiguration & config, + const std::vector & listen_hosts, + bool listen_try, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers, + bool start_servers) +{ + const Settings & settings = global_context->getSettingsRef(); + + Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0); + Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams; + http_params->setTimeout(settings.http_receive_timeout); + http_params->setKeepAliveTimeout(keep_alive_timeout); + + for (const auto & listen_host : listen_hosts) + { + /// HTTP + const char * port_name = "http_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + + return ProtocolServerAdapter( + listen_host, + port_name, + "http://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); + }); + + /// HTTPS + port_name = "https_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "https://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); +#else + UNUSED(port); + throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + /// TCP + port_name = "tcp_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "native protocol (tcp): " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + }); + + /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt + port_name = "tcp_with_proxy_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "native protocol (tcp) with PROXY: " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), + server_pool, + socket, + new Poco::Net::TCPServerParams)); + }); + + /// TCP with SSL + port_name = "tcp_port_secure"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.receive_timeout); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "secure native protocol (tcp_secure): " + address.toString(), + std::make_unique( + new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), + server_pool, + socket, + new Poco::Net::TCPServerParams)); +#else + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + /// Interserver IO HTTP + port_name = "interserver_http_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "replica communication (interserver): http://" + address.toString(), + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), + server_pool, + socket, + http_params)); + }); + + port_name = "interserver_https_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { +#if USE_SSL + Poco::Net::SecureServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "secure replica communication (interserver): https://" + address.toString(), + std::make_unique( + context(), + createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), + server_pool, + socket, + http_params)); +#else + UNUSED(port); + throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.", + ErrorCodes::SUPPORT_IS_DISABLED}; +#endif + }); + + port_name = "mysql_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(Poco::Timespan()); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "MySQL compatibility protocol: " + address.toString(), + std::make_unique(new MySQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); + }); + + port_name = "postgresql_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); + socket.setReceiveTimeout(Poco::Timespan()); + socket.setSendTimeout(settings.send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "PostgreSQL compatibility protocol: " + address.toString(), + std::make_unique(new PostgreSQLHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); + }); + +#if USE_GRPC + port_name = "grpc_port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::SocketAddress server_address(listen_host, port); + return ProtocolServerAdapter( + listen_host, + port_name, + "gRPC protocol: " + server_address.toString(), + std::make_unique(*this, makeSocketAddress(listen_host, port, &logger()))); + }); +#endif + + /// Prometheus (if defined and not setup yet with http_port) + port_name = "prometheus.port"; + createServer(config, listen_host, port_name, listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter + { + Poco::Net::ServerSocket socket; + auto address = socketBindListen(socket, listen_host, port); + socket.setReceiveTimeout(settings.http_receive_timeout); + socket.setSendTimeout(settings.http_send_timeout); + return ProtocolServerAdapter( + listen_host, + port_name, + "Prometheus: http://" + address.toString(), + std::make_unique( + context(), createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); + }); + } + +} + +void Server::updateServers( + Poco::Util::AbstractConfiguration & config, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers) +{ + Poco::Logger * log = &logger(); + /// Gracefully shutdown servers when their port is removed from config + const auto listen_hosts = getListenHosts(config); + const auto listen_try = getListenTry(config); + + for (auto & server : servers) + if (!server.isStopping()) + { + bool has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end(); + bool has_port = !config.getString(server.getPortName(), "").empty(); + if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber()) + { + server.stop(); + LOG_INFO(log, "Stopped listening for {}", server.getDescription()); + } + } + + createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers: */ true); + + /// Remove servers once all their connections are closed + while (std::any_of(servers.begin(), servers.end(), [](const auto & server) { return server.isStopping(); })) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + std::erase_if(servers, [&log](auto & server) + { + if (!server.isStopping()) + return false; + auto is_finished = server.currentConnections() == 0; + if (is_finished) + LOG_DEBUG(log, "Server finished: {}", server.getDescription()); + else + LOG_TRACE(log, "Waiting server to finish: {}", server.getDescription()); + return is_finished; + }); + } +} + } diff --git a/programs/server/Server.h b/programs/server/Server.h index 45e5fccd51d..b4f2ea3bb79 100644 --- a/programs/server/Server.h +++ b/programs/server/Server.h @@ -24,6 +24,8 @@ namespace Poco namespace DB { +class AsynchronousMetrics; +class ProtocolServerAdapter; class Server : public BaseDaemon, public IServer { @@ -67,8 +69,30 @@ private: ContextMutablePtr global_context; Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = false) const; - using CreateServerFunc = std::function; - void createServer(const std::string & listen_host, const char * port_name, bool listen_try, CreateServerFunc && func) const; + using CreateServerFunc = std::function; + void createServer( + Poco::Util::AbstractConfiguration & config, + const std::string & listen_host, + const char * port_name, + bool listen_try, + bool start_server, + std::vector & servers, + CreateServerFunc && func) const; + + void createServers( + Poco::Util::AbstractConfiguration & config, + const std::vector & listen_hosts, + bool listen_try, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers, + bool start_servers = false); + + void updateServers( + Poco::Util::AbstractConfiguration & config, + Poco::ThreadPool & server_pool, + AsynchronousMetrics & async_metrics, + std::vector & servers); }; } diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 982523a3ef2..ce321d17b48 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -274,7 +274,8 @@ M(ThreadPoolReaderPageCacheMissElapsedMicroseconds, "Time spent reading data inside the asynchronous job in ThreadPoolReader - when read was not done from page cache.") \ \ M(AsynchronousReadWaitMicroseconds, "Time spent in waiting for asynchronous reads.") \ - + \ + M(MainConfigLoads, "Number of times the main configuration was reloaded.") \ namespace ProfileEvents { diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 121f7c4153f..d1c5fbebbc7 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -69,12 +69,10 @@ static std::unique_ptr openFileIfExists(const std::stri AsynchronousMetrics::AsynchronousMetrics( ContextPtr global_context_, int update_period_seconds, - std::shared_ptr> servers_to_start_before_tables_, - std::shared_ptr> servers_) + const ProtocolServerMetricsFunc & protocol_server_metrics_func_) : WithContext(global_context_) , update_period(update_period_seconds) - , servers_to_start_before_tables(servers_to_start_before_tables_) - , servers(servers_) + , protocol_server_metrics_func(protocol_server_metrics_func_) , log(&Poco::Logger::get("AsynchronousMetrics")) { #if defined(OS_LINUX) @@ -238,7 +236,7 @@ void AsynchronousMetrics::start() thread = std::make_unique([this] { run(); }); } -AsynchronousMetrics::~AsynchronousMetrics() +void AsynchronousMetrics::stop() { try { @@ -249,7 +247,10 @@ AsynchronousMetrics::~AsynchronousMetrics() wait_cond.notify_one(); if (thread) + { thread->join(); + thread.reset(); + } } catch (...) { @@ -257,6 +258,11 @@ AsynchronousMetrics::~AsynchronousMetrics() } } +AsynchronousMetrics::~AsynchronousMetrics() +{ + stop(); +} + AsynchronousMetricValues AsynchronousMetrics::getValues() const { @@ -1381,22 +1387,11 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti return it->second; }; - if (servers_to_start_before_tables) + const auto server_metrics = protocol_server_metrics_func(); + for (const auto & server_metric : server_metrics) { - for (const auto & server : *servers_to_start_before_tables) - { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); - } - } - - if (servers) - { - for (const auto & server : *servers) - { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); - } + if (const auto * name = get_metric_name(server_metric.port_name)) + new_values[name] = server_metric.current_threads; } } diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 7a5c2d638d7..3c7581ce1a3 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -30,6 +30,11 @@ class ReadBuffer; using AsynchronousMetricValue = double; using AsynchronousMetricValues = std::unordered_map; +struct ProtocolServerMetrics +{ + String port_name; + size_t current_threads; +}; /** Periodically (by default, each minute, starting at 30 seconds offset) * calculates and updates some metrics, @@ -41,24 +46,25 @@ using AsynchronousMetricValues = std::unordered_map()>; AsynchronousMetrics( ContextPtr global_context_, int update_period_seconds, - std::shared_ptr> servers_to_start_before_tables_, - std::shared_ptr> servers_); + const ProtocolServerMetricsFunc & protocol_server_metrics_func_); ~AsynchronousMetrics(); /// Separate method allows to initialize the `servers` variable beforehand. void start(); + void stop(); + /// Returns copy of all values. AsynchronousMetricValues getValues() const; private: const std::chrono::seconds update_period; - std::shared_ptr> servers_to_start_before_tables{nullptr}; - std::shared_ptr> servers{nullptr}; + ProtocolServerMetricsFunc protocol_server_metrics_func; mutable std::mutex mutex; std::condition_variable wait_cond; diff --git a/src/Server/GRPCServer.h b/src/Server/GRPCServer.h index 25c3813c11d..e2b48f1c16b 100644 --- a/src/Server/GRPCServer.h +++ b/src/Server/GRPCServer.h @@ -4,6 +4,7 @@ #if USE_GRPC #include +#include #include "clickhouse_grpc.grpc.pb.h" namespace Poco { class Logger; } @@ -30,6 +31,9 @@ public: /// Stops the server. No new connections will be accepted. void stop(); + /// Returns the port this server is listening to. + UInt16 portNumber() const { return address_to_listen.port(); } + /// Returns the number of currently handled connections. size_t currentConnections() const; diff --git a/src/Server/HTTP/HTTPServer.cpp b/src/Server/HTTP/HTTPServer.cpp index 42e6467d0af..2e91fad1c0f 100644 --- a/src/Server/HTTP/HTTPServer.cpp +++ b/src/Server/HTTP/HTTPServer.cpp @@ -5,31 +5,13 @@ namespace DB { -HTTPServer::HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory_, - UInt16 port_number, - Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), port_number, params), factory(factory_) -{ -} - -HTTPServer::HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory_, - const Poco::Net::ServerSocket & socket, - Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), socket, params), factory(factory_) -{ -} - HTTPServer::HTTPServer( ContextPtr context, HTTPRequestHandlerFactoryPtr factory_, Poco::ThreadPool & thread_pool, - const Poco::Net::ServerSocket & socket, + Poco::Net::ServerSocket & socket_, Poco::Net::HTTPServerParams::Ptr params) - : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), thread_pool, socket, params), factory(factory_) + : TCPServer(new HTTPServerConnectionFactory(context, params, factory_), thread_pool, socket_, params), factory(factory_) { } diff --git a/src/Server/HTTP/HTTPServer.h b/src/Server/HTTP/HTTPServer.h index 3518fd66d20..07ad54d267f 100644 --- a/src/Server/HTTP/HTTPServer.h +++ b/src/Server/HTTP/HTTPServer.h @@ -1,9 +1,9 @@ #pragma once #include +#include #include -#include #include @@ -13,26 +13,14 @@ namespace DB class Context; -class HTTPServer : public Poco::Net::TCPServer +class HTTPServer : public TCPServer { public: explicit HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory, - UInt16 port_number = 80, - Poco::Net::HTTPServerParams::Ptr params = new Poco::Net::HTTPServerParams); - - HTTPServer( - ContextPtr context, - HTTPRequestHandlerFactoryPtr factory, - const Poco::Net::ServerSocket & socket, - Poco::Net::HTTPServerParams::Ptr params); - - HTTPServer( ContextPtr context, HTTPRequestHandlerFactoryPtr factory, Poco::ThreadPool & thread_pool, - const Poco::Net::ServerSocket & socket, + Poco::Net::ServerSocket & socket, Poco::Net::HTTPServerParams::Ptr params); ~HTTPServer() override; diff --git a/src/Server/HTTP/HTTPServerConnection.cpp b/src/Server/HTTP/HTTPServerConnection.cpp index de81da20ead..7020b8e9a23 100644 --- a/src/Server/HTTP/HTTPServerConnection.cpp +++ b/src/Server/HTTP/HTTPServerConnection.cpp @@ -1,4 +1,5 @@ #include +#include #include @@ -7,10 +8,11 @@ namespace DB HTTPServerConnection::HTTPServerConnection( ContextPtr context_, + TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket, Poco::Net::HTTPServerParams::Ptr params_, HTTPRequestHandlerFactoryPtr factory_) - : TCPServerConnection(socket), context(Context::createCopy(context_)), params(params_), factory(factory_), stopped(false) + : TCPServerConnection(socket), context(Context::createCopy(context_)), tcp_server(tcp_server_), params(params_), factory(factory_), stopped(false) { poco_check_ptr(factory); } @@ -20,12 +22,12 @@ void HTTPServerConnection::run() std::string server = params->getSoftwareVersion(); Poco::Net::HTTPServerSession session(socket(), params); - while (!stopped && session.hasMoreRequests()) + while (!stopped && tcp_server.isOpen() && session.hasMoreRequests()) { try { std::unique_lock lock(mutex); - if (!stopped) + if (!stopped && tcp_server.isOpen()) { HTTPServerResponse response(session); HTTPServerRequest request(context, response, session); @@ -48,6 +50,11 @@ void HTTPServerConnection::run() response.set("Server", server); try { + if (!tcp_server.isOpen()) + { + sendErrorResponse(session, Poco::Net::HTTPResponse::HTTP_SERVICE_UNAVAILABLE); + break; + } std::unique_ptr handler(factory->createRequestHandler(request)); if (handler) diff --git a/src/Server/HTTP/HTTPServerConnection.h b/src/Server/HTTP/HTTPServerConnection.h index 1c7ae6cd2b7..db3969f6ffb 100644 --- a/src/Server/HTTP/HTTPServerConnection.h +++ b/src/Server/HTTP/HTTPServerConnection.h @@ -9,12 +9,14 @@ namespace DB { +class TCPServer; class HTTPServerConnection : public Poco::Net::TCPServerConnection { public: HTTPServerConnection( ContextPtr context, + TCPServer & tcp_server, const Poco::Net::StreamSocket & socket, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory); @@ -26,6 +28,7 @@ protected: private: ContextPtr context; + TCPServer & tcp_server; Poco::Net::HTTPServerParams::Ptr params; HTTPRequestHandlerFactoryPtr factory; bool stopped; diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.cpp b/src/Server/HTTP/HTTPServerConnectionFactory.cpp index 0e4fb6cfcec..008da222c79 100644 --- a/src/Server/HTTP/HTTPServerConnectionFactory.cpp +++ b/src/Server/HTTP/HTTPServerConnectionFactory.cpp @@ -11,9 +11,9 @@ HTTPServerConnectionFactory::HTTPServerConnectionFactory( poco_check_ptr(factory); } -Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * HTTPServerConnectionFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { - return new HTTPServerConnection(context, socket, params, factory); + return new HTTPServerConnection(context, tcp_server, socket, params, factory); } } diff --git a/src/Server/HTTP/HTTPServerConnectionFactory.h b/src/Server/HTTP/HTTPServerConnectionFactory.h index 3f11eca0f69..a19dc6d4d5c 100644 --- a/src/Server/HTTP/HTTPServerConnectionFactory.h +++ b/src/Server/HTTP/HTTPServerConnectionFactory.h @@ -2,19 +2,19 @@ #include #include +#include #include -#include namespace DB { -class HTTPServerConnectionFactory : public Poco::Net::TCPServerConnectionFactory +class HTTPServerConnectionFactory : public TCPServerConnectionFactory { public: HTTPServerConnectionFactory(ContextPtr context, Poco::Net::HTTPServerParams::Ptr params, HTTPRequestHandlerFactoryPtr factory); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override; private: ContextPtr context; diff --git a/src/Server/KeeperTCPHandlerFactory.h b/src/Server/KeeperTCPHandlerFactory.h index 67bb3dab268..58dc73d7c27 100644 --- a/src/Server/KeeperTCPHandlerFactory.h +++ b/src/Server/KeeperTCPHandlerFactory.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -10,7 +10,7 @@ namespace DB { -class KeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class KeeperTCPHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -29,7 +29,7 @@ public: { } - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer &) override { try { diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index deebc073ad5..2836ee05c30 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -62,10 +63,11 @@ static String showTableStatusReplacementQuery(const String & query); static String killConnectionIdReplacementQuery(const String & query); static String selectLimitReplacementQuery(const String & query); -MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, +MySQLHandler::MySQLHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , log(&Poco::Logger::get("MySQLHandler")) , connection_id(connection_id_) , auth_plugin(new MySQLProtocol::Authentication::Native41()) @@ -138,11 +140,14 @@ void MySQLHandler::run() OKPacket ok_packet(0, handshake_response.capability_flags, 0, 0, 0); packet_endpoint->sendPacket(ok_packet, true); - while (true) + while (tcp_server.isOpen()) { packet_endpoint->resetSequenceId(); MySQLPacketPayloadReadBuffer payload = packet_endpoint->getPayload(); + while (!in->poll(1000000)) + if (!tcp_server.isOpen()) + return; char command = 0; payload.readStrict(command); @@ -152,6 +157,8 @@ void MySQLHandler::run() LOG_DEBUG(log, "Received command: {}. Connection id: {}.", static_cast(static_cast(command)), connection_id); + if (!tcp_server.isOpen()) + return; try { switch (command) @@ -369,8 +376,8 @@ void MySQLHandler::finishHandshakeSSL( } #if USE_SSL -MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) - : MySQLHandler(server_, socket_, ssl_enabled, connection_id_) +MySQLHandlerSSL::MySQLHandlerSSL(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_) + : MySQLHandler(server_, tcp_server_, socket_, ssl_enabled, connection_id_) , public_key(public_key_) , private_key(private_key_) {} diff --git a/src/Server/MySQLHandler.h b/src/Server/MySQLHandler.h index 7ef212bf36e..3af5f7a0eb2 100644 --- a/src/Server/MySQLHandler.h +++ b/src/Server/MySQLHandler.h @@ -24,11 +24,14 @@ namespace CurrentMetrics namespace DB { +class ReadBufferFromPocoSocket; +class TCPServer; + /// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client. class MySQLHandler : public Poco::Net::TCPServerConnection { public: - MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_); + MySQLHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_); void run() final; @@ -52,6 +55,7 @@ protected: virtual void finishHandshakeSSL(size_t packet_size, char * buf, size_t pos, std::function read_bytes, MySQLProtocol::ConnectionPhase::HandshakeResponse & packet); IServer & server; + TCPServer & tcp_server; Poco::Logger * log; UInt64 connection_id = 0; @@ -68,7 +72,7 @@ protected: Replacements replacements; std::unique_ptr auth_plugin; - std::shared_ptr in; + std::shared_ptr in; std::shared_ptr out; bool secure_connection = false; }; @@ -77,7 +81,7 @@ protected: class MySQLHandlerSSL : public MySQLHandler { public: - MySQLHandlerSSL(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_); + MySQLHandlerSSL(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_, RSA & public_key_, RSA & private_key_); private: void authPluginSSL() override; diff --git a/src/Server/MySQLHandlerFactory.cpp b/src/Server/MySQLHandlerFactory.cpp index 7a0bfd8ab09..f7bb073e275 100644 --- a/src/Server/MySQLHandlerFactory.cpp +++ b/src/Server/MySQLHandlerFactory.cpp @@ -118,14 +118,14 @@ void MySQLHandlerFactory::generateRSAKeys() } #endif -Poco::Net::TCPServerConnection * MySQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * MySQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { size_t connection_id = last_connection_id++; LOG_TRACE(log, "MySQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString()); #if USE_SSL - return new MySQLHandlerSSL(server, socket, ssl_enabled, connection_id, *public_key, *private_key); + return new MySQLHandlerSSL(server, tcp_server, socket, ssl_enabled, connection_id, *public_key, *private_key); #else - return new MySQLHandler(server, socket, ssl_enabled, connection_id); + return new MySQLHandler(server, tcp_server, socket, ssl_enabled, connection_id); #endif } diff --git a/src/Server/MySQLHandlerFactory.h b/src/Server/MySQLHandlerFactory.h index 106fdfdf341..25f1af85273 100644 --- a/src/Server/MySQLHandlerFactory.h +++ b/src/Server/MySQLHandlerFactory.h @@ -1,9 +1,9 @@ #pragma once -#include #include #include #include +#include #include @@ -13,8 +13,9 @@ namespace DB { +class TCPServer; -class MySQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class MySQLHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -43,7 +44,7 @@ public: void generateRSAKeys(); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override; }; } diff --git a/src/Server/PostgreSQLHandler.cpp b/src/Server/PostgreSQLHandler.cpp index fee4ace3452..9808b538280 100644 --- a/src/Server/PostgreSQLHandler.cpp +++ b/src/Server/PostgreSQLHandler.cpp @@ -6,6 +6,7 @@ #include #include "PostgreSQLHandler.h" #include +#include #include #include #include @@ -28,11 +29,13 @@ namespace ErrorCodes PostgreSQLHandler::PostgreSQLHandler( const Poco::Net::StreamSocket & socket_, IServer & server_, + TCPServer & tcp_server_, bool ssl_enabled_, Int32 connection_id_, std::vector> & auth_methods_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , ssl_enabled(ssl_enabled_) , connection_id(connection_id_) , authentication_manager(auth_methods_) @@ -60,11 +63,18 @@ void PostgreSQLHandler::run() if (!startup()) return; - while (true) + while (tcp_server.isOpen()) { message_transport->send(PostgreSQLProtocol::Messaging::ReadyForQuery(), true); + + constexpr size_t connection_check_timeout = 1; // 1 second + while (!in->poll(1000000 * connection_check_timeout)) + if (!tcp_server.isOpen()) + return; PostgreSQLProtocol::Messaging::FrontMessageType message_type = message_transport->receiveMessageType(); + if (!tcp_server.isOpen()) + return; switch (message_type) { case PostgreSQLProtocol::Messaging::FrontMessageType::QUERY: diff --git a/src/Server/PostgreSQLHandler.h b/src/Server/PostgreSQLHandler.h index 1d33f41f255..4fd08cc2606 100644 --- a/src/Server/PostgreSQLHandler.h +++ b/src/Server/PostgreSQLHandler.h @@ -18,8 +18,9 @@ namespace CurrentMetrics namespace DB { - +class ReadBufferFromPocoSocket; class Session; +class TCPServer; /** PostgreSQL wire protocol implementation. * For more info see https://www.postgresql.org/docs/current/protocol.html @@ -30,6 +31,7 @@ public: PostgreSQLHandler( const Poco::Net::StreamSocket & socket_, IServer & server_, + TCPServer & tcp_server_, bool ssl_enabled_, Int32 connection_id_, std::vector> & auth_methods_); @@ -40,12 +42,13 @@ private: Poco::Logger * log = &Poco::Logger::get("PostgreSQLHandler"); IServer & server; + TCPServer & tcp_server; std::unique_ptr session; bool ssl_enabled = false; Int32 connection_id = 0; Int32 secret_key = 0; - std::shared_ptr in; + std::shared_ptr in; std::shared_ptr out; std::shared_ptr message_transport; diff --git a/src/Server/PostgreSQLHandlerFactory.cpp b/src/Server/PostgreSQLHandlerFactory.cpp index 1158cf5835e..6f2124861e7 100644 --- a/src/Server/PostgreSQLHandlerFactory.cpp +++ b/src/Server/PostgreSQLHandlerFactory.cpp @@ -1,5 +1,4 @@ #include "PostgreSQLHandlerFactory.h" -#include #include #include @@ -17,11 +16,11 @@ PostgreSQLHandlerFactory::PostgreSQLHandlerFactory(IServer & server_) }; } -Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket) +Poco::Net::TCPServerConnection * PostgreSQLHandlerFactory::createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) { Int32 connection_id = last_connection_id++; LOG_TRACE(log, "PostgreSQL connection. Id: {}. Address: {}", connection_id, socket.peerAddress().toString()); - return new PostgreSQLHandler(socket, server, ssl_enabled, connection_id, auth_methods); + return new PostgreSQLHandler(socket, server, tcp_server, ssl_enabled, connection_id, auth_methods); } } diff --git a/src/Server/PostgreSQLHandlerFactory.h b/src/Server/PostgreSQLHandlerFactory.h index dc3d4047d2a..e9241da6f0e 100644 --- a/src/Server/PostgreSQLHandlerFactory.h +++ b/src/Server/PostgreSQLHandlerFactory.h @@ -1,16 +1,16 @@ #pragma once -#include #include #include #include +#include #include #include namespace DB { -class PostgreSQLHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class PostgreSQLHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -28,6 +28,6 @@ private: public: explicit PostgreSQLHandlerFactory(IServer & server_); - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override; + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & server) override; }; } diff --git a/src/Server/ProtocolServerAdapter.cpp b/src/Server/ProtocolServerAdapter.cpp index 6ec1ec572f7..b41ad2376f1 100644 --- a/src/Server/ProtocolServerAdapter.cpp +++ b/src/Server/ProtocolServerAdapter.cpp @@ -1,5 +1,5 @@ #include -#include +#include #if USE_GRPC #include @@ -11,20 +11,29 @@ namespace DB class ProtocolServerAdapter::TCPServerAdapterImpl : public Impl { public: - explicit TCPServerAdapterImpl(std::unique_ptr tcp_server_) : tcp_server(std::move(tcp_server_)) {} + explicit TCPServerAdapterImpl(std::unique_ptr tcp_server_) : tcp_server(std::move(tcp_server_)) {} ~TCPServerAdapterImpl() override = default; void start() override { tcp_server->start(); } void stop() override { tcp_server->stop(); } + bool isStopping() const override { return !tcp_server->isOpen(); } + UInt16 portNumber() const override { return tcp_server->portNumber(); } size_t currentConnections() const override { return tcp_server->currentConnections(); } size_t currentThreads() const override { return tcp_server->currentThreads(); } private: - std::unique_ptr tcp_server; + std::unique_ptr tcp_server; }; -ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_) - : port_name(port_name_), impl(std::make_unique(std::move(tcp_server_))) +ProtocolServerAdapter::ProtocolServerAdapter( + const std::string & listen_host_, + const char * port_name_, + const std::string & description_, + std::unique_ptr tcp_server_) + : listen_host(listen_host_) + , port_name(port_name_) + , description(description_) + , impl(std::make_unique(std::move(tcp_server_))) { } @@ -36,16 +45,30 @@ public: ~GRPCServerAdapterImpl() override = default; void start() override { grpc_server->start(); } - void stop() override { grpc_server->stop(); } + void stop() override + { + is_stopping = true; + grpc_server->stop(); + } + bool isStopping() const override { return is_stopping; } + UInt16 portNumber() const override { return grpc_server->portNumber(); } size_t currentConnections() const override { return grpc_server->currentConnections(); } size_t currentThreads() const override { return grpc_server->currentThreads(); } private: std::unique_ptr grpc_server; + bool is_stopping = false; }; -ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_) - : port_name(port_name_), impl(std::make_unique(std::move(grpc_server_))) +ProtocolServerAdapter::ProtocolServerAdapter( + const std::string & listen_host_, + const char * port_name_, + const std::string & description_, + std::unique_ptr grpc_server_) + : listen_host(listen_host_) + , port_name(port_name_) + , description(description_) + , impl(std::make_unique(std::move(grpc_server_))) { } #endif diff --git a/src/Server/ProtocolServerAdapter.h b/src/Server/ProtocolServerAdapter.h index 04c46b53356..9b3b1af0301 100644 --- a/src/Server/ProtocolServerAdapter.h +++ b/src/Server/ProtocolServerAdapter.h @@ -2,14 +2,14 @@ #include +#include #include #include -namespace Poco::Net { class TCPServer; } - namespace DB { class GRPCServer; +class TCPServer; /// Provides an unified interface to access a protocol implementing server /// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...). @@ -19,10 +19,10 @@ class ProtocolServerAdapter public: ProtocolServerAdapter(ProtocolServerAdapter && src) = default; ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default; - ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_); + ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr tcp_server_); #if USE_GRPC - ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_); + ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr grpc_server_); #endif /// Starts the server. A new thread will be created that waits for and accepts incoming connections. @@ -31,14 +31,23 @@ public: /// Stops the server. No new connections will be accepted. void stop() { impl->stop(); } + bool isStopping() const { return impl->isStopping(); } + /// Returns the number of currently handled connections. size_t currentConnections() const { return impl->currentConnections(); } /// Returns the number of current threads. size_t currentThreads() const { return impl->currentThreads(); } + /// Returns the port this server is listening to. + UInt16 portNumber() const { return impl->portNumber(); } + + const std::string & getListenHost() const { return listen_host; } + const std::string & getPortName() const { return port_name; } + const std::string & getDescription() const { return description; } + private: class Impl { @@ -46,13 +55,17 @@ private: virtual ~Impl() {} virtual void start() = 0; virtual void stop() = 0; + virtual bool isStopping() const = 0; + virtual UInt16 portNumber() const = 0; virtual size_t currentConnections() const = 0; virtual size_t currentThreads() const = 0; }; class TCPServerAdapterImpl; class GRPCServerAdapterImpl; + std::string listen_host; std::string port_name; + std::string description; std::unique_ptr impl; }; diff --git a/src/Server/TCPHandler.cpp b/src/Server/TCPHandler.cpp index cdf1838c06b..c2dcd5d7222 100644 --- a/src/Server/TCPHandler.cpp +++ b/src/Server/TCPHandler.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -81,9 +82,10 @@ namespace ErrorCodes extern const int UNKNOWN_PROTOCOL; } -TCPHandler::TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_) +TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_) : Poco::Net::TCPServerConnection(socket_) , server(server_) + , tcp_server(tcp_server_) , parse_proxy_protocol(parse_proxy_protocol_) , log(&Poco::Logger::get("TCPHandler")) , server_display_name(std::move(server_display_name_)) @@ -172,13 +174,13 @@ void TCPHandler::runImpl() throw; } - while (true) + while (tcp_server.isOpen()) { /// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down. { Stopwatch idle_time; UInt64 timeout_ms = std::min(poll_interval, idle_connection_timeout) * 1000000; - while (!server.isCancelled() && !static_cast(*in).poll(timeout_ms)) + while (tcp_server.isOpen() && !server.isCancelled() && !static_cast(*in).poll(timeout_ms)) { if (idle_time.elapsedSeconds() > idle_connection_timeout) { @@ -189,7 +191,7 @@ void TCPHandler::runImpl() } /// If we need to shut down, or client disconnects. - if (server.isCancelled() || in->eof()) + if (!tcp_server.isOpen() || server.isCancelled() || in->eof()) break; Stopwatch watch; diff --git a/src/Server/TCPHandler.h b/src/Server/TCPHandler.h index 4a340e328ed..791222dd0dc 100644 --- a/src/Server/TCPHandler.h +++ b/src/Server/TCPHandler.h @@ -35,6 +35,7 @@ class Session; struct Settings; class ColumnsDescription; struct ProfileInfo; +class TCPServer; /// State of query processing. struct QueryState @@ -127,7 +128,7 @@ public: * because it allows to check the IP ranges of the trusted proxy. * Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP. */ - TCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_); + TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_); ~TCPHandler() override; void run() override; @@ -137,6 +138,7 @@ public: private: IServer & server; + TCPServer & tcp_server; bool parse_proxy_protocol = false; Poco::Logger * log; diff --git a/src/Server/TCPHandlerFactory.h b/src/Server/TCPHandlerFactory.h index e610bea330c..03b2592198d 100644 --- a/src/Server/TCPHandlerFactory.h +++ b/src/Server/TCPHandlerFactory.h @@ -1,17 +1,17 @@ #pragma once -#include #include #include #include #include +#include namespace Poco { class Logger; } namespace DB { -class TCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class TCPHandlerFactory : public TCPServerConnectionFactory { private: IServer & server; @@ -38,13 +38,13 @@ public: server_display_name = server.config().getString("display_name", getFQDNOrHostName()); } - Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override { try { LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString()); - return new TCPHandler(server, socket, parse_proxy_protocol, server_display_name); + return new TCPHandler(server, tcp_server, socket, parse_proxy_protocol, server_display_name); } catch (const Poco::Net::NetException &) { diff --git a/src/Server/TCPServer.cpp b/src/Server/TCPServer.cpp new file mode 100644 index 00000000000..380c4ef9924 --- /dev/null +++ b/src/Server/TCPServer.cpp @@ -0,0 +1,36 @@ +#include +#include + +namespace DB +{ + +class TCPServerConnectionFactoryImpl : public Poco::Net::TCPServerConnectionFactory +{ +public: + TCPServerConnectionFactoryImpl(TCPServer & tcp_server_, DB::TCPServerConnectionFactory::Ptr factory_) + : tcp_server(tcp_server_) + , factory(factory_) + {} + + Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket) override + { + return factory->createConnection(socket, tcp_server); + } +private: + TCPServer & tcp_server; + DB::TCPServerConnectionFactory::Ptr factory; +}; + +TCPServer::TCPServer( + TCPServerConnectionFactory::Ptr factory_, + Poco::ThreadPool & thread_pool, + Poco::Net::ServerSocket & socket_, + Poco::Net::TCPServerParams::Ptr params) + : Poco::Net::TCPServer(new TCPServerConnectionFactoryImpl(*this, factory_), thread_pool, socket_, params) + , factory(factory_) + , socket(socket_) + , is_open(true) + , port_number(socket.address().port()) +{} + +} diff --git a/src/Server/TCPServer.h b/src/Server/TCPServer.h new file mode 100644 index 00000000000..219fed5342b --- /dev/null +++ b/src/Server/TCPServer.h @@ -0,0 +1,47 @@ +#pragma once + +#include + +#include +#include + + +namespace DB +{ +class Context; + +class TCPServer : public Poco::Net::TCPServer +{ +public: + explicit TCPServer( + TCPServerConnectionFactory::Ptr factory, + Poco::ThreadPool & thread_pool, + Poco::Net::ServerSocket & socket, + Poco::Net::TCPServerParams::Ptr params = new Poco::Net::TCPServerParams); + + /// Close the socket and ask existing connections to stop serving queries + void stop() + { + Poco::Net::TCPServer::stop(); + // This notifies already established connections that they should stop serving + // queries and close their socket as soon as they can. + is_open = false; + // Poco's stop() stops listening on the socket but leaves it open. + // To be able to hand over control of the listening port to a new server, and + // to get fast connection refusal instead of timeouts, we also need to close + // the listening socket. + socket.close(); + } + + bool isOpen() const { return is_open; } + + UInt16 portNumber() const { return port_number; } + +private: + TCPServerConnectionFactory::Ptr factory; + Poco::Net::ServerSocket socket; + std::atomic is_open; + UInt16 port_number; +}; + +} diff --git a/src/Server/TCPServerConnectionFactory.h b/src/Server/TCPServerConnectionFactory.h new file mode 100644 index 00000000000..613f98352bd --- /dev/null +++ b/src/Server/TCPServerConnectionFactory.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +namespace Poco +{ +namespace Net +{ + class StreamSocket; + class TCPServerConnection; +} +} +namespace DB +{ +class TCPServer; + +class TCPServerConnectionFactory +{ +public: + using Ptr = Poco::SharedPtr; + + virtual ~TCPServerConnectionFactory() = default; + + /// Same as Poco::Net::TCPServerConnectionFactory except we can pass the TCPServer + virtual Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) = 0; +}; +} diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index ff945068732..d113c825205 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -2045,7 +2045,8 @@ class ClickHouseInstance: user=user, password=password, database=database) # Connects to the instance via HTTP interface, sends a query and returns the answer - def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False): + def http_query(self, sql, data=None, params=None, user=None, password=None, expect_fail_and_get_error=False, + port=8123, timeout=None, retry_strategy=None): logging.debug(f"Executing query {sql} on {self.name} via HTTP interface") if params is None: params = {} @@ -2059,12 +2060,19 @@ class ClickHouseInstance: auth = requests.auth.HTTPBasicAuth(user, password) elif user: auth = requests.auth.HTTPBasicAuth(user, '') - url = "http://" + self.ip_address + ":8123/?" + urllib.parse.urlencode(params) + url = f"http://{self.ip_address}:{port}/?" + urllib.parse.urlencode(params) - if data: - r = requests.post(url, data, auth=auth) + if retry_strategy is None: + requester = requests else: - r = requests.get(url, auth=auth) + adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy) + requester = requests.Session() + requester.mount("https://", adapter) + requester.mount("http://", adapter) + if data: + r = requester.post(url, data, auth=auth, timeout=timeout) + else: + r = requester.get(url, auth=auth, timeout=timeout) def http_code_and_message(): code = r.status_code diff --git a/tests/integration/test_server_reload/.gitignore b/tests/integration/test_server_reload/.gitignore new file mode 100644 index 00000000000..edf565ec632 --- /dev/null +++ b/tests/integration/test_server_reload/.gitignore @@ -0,0 +1 @@ +_gen diff --git a/tests/integration/test_server_reload/__init__.py b/tests/integration/test_server_reload/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_server_reload/configs/default_passwd.xml b/tests/integration/test_server_reload/configs/default_passwd.xml new file mode 100644 index 00000000000..5c23be0dcb0 --- /dev/null +++ b/tests/integration/test_server_reload/configs/default_passwd.xml @@ -0,0 +1,13 @@ + + + + + + + + + + 123 + + + diff --git a/tests/integration/test_server_reload/configs/dhparam.pem b/tests/integration/test_server_reload/configs/dhparam.pem new file mode 100644 index 00000000000..fb935b9c898 --- /dev/null +++ b/tests/integration/test_server_reload/configs/dhparam.pem @@ -0,0 +1,8 @@ +-----BEGIN DH PARAMETERS----- +MIIBCAKCAQEAkPGhfLY5nppeQkFBKYRpiisxzrRQfyyTUu6aabZP2CbAMAuoYzaC +Z+iqeWSQZKRYeA21SZXkC9xE1e5FJsc5IWzCRiMNZeLuj4ApUNysMu89DpX8/b91 ++Ka6wRJnaO43ZqHj/9FpU4JiYtxoIpXDC9HeiSAnwLwJc3L+nkYfnSGgvzWIxhGV +gCoVmVBoTe7wrqCyVlM5nrNZSjhlSugvXmu2bSK3MwYF08QLKvlF68eedbs0PMWh +WC0bFM/X7gMBEqL4DiINufAShbZPKxD6eL2APiHPUo6xun3ed/Po/5j8QBmiku0c +5Jb12ZhOTRTQjaRg2aFF8LPdW2tDE7HmewIBAg== +-----END DH PARAMETERS----- diff --git a/tests/integration/test_server_reload/configs/ports_from_zk.xml b/tests/integration/test_server_reload/configs/ports_from_zk.xml new file mode 100644 index 00000000000..ae3435a3d3c --- /dev/null +++ b/tests/integration/test_server_reload/configs/ports_from_zk.xml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/tests/integration/test_server_reload/configs/server.crt b/tests/integration/test_server_reload/configs/server.crt new file mode 100644 index 00000000000..6f4deca038f --- /dev/null +++ b/tests/integration/test_server_reload/configs/server.crt @@ -0,0 +1,18 @@ +-----BEGIN CERTIFICATE----- +MIIC+zCCAeOgAwIBAgIJAIhI9ozZJ+TWMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV +BAMMCWxvY2FsaG9zdDAeFw0xOTA0MjIwNDMyNTJaFw0yMDA0MjEwNDMyNTJaMBQx +EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC +ggEBAK+wVUEdqF2uXvN0MJBgnAHyXi6JTi4p/F6igsrCjSNjJWzHH0vQmK8ujfcF +CkifW88i+W5eHctuEtQqNHK+t9x9YiZtXrj6m/XkOXs20mYgENSmbbbHbriTPnZB +zZrq6UqMlwIHNNAa+I3NMORQxVRaI0ybXnGVO5elr70xHpk03xL0JWKHpEqYp4db +2aBQgF6y3Ww4khxjIYqpUYXWXGFnVIRU7FKVEAM1xyKqvQzXjQ5sVM/wyHknveEF +3b/X4ggN+KNl5KOc0cWDh1/XaatJAPaUUPqZcq76tynLbP64Xm3dxHcj+gtRkO67 +ef6MSg6l63m3XQP6Qb+MIkd06OsCAwEAAaNQME4wHQYDVR0OBBYEFDmODTO8QLDN +ykR3x0LIOnjNhrKhMB8GA1UdIwQYMBaAFDmODTO8QLDNykR3x0LIOnjNhrKhMAwG +A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAwaiJc7uqEpnH3aukbftDwX +m8GfEnj1HVdgg+9GGNq+9rvUYBF6gdPmjRCX9dO0cclLFx8jc2org0rTSq9WoOhX +E6qL4Eqrmc5SE3Y9jZM0h6GRD4oXK014FmtZ3T6ddZU3dQLj3BS2r1XrvmubTvGN +ZuTJNY8nx8Hh6H5XINmsEjUF9E5hog+PwCE03xt2adIdYL+gsbxASeNYyeUFpZv5 +zcXR3VoakBWnAaOVgCHq2qh96QAnL7ZKzFkGf/MdwV10KU3dmb+ICbQUUdf9Gc17 +aaDCIRws312F433FdXBkGs2UkB7ZZme9dfn6O1QbeTNvex2VLMqYx/CTkfFbOQA= +-----END CERTIFICATE----- diff --git a/tests/integration/test_server_reload/configs/server.key b/tests/integration/test_server_reload/configs/server.key new file mode 100644 index 00000000000..6eddb3295db --- /dev/null +++ b/tests/integration/test_server_reload/configs/server.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvsFVBHahdrl7z +dDCQYJwB8l4uiU4uKfxeooLKwo0jYyVsxx9L0JivLo33BQpIn1vPIvluXh3LbhLU +KjRyvrfcfWImbV64+pv15Dl7NtJmIBDUpm22x264kz52Qc2a6ulKjJcCBzTQGviN +zTDkUMVUWiNMm15xlTuXpa+9MR6ZNN8S9CVih6RKmKeHW9mgUIBest1sOJIcYyGK +qVGF1lxhZ1SEVOxSlRADNcciqr0M140ObFTP8Mh5J73hBd2/1+IIDfijZeSjnNHF +g4df12mrSQD2lFD6mXKu+rcpy2z+uF5t3cR3I/oLUZDuu3n+jEoOpet5t10D+kG/ +jCJHdOjrAgMBAAECggEARF66zrxb6RkSmmt8+rKeA6PuQu3sHsr4C1vyyjUr97l9 +tvdGlpp20LWtSZQMjHZ3pARYTTsTHTeY3DgQcRcHNicVKx8k3ZepWeeW9vw+pL+V +zSt3RsoVrH6gsCSrfr4sS3aqzX9AbjwQvh48CJ3mLQ1m70kHV+xbZIh1+4pB/hyP +1wKyUE18ZkOptXvO/TtoHzLQCecpkXtWzmry1Eh2isvXA+NMrAtLibGsyM1mtm7i +5ozevzHabvvCDBEe+KgZdONgVhhhvm2eOd+/s4w3rw4ETud4fI/ZAJyWXhiIKFnA +VJbElWruSAoVBW7p2bsF5PbmVzvo8vXL+VylxYD+AQKBgQDhLoRKTVhNkn/QjKxq +sdOh+QZra0LzjVpAmkQzu7wZMSHEz9qePQciDQQrYKrmRF1vNcIRCVUTqWYheJ/1 +lKRrCGa0ab6k96zkWMqLHD5u+UeJV7r1dJIx08ME9kNJ+x/XtB8klRIji16NiQUS +qc6p8z0M2AnbJzsRfWZRH8FeYwKBgQDHu8dzdtVGI7MtxfPOE/bfajiopDg8BdTC +pdug2T8XofRHRq7Q+0vYjTAZFT/slib91Pk6VvvPdo9VBZiL4omv4dAq6mOOdX/c +U14mJe1X5GCrr8ExZ8BfNJ3t/6sV1fcxyJwAw7iBguqxA2JqdM/wFk10K8XqvzVn +CD6O9yGt2QKBgFX1BMi8N538809vs41S7l9hCQNOQZNo/O+2M5yv6ECRkbtoQKKw +1x03bMUGNJaLuELweXE5Z8GGo5bZTe5X3F+DKHlr+DtO1C+ieUaa9HY2MAmMdLCn +2/qrREGLo+oEs4YKmuzC/taUp/ZNPKOAMISNdluFyFVg51pozPrgrVbTAoGBAKkE +LBl3O67o0t0vH8sJdeVFG8EJhlS0koBMnfgVHqC++dm+5HwPyvTrNQJkyv1HaqNt +r6FArkG3ED9gRuBIyT6+lctbIPgSUip9mbQqcBfqOCvQxGksZMur2ODncz09HLtS +CUFUXjOqNzOnq4ZuZu/Bz7U4vXiSaXxQq6+LTUKxAoGAFZU/qrI06XxnrE9A1X0W +l7DSkpZaDcu11NrZ473yONih/xOZNh4SSBpX8a7F6Pmh9BdtGqphML8NFPvQKcfP +b9H2iid2tc292uyrUEb5uTMmv61zoTwtitqLzO0+tS6PT3fXobX+eyeEWKzPBljL +HFtxG5CCXpkdnWRmaJnhTzA= +-----END PRIVATE KEY----- diff --git a/tests/integration/test_server_reload/configs/ssl_conf.xml b/tests/integration/test_server_reload/configs/ssl_conf.xml new file mode 100644 index 00000000000..43b25032059 --- /dev/null +++ b/tests/integration/test_server_reload/configs/ssl_conf.xml @@ -0,0 +1,18 @@ + + + + + + + /etc/clickhouse-server/config.d/server.crt + /etc/clickhouse-server/config.d/server.key + + /etc/clickhouse-server/config.d/dhparam.pem + none + true + true + sslv2,sslv3 + true + + + diff --git a/tests/integration/test_server_reload/protos/clickhouse_grpc.proto b/tests/integration/test_server_reload/protos/clickhouse_grpc.proto new file mode 100644 index 00000000000..c6cafaf6e40 --- /dev/null +++ b/tests/integration/test_server_reload/protos/clickhouse_grpc.proto @@ -0,0 +1,174 @@ +/* This file describes gRPC protocol supported in ClickHouse. + * + * To use this protocol a client should send one or more messages of the QueryInfo type + * and then receive one or more messages of the Result type. + * According to that the service provides four methods for that: + * ExecuteQuery(QueryInfo) returns (Result) + * ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) + * ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) + * ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) + * It's up to the client to choose which method to use. + * For example, ExecuteQueryWithStreamInput() allows the client to add data multiple times + * while executing a query, which is suitable for inserting many rows. + */ + +syntax = "proto3"; + +package clickhouse.grpc; + +message NameAndType { + string name = 1; + string type = 2; +} + +// Describes an external table - a table which will exists only while a query is executing. +message ExternalTable { + // Name of the table. If omitted, "_data" is used. + string name = 1; + + // Columns of the table. Types are required, names can be omitted. If the names are omitted, "_1", "_2", ... is used. + repeated NameAndType columns = 2; + + // Data to insert to the external table. + // If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used, + // then data for insertion to the same external table can be split between multiple QueryInfos. + bytes data = 3; + + // Format of the data to insert to the external table. + string format = 4; + + // Settings for executing that insertion, applied after QueryInfo.settings. + map settings = 5; +} + +enum CompressionAlgorithm { + NO_COMPRESSION = 0; + DEFLATE = 1; + GZIP = 2; + STREAM_GZIP = 3; +} + +enum CompressionLevel { + COMPRESSION_NONE = 0; + COMPRESSION_LOW = 1; + COMPRESSION_MEDIUM = 2; + COMPRESSION_HIGH = 3; +} + +message Compression { + CompressionAlgorithm algorithm = 1; + CompressionLevel level = 2; +} + +// Information about a query which a client sends to a ClickHouse server. +// The first QueryInfo can set any of the following fields. Extra QueryInfos only add extra data. +// In extra QueryInfos only `input_data`, `external_tables`, `next_query_info` and `cancel` fields can be set. +message QueryInfo { + string query = 1; + string query_id = 2; + map settings = 3; + + // Default database. + string database = 4; + + // Input data, used both as data for INSERT query and as data for the input() function. + bytes input_data = 5; + + // Delimiter for input_data, inserted between input_data from adjacent QueryInfos. + bytes input_data_delimiter = 6; + + // Default output format. If not specified, 'TabSeparated' is used. + string output_format = 7; + + repeated ExternalTable external_tables = 8; + + string user_name = 9; + string password = 10; + string quota = 11; + + // Works exactly like sessions in the HTTP protocol. + string session_id = 12; + bool session_check = 13; + uint32 session_timeout = 14; + + // Set `cancel` to true to stop executing the query. + bool cancel = 15; + + // If true there will be at least one more QueryInfo in the input stream. + // `next_query_info` is allowed to be set only if a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used. + bool next_query_info = 16; + + /// Controls how a ClickHouse server will compress query execution results before sending back to the client. + /// If not set the compression settings from the configuration file will be used. + Compression result_compression = 17; +} + +enum LogsLevel { + LOG_NONE = 0; + LOG_FATAL = 1; + LOG_CRITICAL = 2; + LOG_ERROR = 3; + LOG_WARNING = 4; + LOG_NOTICE = 5; + LOG_INFORMATION = 6; + LOG_DEBUG = 7; + LOG_TRACE = 8; +} + +message LogEntry { + uint32 time = 1; + uint32 time_microseconds = 2; + uint64 thread_id = 3; + string query_id = 4; + LogsLevel level = 5; + string source = 6; + string text = 7; +} + +message Progress { + uint64 read_rows = 1; + uint64 read_bytes = 2; + uint64 total_rows_to_read = 3; + uint64 written_rows = 4; + uint64 written_bytes = 5; +} + +message Stats { + uint64 rows = 1; + uint64 blocks = 2; + uint64 allocated_bytes = 3; + bool applied_limit = 4; + uint64 rows_before_limit = 5; +} + +message Exception { + int32 code = 1; + string name = 2; + string display_text = 3; + string stack_trace = 4; +} + +// Result of execution of a query which is sent back by the ClickHouse server to the client. +message Result { + // Output of the query, represented in the `output_format` or in a format specified in `query`. + bytes output = 1; + bytes totals = 2; + bytes extremes = 3; + + repeated LogEntry logs = 4; + Progress progress = 5; + Stats stats = 6; + + // Set by the ClickHouse server if there was an exception thrown while executing. + Exception exception = 7; + + // Set by the ClickHouse server if executing was cancelled by the `cancel` field in QueryInfo. + bool cancelled = 8; +} + +service ClickHouse { + rpc ExecuteQuery(QueryInfo) returns (Result) {} + rpc ExecuteQueryWithStreamInput(stream QueryInfo) returns (Result) {} + rpc ExecuteQueryWithStreamOutput(QueryInfo) returns (stream Result) {} + rpc ExecuteQueryWithStreamIO(stream QueryInfo) returns (stream Result) {} +} diff --git a/tests/integration/test_server_reload/test.py b/tests/integration/test_server_reload/test.py new file mode 100644 index 00000000000..3c22b476f64 --- /dev/null +++ b/tests/integration/test_server_reload/test.py @@ -0,0 +1,284 @@ +import contextlib +import grpc +import psycopg2 +import pymysql.connections +import pymysql.err +import pytest +import sys +import time +from helpers.cluster import ClickHouseCluster, run_and_check +from helpers.client import Client, QueryRuntimeException +from kazoo.exceptions import NodeExistsError +from pathlib import Path +from requests.exceptions import ConnectionError +from urllib3.util.retry import Retry + +cluster = ClickHouseCluster(__file__) +instance = cluster.add_instance( + "instance", + main_configs=[ + "configs/ports_from_zk.xml", "configs/ssl_conf.xml", "configs/dhparam.pem", "configs/server.crt", "configs/server.key" + ], + user_configs=["configs/default_passwd.xml"], + with_zookeeper=True) + + +LOADS_QUERY = "SELECT value FROM system.events WHERE event = 'MainConfigLoads'" + + +# Use grpcio-tools to generate *pb2.py files from *.proto. + +proto_dir = Path(__file__).parent / "protos" +gen_dir = Path(__file__).parent / "_gen" +gen_dir.mkdir(exist_ok=True) +run_and_check( + f"python3 -m grpc_tools.protoc -I{proto_dir!s} --python_out={gen_dir!s} --grpc_python_out={gen_dir!s} \ + {proto_dir!s}/clickhouse_grpc.proto", shell=True) + +sys.path.append(str(gen_dir)) +import clickhouse_grpc_pb2 +import clickhouse_grpc_pb2_grpc + + +@pytest.fixture(name="cluster", scope="module") +def fixture_cluster(): + try: + cluster.add_zookeeper_startup_command(configure_ports_from_zk) + cluster.start() + yield cluster + finally: + cluster.shutdown() + + +@pytest.fixture(name="zk", scope="module") +def fixture_zk(cluster): + return cluster.get_kazoo_client("zoo1") + + +def get_client(cluster, port): + return Client(host=cluster.get_instance_ip("instance"), port=port, command=cluster.client_bin_path) + + +def get_mysql_client(cluster, port): + start_time = time.monotonic() + while True: + try: + return pymysql.connections.Connection( + host=cluster.get_instance_ip("instance"), user="default", password="", database="default", port=port) + except pymysql.err.OperationalError: + if time.monotonic() - start_time > 10: + raise + time.sleep(0.1) + + +def get_pgsql_client(cluster, port): + start_time = time.monotonic() + while True: + try: + return psycopg2.connect( + host=cluster.get_instance_ip("instance"), user="postgresql", password="123", database="default", port=port) + except psycopg2.OperationalError: + if time.monotonic() - start_time > 10: + raise + time.sleep(0.1) + + +def get_grpc_channel(cluster, port): + host_port = cluster.get_instance_ip("instance") + f":{port}" + channel = grpc.insecure_channel(host_port) + grpc.channel_ready_future(channel).result(timeout=10) + return channel + + +def grpc_query(channel, query_text): + query_info = clickhouse_grpc_pb2.QueryInfo(query=query_text) + stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel) + result = stub.ExecuteQuery(query_info) + if result and result.HasField("exception"): + raise Exception(result.exception.display_text) + return result.output.decode() + + +def configure_ports_from_zk(zk, querier=None): + default_config = [ + ("/clickhouse/listen_hosts", b"0.0.0.0"), + ("/clickhouse/ports/tcp", b"9000"), + ("/clickhouse/ports/http", b"8123"), + ("/clickhouse/ports/mysql", b"9004"), + ("/clickhouse/ports/postgresql", b"9005"), + ("/clickhouse/ports/grpc", b"9100"), + ] + for path, value in default_config: + if querier is not None: + loads_before = querier(LOADS_QUERY) + has_changed = False + try: + zk.create(path=path, value=value, makepath=True) + has_changed = True + except NodeExistsError: + if zk.get(path) != value: + zk.set(path=path, value=value) + has_changed = True + if has_changed and querier is not None: + wait_loaded_config_changed(loads_before, querier) + + +@contextlib.contextmanager +def sync_loaded_config(querier): + # Depending on whether we test a change on tcp or http + # we monitor canges using the other, untouched, protocol + loads_before = querier(LOADS_QUERY) + yield + wait_loaded_config_changed(loads_before, querier) + + +def wait_loaded_config_changed(loads_before, querier): + loads_after = None + start_time = time.monotonic() + while time.monotonic() - start_time < 10: + try: + loads_after = querier(LOADS_QUERY) + if loads_after != loads_before: + return + except (QueryRuntimeException, ConnectionError): + pass + time.sleep(0.1) + assert loads_after is not None and loads_after != loads_before + + +@contextlib.contextmanager +def default_client(cluster, zk, restore_via_http=False): + client = get_client(cluster, port=9000) + try: + yield client + finally: + querier = instance.http_query if restore_via_http else client.query + configure_ports_from_zk(zk, querier) + + +def test_change_tcp_port(cluster, zk): + with default_client(cluster, zk, restore_via_http=True) as client: + assert client.query("SELECT 1") == "1\n" + with sync_loaded_config(instance.http_query): + zk.set("/clickhouse/ports/tcp", b"9090") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + client_on_new_port = get_client(cluster, port=9090) + assert client_on_new_port.query("SELECT 1") == "1\n" + + +def test_change_http_port(cluster, zk): + with default_client(cluster, zk) as client: + retry_strategy = Retry(total=10, backoff_factor=0.1) + assert instance.http_query("SELECT 1", retry_strategy=retry_strategy) == "1\n" + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/http", b"9090") + with pytest.raises(ConnectionError, match="Connection refused"): + instance.http_query("SELECT 1") + instance.http_query("SELECT 1", port=9090) == "1\n" + + +def test_change_mysql_port(cluster, zk): + with default_client(cluster, zk) as client: + mysql_client = get_mysql_client(cluster, port=9004) + assert mysql_client.query("SELECT 1") == 1 + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/mysql", b"9090") + with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): + mysql_client.query("SELECT 1") + mysql_client_on_new_port = get_mysql_client(cluster, port=9090) + assert mysql_client_on_new_port.query("SELECT 1") == 1 + + +def test_change_postgresql_port(cluster, zk): + with default_client(cluster, zk) as client: + pgsql_client = get_pgsql_client(cluster, port=9005) + cursor = pgsql_client.cursor() + cursor.execute("SELECT 1") + assert cursor.fetchall() == [(1,)] + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/postgresql", b"9090") + with pytest.raises(psycopg2.OperationalError, match="closed"): + cursor.execute("SELECT 1") + pgsql_client_on_new_port = get_pgsql_client(cluster, port=9090) + cursor = pgsql_client_on_new_port.cursor() + cursor.execute("SELECT 1") + cursor.fetchall() == [(1,)] + + +def test_change_grpc_port(cluster, zk): + with default_client(cluster, zk) as client: + grpc_channel = get_grpc_channel(cluster, port=9100) + assert grpc_query(grpc_channel, "SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.set("/clickhouse/ports/grpc", b"9090") + with pytest.raises(grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"): + grpc_query(grpc_channel, "SELECT 1") + grpc_channel_on_new_port = get_grpc_channel(cluster, port=9090) + assert grpc_query(grpc_channel_on_new_port, "SELECT 1") == "1\n" + + +def test_remove_tcp_port(cluster, zk): + with default_client(cluster, zk, restore_via_http=True) as client: + assert client.query("SELECT 1") == "1\n" + with sync_loaded_config(instance.http_query): + zk.delete("/clickhouse/ports/tcp") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + + +def test_remove_http_port(cluster, zk): + with default_client(cluster, zk) as client: + assert instance.http_query("SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/http") + with pytest.raises(ConnectionError, match="Connection refused"): + instance.http_query("SELECT 1") + + +def test_remove_mysql_port(cluster, zk): + with default_client(cluster, zk) as client: + mysql_client = get_mysql_client(cluster, port=9004) + assert mysql_client.query("SELECT 1") == 1 + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/mysql") + with pytest.raises(pymysql.err.OperationalError, match="Lost connection"): + mysql_client.query("SELECT 1") + + +def test_remove_postgresql_port(cluster, zk): + with default_client(cluster, zk) as client: + pgsql_client = get_pgsql_client(cluster, port=9005) + cursor = pgsql_client.cursor() + cursor.execute("SELECT 1") + assert cursor.fetchall() == [(1,)] + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/postgresql") + with pytest.raises(psycopg2.OperationalError, match="closed"): + cursor.execute("SELECT 1") + + +def test_remove_grpc_port(cluster, zk): + with default_client(cluster, zk) as client: + grpc_channel = get_grpc_channel(cluster, port=9100) + assert grpc_query(grpc_channel, "SELECT 1") == "1\n" + with sync_loaded_config(client.query): + zk.delete("/clickhouse/ports/grpc") + with pytest.raises(grpc._channel._InactiveRpcError, match="StatusCode.UNAVAILABLE"): + grpc_query(grpc_channel, "SELECT 1") + + +def test_change_listen_host(cluster, zk): + localhost_client = Client(host="127.0.0.1", port=9000, command="/usr/bin/clickhouse") + localhost_client.command = ["docker", "exec", "-i", instance.docker_id] + localhost_client.command + try: + client = get_client(cluster, port=9000) + with sync_loaded_config(localhost_client.query): + zk.set("/clickhouse/listen_hosts", b"127.0.0.1") + with pytest.raises(QueryRuntimeException, match="Connection refused"): + client.query("SELECT 1") + assert localhost_client.query("SELECT 1") == "1\n" + finally: + with sync_loaded_config(localhost_client.query): + configure_ports_from_zk(zk) +