Merge pull request #18331 from nikitamikhaylov/async-metrics-fix

Provide extra constructor for Async metrics
This commit is contained in:
Nikita Mikhaylov 2020-12-22 17:17:03 +03:00 committed by GitHub
commit 410d0a51b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 35 deletions

View File

@ -770,7 +770,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(settings.http_receive_timeout); http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout); http_params->setKeepAliveTimeout(keep_alive_timeout);
std::vector<ProtocolServerAdapter> servers_to_start_before_tables; auto servers_to_start_before_tables = std::make_shared<std::vector<ProtocolServerAdapter>>();
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host"); std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
@ -792,7 +792,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout); socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers_to_start_before_tables.emplace_back( servers_to_start_before_tables->emplace_back(
port_name, port_name,
std::make_unique<Poco::Net::TCPServer>( std::make_unique<Poco::Net::TCPServer>(
new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
@ -801,7 +801,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
}); });
} }
for (auto & server : servers_to_start_before_tables) for (auto & server : *servers_to_start_before_tables)
server.start(); server.start();
SCOPE_EXIT({ SCOPE_EXIT({
@ -816,11 +816,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Shut down storages."); LOG_DEBUG(log, "Shut down storages.");
if (!servers_to_start_before_tables.empty()) if (!servers_to_start_before_tables->empty())
{ {
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
int current_connections = 0; int current_connections = 0;
for (auto & server : servers_to_start_before_tables) for (auto & server : *servers_to_start_before_tables)
{ {
server.stop(); server.stop();
current_connections += server.currentConnections(); current_connections += server.currentConnections();
@ -832,7 +832,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Closed all listening sockets."); LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0) if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5)); current_connections = waitServersToFinish(*servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections) if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections); LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
@ -978,7 +978,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled."); LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
#endif #endif
std::vector<ProtocolServerAdapter> servers; auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
{ {
/// This object will periodically calculate some metrics. /// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics( AsynchronousMetrics async_metrics(
@ -996,7 +996,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params)); createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://{}", address.toString()); LOG_INFO(log, "Listening for http://{}", address.toString());
@ -1011,7 +1011,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params)); createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://{}", address.toString()); LOG_INFO(log, "Listening for https://{}", address.toString());
@ -1030,7 +1030,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout); socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false), new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
server_pool, server_pool,
socket, socket,
@ -1047,7 +1047,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout); socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true), new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
server_pool, server_pool,
socket, socket,
@ -1065,7 +1065,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout); socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false), new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
server_pool, server_pool,
socket, socket,
@ -1086,7 +1086,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params)); createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString()); LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
@ -1100,7 +1100,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params)); createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString()); LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
@ -1118,7 +1118,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan()); socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this), new MySQLHandlerFactory(*this),
server_pool, server_pool,
socket, socket,
@ -1134,7 +1134,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true); auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan()); socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout); socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new PostgreSQLHandlerFactory(*this), new PostgreSQLHandlerFactory(*this),
server_pool, server_pool,
socket, socket,
@ -1148,7 +1148,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
createServer(listen_host, port_name, listen_try, [&](UInt16 port) createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{ {
Poco::Net::SocketAddress server_address(listen_host, port); Poco::Net::SocketAddress server_address(listen_host, port);
servers.emplace_back(port_name, std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log))); servers->emplace_back(port_name, std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log)));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString()); LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
}); });
#endif #endif
@ -1161,14 +1161,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port); auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout); socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout); socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>( servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString()); LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
}); });
} }
if (servers.empty()) if (servers->empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)", throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG); ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@ -1176,7 +1176,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
async_metrics.start(); async_metrics.start();
global_context->enableNamedSessions(); global_context->enableNamedSessions();
for (auto & server : servers) for (auto & server : *servers)
server.start(); server.start();
{ {
@ -1208,7 +1208,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
is_cancelled = true; is_cancelled = true;
int current_connections = 0; int current_connections = 0;
for (auto & server : servers) for (auto & server : *servers)
{ {
server.stop(); server.stop();
current_connections += server.currentConnections(); current_connections += server.currentConnections();
@ -1223,7 +1223,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getProcessList().killAllQueries(); global_context->getProcessList().killAllQueries();
if (current_connections) if (current_connections)
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5)); current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections) if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain." LOG_INFO(log, "Closed connections. But {} remain."

View File

@ -355,18 +355,24 @@ void AsynchronousMetrics::update()
return it->second; return it->second;
}; };
for (const auto & server : servers_to_start_before_tables) if (servers_to_start_before_tables)
{
for (const auto & server : *servers_to_start_before_tables)
{ {
if (const auto * name = get_metric_name(server.getPortName())) if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads(); new_values[name] = server.currentThreads();
} }
}
for (const auto & server : servers) if (servers)
{
for (const auto & server : *servers)
{ {
if (const auto * name = get_metric_name(server.getPortName())) if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads(); new_values[name] = server.currentThreads();
} }
} }
}
#if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4 #if USE_JEMALLOC && JEMALLOC_VERSION_MAJOR >= 4
// 'epoch' is a special mallctl -- it updates the statistics. Without it, all // 'epoch' is a special mallctl -- it updates the statistics. Without it, all

View File

@ -26,14 +26,26 @@ using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMet
class AsynchronousMetrics class AsynchronousMetrics
{ {
public: public:
// The default value of update_period_seconds is for ClickHouse-over-YT #if defined(ARCADIA_BUILD)
// in Arcadia -- it uses its own server implementation that also uses these /// This constructor needs only to provide backward compatibility with some other projects (hello, Arcadia).
// metrics. /// Never use this in the ClickHouse codebase.
AsynchronousMetrics(
Context & global_context_,
int update_period_seconds = 60)
: global_context(global_context_)
, update_period(update_period_seconds)
{
}
#endif
/// The default value of update_period_seconds is for ClickHouse-over-YT
/// in Arcadia -- it uses its own server implementation that also uses these
/// metrics.
AsynchronousMetrics( AsynchronousMetrics(
Context & global_context_, Context & global_context_,
int update_period_seconds, int update_period_seconds,
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables_, std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_to_start_before_tables_,
const std::vector<ProtocolServerAdapter> & servers_) std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_)
: global_context(global_context_) : global_context(global_context_)
, update_period(update_period_seconds) , update_period(update_period_seconds)
, servers_to_start_before_tables(servers_to_start_before_tables_) , servers_to_start_before_tables(servers_to_start_before_tables_)
@ -55,8 +67,8 @@ public:
private: private:
Context & global_context; Context & global_context;
const std::chrono::seconds update_period; const std::chrono::seconds update_period;
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables; std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_to_start_before_tables{nullptr};
const std::vector<ProtocolServerAdapter> & servers; std::shared_ptr<std::vector<ProtocolServerAdapter>> servers{nullptr};
mutable std::mutex mutex; mutable std::mutex mutex;
std::condition_variable wait_cond; std::condition_variable wait_cond;