This commit is contained in:
nikitamikhaylov 2020-12-22 00:47:10 +03:00
parent 7eea1cbe0f
commit cd3a73f9d3
3 changed files with 47 additions and 33 deletions

View File

@ -770,7 +770,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
std::vector<ProtocolServerAdapter> servers_to_start_before_tables;
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_to_start_before_tables;
std::vector<std::string> listen_hosts = DB::getMultipleValuesFromConfig(config(), "", "listen_host");
@ -792,7 +792,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers_to_start_before_tables.emplace_back(
servers_to_start_before_tables->emplace_back(
port_name,
std::make_unique<Poco::Net::TCPServer>(
new TestKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
@ -801,7 +801,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
});
}
for (auto & server : servers_to_start_before_tables)
for (auto & server : *servers_to_start_before_tables)
server.start();
SCOPE_EXIT({
@ -816,11 +816,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Shut down storages.");
if (!servers_to_start_before_tables.empty())
if (!servers_to_start_before_tables->empty())
{
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
int current_connections = 0;
for (auto & server : servers_to_start_before_tables)
for (auto & server : *servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
@ -832,7 +832,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(*servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
@ -978,7 +978,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
#endif
std::vector<ProtocolServerAdapter> servers;
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers;
{
/// This object will periodically calculate some metrics.
AsynchronousMetrics async_metrics(
@ -996,7 +996,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for http://{}", address.toString());
@ -1011,7 +1011,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "HTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for https://{}", address.toString());
@ -1030,7 +1030,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ false),
server_pool,
socket,
@ -1047,7 +1047,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ false, /* proxy protocol */ true),
server_pool,
socket,
@ -1065,7 +1065,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure */ true, /* proxy protocol */ false),
server_pool,
socket,
@ -1086,7 +1086,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for replica communication (interserver): http://{}", address.toString());
@ -1100,7 +1100,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "InterserverIOHTTPSHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver): https://{}", address.toString());
@ -1118,7 +1118,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
@ -1134,7 +1134,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::TCPServer>(
new PostgreSQLHandlerFactory(*this),
server_pool,
socket,
@ -1148,7 +1148,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::SocketAddress server_address(listen_host, port);
servers.emplace_back(port_name, std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log)));
servers->emplace_back(port_name, std::make_unique<GRPCServer>(*this, makeSocketAddress(listen_host, port, log)));
LOG_INFO(log, "Listening for gRPC protocol: " + server_address.toString());
});
#endif
@ -1161,14 +1161,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto address = socketBindListen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
servers->emplace_back(port_name, std::make_unique<Poco::Net::HTTPServer>(
createHandlerFactory(*this, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
LOG_INFO(log, "Listening for Prometheus: http://{}", address.toString());
});
}
if (servers.empty())
if (servers->empty())
throw Exception("No servers started (add valid listen_host and 'tcp_port' or 'http_port' to configuration file.)",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
@ -1176,7 +1176,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
async_metrics.start();
global_context->enableNamedSessions();
for (auto & server : servers)
for (auto & server : *servers)
server.start();
{
@ -1208,7 +1208,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
is_cancelled = true;
int current_connections = 0;
for (auto & server : servers)
for (auto & server : *servers)
{
server.stop();
current_connections += server.currentConnections();
@ -1223,7 +1223,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getProcessList().killAllQueries();
if (current_connections)
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain."

View File

@ -336,16 +336,22 @@ void AsynchronousMetrics::update()
return it->second;
};
for (const auto & server : servers_to_start_before_tables)
if (servers_to_start_before_tables)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
for (const auto & server : *servers_to_start_before_tables)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
}
}
for (const auto & server : servers)
if (servers)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
for (const auto & server : *servers)
{
if (const auto * name = get_metric_name(server.getPortName()))
new_values[name] = server.currentThreads();
}
}
}

View File

@ -26,14 +26,22 @@ using AsynchronousMetricValues = std::unordered_map<std::string, AsynchronousMet
class AsynchronousMetrics
{
public:
AsynchronousMetrics(
Context & global_context_,
int update_period_seconds = 60)
: global_context(global_context_)
, update_period(update_period_seconds)
{
}
// The default value of update_period_seconds is for ClickHouse-over-YT
// in Arcadia -- it uses its own server implementation that also uses these
// metrics.
AsynchronousMetrics(
Context & global_context_,
int update_period_seconds,
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables_,
const std::vector<ProtocolServerAdapter> & servers_)
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_to_start_before_tables_,
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_)
: global_context(global_context_)
, update_period(update_period_seconds)
, servers_to_start_before_tables(servers_to_start_before_tables_)
@ -55,8 +63,8 @@ public:
private:
Context & global_context;
const std::chrono::seconds update_period;
const std::vector<ProtocolServerAdapter> & servers_to_start_before_tables;
const std::vector<ProtocolServerAdapter> & servers;
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers_to_start_before_tables{nullptr};
std::shared_ptr<std::vector<ProtocolServerAdapter>> servers{nullptr};
mutable std::mutex mutex;
std::condition_variable wait_cond;