diff --git a/base/common/defines.h b/base/common/defines.h index e4c456796d3..6dc61155649 100644 --- a/base/common/defines.h +++ b/base/common/defines.h @@ -76,6 +76,8 @@ # define NO_SANITIZE_THREAD #endif -/// A macro for suppressing warnings about unused variables or function results. -/// Useful for structured bindings which have no standard way to declare this. -#define UNUSED(...) (void)(__VA_ARGS__) +/// A template function for suppressing warnings about unused variables or function results. +template +constexpr void UNUSED(Args &&... args [[maybe_unused]]) +{ +} diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 1eeec398f44..e246be6c343 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -785,17 +785,17 @@ int Server::main(const std::vector & /*args*/) for (const auto & listen_host : listen_hosts) { /// TCP TestKeeper - createServer(listen_host, "test_keeper_server.tcp_port", listen_try, [&](UInt16 port) + const char * port_name = "test_keeper_server.tcp_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers_to_start_before_tables.emplace_back(std::make_unique( - new TestKeeperTCPHandlerFactory(*this), - server_pool, - socket, - new Poco::Net::TCPServerParams)); + servers_to_start_before_tables.emplace_back( + port_name, + std::make_unique( + new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); LOG_INFO(log, "Listening for connections to fake zookeeper (tcp): {}", address.toString()); }); @@ -981,35 +981,37 @@ int Server::main(const std::vector & /*args*/) std::vector servers; { /// This object will periodically calculate some metrics. - AsynchronousMetrics async_metrics(*global_context, - config().getUInt("asynchronous_metrics_update_period_s", 60)); + AsynchronousMetrics async_metrics( + *global_context, config().getUInt("asynchronous_metrics_update_period_s", 60), servers_to_start_before_tables, servers); attachSystemTablesAsync(*DatabaseCatalog::instance().getSystemDatabase(), async_metrics); for (const auto & listen_host : listen_hosts) { /// HTTP - createServer(listen_host, "http_port", listen_try, [&](UInt16 port) + const char * port_name = "http_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for http://{}", address.toString()); }); /// HTTPS - createServer(listen_host, "https_port", listen_try, [&](UInt16 port) + port_name = "https_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for https://{}", address.toString()); @@ -1021,13 +1023,14 @@ int Server::main(const std::vector & /*args*/) }); /// TCP - createServer(listen_host, "tcp_port", listen_try, [&](UInt16 port) + port_name = "tcp_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), server_pool, socket, @@ -1037,13 +1040,14 @@ int Server::main(const std::vector & /*args*/) }); /// TCP with PROXY protocol, see https://github.com/wolfeidau/proxyv2/blob/master/docs/proxy-protocol.txt - createServer(listen_host, "tcp_with_proxy_port", listen_try, [&](UInt16 port) + port_name = "tcp_with_proxy_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), server_pool, socket, @@ -1053,14 +1057,15 @@ int Server::main(const std::vector & /*args*/) }); /// TCP with SSL - createServer(listen_host, "tcp_port_secure", listen_try, [&](UInt16 port) + port_name = "tcp_port_secure"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.receive_timeout); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), server_pool, socket, @@ -1074,26 +1079,28 @@ int Server::main(const std::vector & /*args*/) }); /// Interserver IO HTTP - createServer(listen_host, "interserver_http_port", listen_try, [&](UInt16 port) + port_name = "interserver_http_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); }); - createServer(listen_host, "interserver_https_port", listen_try, [&](UInt16 port) + port_name = "interserver_https_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { #if USE_SSL Poco::Net::SecureServerSocket socket; auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); @@ -1104,13 +1111,14 @@ int Server::main(const std::vector & /*args*/) #endif }); - createServer(listen_host, "mysql_port", listen_try, [&](UInt16 port) + port_name = "mysql_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( new MySQLHandlerFactory(*this), server_pool, socket, @@ -1119,13 +1127,14 @@ int Server::main(const std::vector & /*args*/) LOG_INFO(log, "Listening for MySQL compatibility protocol: {}", address.toString()); }); - createServer(listen_host, "postgresql_port", listen_try, [&](UInt16 port) + port_name = "postgresql_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); socket.setReceiveTimeout(Poco::Timespan()); socket.setSendTimeout(settings.send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( new PostgreSQLHandlerFactory(*this), server_pool, socket, @@ -1135,22 +1144,24 @@ int Server::main(const std::vector & /*args*/) }); #if USE_GRPC - createServer(listen_host, "grpc_port", listen_try, [&](UInt16 port) + port_name = "grpc_port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::SocketAddress server_address(listen_host, port); - servers.emplace_back(std::make_unique(*this, makeSocketAddress(listen_host, port, log))); + servers.emplace_back(port_name, std::make_unique(*this, makeSocketAddress(listen_host, port, log))); LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); }); #endif /// Prometheus (if defined and not setup yet with http_port) - createServer(listen_host, "prometheus.port", listen_try, [&](UInt16 port) + port_name = "prometheus.port"; + createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; auto address = socketBindListen(socket, listen_host, port); socket.setReceiveTimeout(settings.http_receive_timeout); socket.setSendTimeout(settings.http_send_timeout); - servers.emplace_back(std::make_unique( + servers.emplace_back(port_name, std::make_unique( createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); @@ -1161,6 +1172,8 @@ int Server::main(const std::vector & /*args*/) throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", ErrorCodes::NO_ELEMENTS_IN_CONFIG); + /// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread. + async_metrics.start(); global_context->enableNamedSessions(); for (auto & server : servers) diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 4e052349b6b..40a3aa520fa 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -43,7 +44,8 @@ AsynchronousMetrics::~AsynchronousMetrics() } wait_cond.notify_one(); - thread.join(); + if (thread) + thread->join(); } catch (...) { @@ -169,7 +171,7 @@ void AsynchronousMetrics::update() AsynchronousMetricValues new_values; { - if (auto mark_cache = context.getMarkCache()) + if (auto mark_cache = global_context.getMarkCache()) { new_values["MarkCacheBytes"] = mark_cache->weight(); new_values["MarkCacheFiles"] = mark_cache->count(); @@ -177,7 +179,7 @@ void AsynchronousMetrics::update() } { - if (auto uncompressed_cache = context.getUncompressedCache()) + if (auto uncompressed_cache = global_context.getUncompressedCache()) { new_values["UncompressedCacheBytes"] = uncompressed_cache->weight(); new_values["UncompressedCacheCells"] = uncompressed_cache->count(); @@ -186,12 +188,12 @@ void AsynchronousMetrics::update() #if USE_EMBEDDED_COMPILER { - if (auto compiled_expression_cache = context.getCompiledExpressionCache()) + if (auto compiled_expression_cache = global_context.getCompiledExpressionCache()) new_values["CompiledExpressionCacheCount"] = compiled_expression_cache->count(); } #endif - new_values["Uptime"] = context.getUptimeSeconds(); + new_values["Uptime"] = global_context.getUptimeSeconds(); /// Process memory usage according to OS #if defined(OS_LINUX) @@ -250,7 +252,7 @@ void AsynchronousMetrics::update() /// Check if database can contain MergeTree tables if (!db.second->canContainMergeTreeTables()) continue; - for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next()) + for (auto iterator = db.second->getTablesIterator(global_context); iterator->isValid(); iterator->next()) { ++total_number_of_tables; const auto & table = iterator->table(); @@ -312,6 +314,39 @@ void AsynchronousMetrics::update() new_values["NumberOfDatabases"] = number_of_databases; new_values["NumberOfTables"] = total_number_of_tables; + + auto get_metric_name = [](const String & name) -> const char * + { + static std::map metric_map = { + {"tcp_port", "TCPThreads"}, + {"tcp_port_secure", "TCPSecureThreads"}, + {"http_port", "HTTPThreads"}, + {"https_port", "HTTPSecureThreads"}, + {"interserver_http_port", "InterserverThreads"}, + {"interserver_https_port", "InterserverSecureThreads"}, + {"mysql_port", "MySQLThreads"}, + {"postgresql_port", "PostgreSQLThreads"}, + {"grpc_port", "GRPCThreads"}, + {"prometheus.port", "PrometheusThreads"} + }; + auto it = metric_map.find(name); + if (it == metric_map.end()) + return nullptr; + else + return it->second; + }; + + for (const auto & server : servers_to_start_before_tables) + { + if (const auto * name = get_metric_name(server.getPortName())) + new_values[name] = server.currentThreads(); + } + + for (const auto & server : servers) + { + if (const auto * name = get_metric_name(server.getPortName())) + new_values[name] = server.currentThreads(); + } } #if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4 @@ -386,7 +421,7 @@ void AsynchronousMetrics::update() /// Add more metrics as you wish. // Log the new metrics. - if (auto log = context.getAsynchronousMetricLog()) + if (auto log = global_context.getAsynchronousMetricLog()) { log->addValues(new_values); } diff --git a/src/Interpreters/AsynchronousMetrics.h b/src/Interpreters/AsynchronousMetrics.h index 6ab32ff9ab6..610d8843537 100644 --- a/src/Interpreters/AsynchronousMetrics.h +++ b/src/Interpreters/AsynchronousMetrics.h @@ -13,9 +13,10 @@ namespace DB { class Context; +class ProtocolServerAdapter; -typedef double AsynchronousMetricValue; -typedef std::unordered_map AsynchronousMetricValues; +using AsynchronousMetricValue = double; +using AsynchronousMetricValues = std::unordered_map; /** Periodically (by default, each minute, starting at 30 seconds offset) @@ -28,22 +29,34 @@ public: // The default value of update_period_seconds is for ClickHouse-over-YT // in Arcadia -- it uses its own server implementation that also uses these // metrics. - AsynchronousMetrics(Context & context_, int update_period_seconds = 60) - : context(context_), - update_period(update_period_seconds), - thread([this] { run(); }) + AsynchronousMetrics( + Context & global_context_, + int update_period_seconds, + const std::vector & servers_to_start_before_tables_, + const std::vector & servers_) + : global_context(global_context_) + , update_period(update_period_seconds) + , servers_to_start_before_tables(servers_to_start_before_tables_) + , servers(servers_) { } ~AsynchronousMetrics(); + /// Separate method allows to initialize the `servers` variable beforehand. + void start() + { + thread = std::make_unique([this] { run(); }); + } /// Returns copy of all values. AsynchronousMetricValues getValues() const; private: - Context & context; + Context & global_context; const std::chrono::seconds update_period; + const std::vector & servers_to_start_before_tables; + const std::vector & servers; mutable std::mutex mutex; std::condition_variable wait_cond; @@ -54,7 +67,7 @@ private: MemoryStatisticsOS memory_stat; #endif - ThreadFromGlobalPool thread; + std::unique_ptr thread; void run(); void update(); diff --git a/src/Server/GRPCServer.h b/src/Server/GRPCServer.h index b6bc8c7bf7f..ef86b902b5a 100644 --- a/src/Server/GRPCServer.h +++ b/src/Server/GRPCServer.h @@ -35,6 +35,9 @@ public: /// Returns the number of currently handled connections. size_t currentConnections() const; + /// Returns the number of current threads. + size_t currentThreads() const { return currentConnections(); } + private: using GRPCService = clickhouse::grpc::ClickHouse::AsyncService; class Runner; diff --git a/src/Server/ProtocolServerAdapter.cpp b/src/Server/ProtocolServerAdapter.cpp index 7f57687f259..6ec1ec572f7 100644 --- a/src/Server/ProtocolServerAdapter.cpp +++ b/src/Server/ProtocolServerAdapter.cpp @@ -17,14 +17,15 @@ public: void start() override { tcp_server->start(); } void stop() override { tcp_server->stop(); } size_t currentConnections() const override { return tcp_server->currentConnections(); } + size_t currentThreads() const override { return tcp_server->currentThreads(); } private: std::unique_ptr tcp_server; }; -ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr tcp_server_) +ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_) + : port_name(port_name_), impl(std::make_unique(std::move(tcp_server_))) { - impl = std::make_unique(std::move(tcp_server_)); } #if USE_GRPC @@ -37,14 +38,15 @@ public: void start() override { grpc_server->start(); } void stop() override { grpc_server->stop(); } size_t currentConnections() const override { return grpc_server->currentConnections(); } + size_t currentThreads() const override { return grpc_server->currentThreads(); } private: std::unique_ptr grpc_server; }; -ProtocolServerAdapter::ProtocolServerAdapter(std::unique_ptr grpc_server_) +ProtocolServerAdapter::ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_) + : port_name(port_name_), impl(std::make_unique(std::move(grpc_server_))) { - impl = std::make_unique(std::move(grpc_server_)); } #endif } diff --git a/src/Server/ProtocolServerAdapter.h b/src/Server/ProtocolServerAdapter.h index c0f82dbfde0..2e3d67d081f 100644 --- a/src/Server/ProtocolServerAdapter.h +++ b/src/Server/ProtocolServerAdapter.h @@ -5,6 +5,7 @@ #endif #include +#include namespace Poco::Net { class TCPServer; } @@ -16,16 +17,14 @@ class GRPCServer; /// no matter what type it has (HTTPServer, TCPServer, MySQLServer, GRPCServer, ...). class ProtocolServerAdapter { + friend class ProtocolServers; public: - ProtocolServerAdapter() {} ProtocolServerAdapter(ProtocolServerAdapter && src) = default; ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default; - ~ProtocolServerAdapter() {} - - ProtocolServerAdapter(std::unique_ptr tcp_server_); + ProtocolServerAdapter(const char * port_name_, std::unique_ptr tcp_server_); #if USE_GRPC - ProtocolServerAdapter(std::unique_ptr grpc_server_); + ProtocolServerAdapter(const char * port_name_, std::unique_ptr grpc_server_); #endif /// Starts the server. A new thread will be created that waits for and accepts incoming connections. @@ -37,6 +36,11 @@ public: /// Returns the number of currently handled connections. size_t currentConnections() const { return impl->currentConnections(); } + /// Returns the number of current threads. + size_t currentThreads() const { return impl->currentThreads(); } + + const std::string & getPortName() const { return port_name; } + private: class Impl { @@ -45,10 +49,12 @@ private: virtual void start() = 0; virtual void stop() = 0; virtual size_t currentConnections() const = 0; + virtual size_t currentThreads() const = 0; }; class TCPServerAdapterImpl; class GRPCServerAdapterImpl; + std::string port_name; std::unique_ptr impl; };