From cd3a73f9d38dddf1b82d5e7654f1c7dc9461be15 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Tue, 22 Dec 2020 00:47:10 +0300 Subject: [PATCH 1/4] done --- programs/server/Server.cpp | 44 ++++++++++++------------ src/Interpreters/AsynchronousMetrics.cpp | 20 +++++++---- src/Interpreters/AsynchronousMetrics.h | 16 ++++++--- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index e246be6c343..5681ca323c0 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -770,7 +770,7 @@ int Server::main(const std::vector & /*args*/) http_params->setTimeout(settings.http_receive_timeout); http_params->setKeepAliveTimeout(keep_alive_timeout); - std::vector servers_to_start_before_tables; + std::shared_ptr> servers_to_start_before_tables; std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); @@ -792,7 +792,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables.emplace_back( + servers_to_start_before_tables->emplace_back( port_name, std::make_unique( new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); @@ -801,7 +801,7 @@ int Server::main(const std::vector & /*args*/) }); } - for (auto & server : servers_to_start_before_tables) + for (auto & server : *servers_to_start_before_tables) server.start(); SCOPE_EXIT({ @@ -816,11 +816,11 @@ int Server::main(const std::vector & /*args*/) LOG_DEBUG(log, "Shut down storages."); - if (!servers_to_start_before_tables.empty()) + if (!servers_to_start_before_tables->empty()) { LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); int current_connections = 0; - for (auto & server : servers_to_start_before_tables) + for (auto & server : *servers_to_start_before_tables) { server.stop(); current_connections += server.currentConnections(); @@ -832,7 +832,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Closed all listening sockets."); if (current_connections > 0) - current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(*servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); @@ -978,7 +978,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); #endif - std::vector servers; + std::shared_ptr> servers; { /// This object will periodically calculate some metrics. AsynchronousMetrics async_metrics( @@ -996,7 +996,7 @@ int Server::main(const std::vector & /*args*/) socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for http://{}", address.toString()); @@ -1011,7 +1011,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for https://{}", address.toString()); @@ -1030,7 +1030,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), server_pool, socket, @@ -1047,7 +1047,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), server_pool, socket, @@ -1065,7 +1065,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), server_pool, socket, @@ -1086,7 +1086,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); @@ -1100,7 +1100,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); @@ -1118,7 +1118,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( new MySQLHandlerFactory(*this), server_pool, socket, @@ -1134,7 +1134,7 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( new PostgreSQLHandlerFactory(*this), server_pool, socket, @@ -1148,7 +1148,7 @@ int Server::main(const std::vector & /*args*/) createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::SocketAddress server_address(listen_host, port); - servers.emplace_back(port_name, std::make_unique(*this, makeSocketAddress(listen_host, port, log))); + servers->emplace_back(port_name, std::make_unique(*this, makeSocketAddress(listen_host, port, log))); LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); }); #endif @@ -1161,14 +1161,14 @@ int Server::main(const std::vector & /*args*/) auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(port_name, std::make_unique( + servers->emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); }); } - if (servers.empty()) + if (servers->empty()) throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG); @@ -1176,7 +1176,7 @@ int Server::main(const std::vector & /*args*/) async_metrics.start(); global_context->enableNamedSessions(); - for (auto & server : servers) + for (auto & server : *servers) server.start(); { @@ -1208,7 +1208,7 @@ int Server::main(const std::vector & /*args*/) is_cancelled = true; int current_connections = 0; - for (auto & server : servers) + for (auto & server : *servers) { server.stop(); current_connections += server.currentConnections(); @@ -1223,7 +1223,7 @@ int Server::main(const std::vector & /*args*/) global_context->getProcessList().killAllQueries(); if (current_connections) - current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5)); + current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5)); if (current_connections) LOG_INFO(log, "Closed connections. But {} remain." diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 40a3aa520fa..223bc907e30 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -336,16 +336,22 @@ void AsynchronousMetrics::update() return it->second; }; - for (const auto & server : servers_to_start_before_tables) + if (servers_to_start_before_tables) { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); + for (const auto & server : *servers_to_start_before_tables) + { + if (const auto * name = get_metric_name(server.getPortName())) + new_values[name] = server.currentThreads(); + } } - - for (const auto & server : servers) + + if (servers) { - if (const auto * name = get_metric_name(server.getPortName())) - new_values[name] = server.currentThreads(); + for (const auto & server : *servers) + { + if (const auto * name = get_metric_name(server.getPortName())) + new_values[name] = server.currentThreads(); + } } } diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 610d8843537..a352d13812e 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -26,14 +26,22 @@ using AsynchronousMetricValues = std::unordered_map & servers_to_start_before_tables_, - const std::vector & servers_) + std::shared_ptr> servers_to_start_before_tables_, + std::shared_ptr> servers_) : global_context(global_context_) , update_period(update_period_seconds) , servers_to_start_before_tables(servers_to_start_before_tables_) @@ -55,8 +63,8 @@ public: private: Context & global_context; const std::chrono::seconds update_period; - const std::vector & servers_to_start_before_tables; - const std::vector & servers; + std::shared_ptr> servers_to_start_before_tables{nullptr}; + std::shared_ptr> servers{nullptr}; mutable std::mutex mutex; std::condition_variable wait_cond; From 3972c5cd82e14f1be1ea3af14f8736a83ce8cab2 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Tue, 22 Dec 2020 00:49:37 +0300 Subject: [PATCH 2/4] style --- src/Interpreters/AsynchronousMetrics.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 223bc907e30..a4044a8eea5 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -344,7 +344,7 @@ void AsynchronousMetrics::update() new_values[name] = server.currentThreads(); } } - + if (servers) { for (const auto & server : *servers) From 8501c7a831d94cd9a0c01a145976db30d22fba98 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Tue, 22 Dec 2020 02:03:08 +0300 Subject: [PATCH 3/4] better --- programs/server/Server.cpp | 4 ++-- src/Interpreters/AsynchronousMetrics.h | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 5681ca323c0..9d7e78ac50d 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -770,7 +770,7 @@ int Server::main(const std::vector & /*args*/) http_params->setTimeout(settings.http_receive_timeout); http_params->setKeepAliveTimeout(keep_alive_timeout); - std::shared_ptr> servers_to_start_before_tables; + auto servers_to_start_before_tables = std::make_shared>(); std::vector listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); @@ -978,7 +978,7 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); #endif - std::shared_ptr> servers; + auto servers = std::make_shared>(); { /// This object will periodically calculate some metrics. AsynchronousMetrics async_metrics( diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index a352d13812e..97824cd3987 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -26,7 +26,8 @@ using AsynchronousMetricValues = std::unordered_map Date: Tue, 22 Dec 2020 09:24:14 +0300 Subject: [PATCH 4/4] Update AsynchronousMetrics.h --- src/Interpreters/AsynchronousMetrics.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 97824cd3987..88c2221be76 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -26,7 +26,8 @@ using AsynchronousMetricValues = std::unordered_map