Don't shutdown interserver before tables

This commit is contained in:
alesapin 2023-07-23 22:38:59 +02:00
parent 37054e7b47
commit e02948580b
2 changed files with 79 additions and 23 deletions

View File

@ -739,11 +739,12 @@ try
[&]() -> std::vector<ProtocolServerMetrics> [&]() -> std::vector<ProtocolServerMetrics>
{ {
std::vector<ProtocolServerMetrics> metrics; std::vector<ProtocolServerMetrics> metrics;
metrics.reserve(servers_to_start_before_tables.size());
std::lock_guard lock(servers_lock);
metrics.reserve(servers_to_start_before_tables.size() + servers.size());
for (const auto & server : servers_to_start_before_tables) for (const auto & server : servers_to_start_before_tables)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
std::lock_guard lock(servers_lock);
for (const auto & server : servers) for (const auto & server : servers)
metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()}); metrics.emplace_back(ProtocolServerMetrics{server.getPortName(), server.currentThreads()});
return metrics; return metrics;
@ -1302,7 +1303,7 @@ try
global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config); global_context->reloadAuxiliaryZooKeepersConfigIfChanged(config);
std::lock_guard lock(servers_lock); std::lock_guard lock(servers_lock);
updateServers(*config, server_pool, async_metrics, servers); updateServers(*config, server_pool, async_metrics, servers, servers_to_start_before_tables);
} }
global_context->updateStorageConfiguration(*config); global_context->updateStorageConfiguration(*config);
@ -1404,10 +1405,27 @@ try
} }
for (auto & server : servers_to_start_before_tables)
{ {
server.start(); std::lock_guard lock(servers_lock);
LOG_INFO(log, "Listening for {}", server.getDescription()); /// We should start interserver communications before (and more imporant shutdown after) tables.
/// Because server can wait for a long-running queries (for example in tcp_handler) after interserver handler was already shut down.
/// In this case we will have replicated tables which are unable to send any parts to other replicas, but still can
/// communicate with zookeeper, execute merges, etc.
createInterserverServers(
config(),
interserver_listen_hosts,
listen_try,
server_pool,
async_metrics,
servers_to_start_before_tables,
/* start_servers= */ false);
for (auto & server : servers_to_start_before_tables)
{
server.start();
LOG_INFO(log, "Listening for {}", server.getDescription());
}
} }
/// Initialize access storages. /// Initialize access storages.
@ -1527,10 +1545,13 @@ try
{ {
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish."); LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
size_t current_connections = 0; size_t current_connections = 0;
for (auto & server : servers_to_start_before_tables)
{ {
server.stop(); std::lock_guard lock(servers_lock);
current_connections += server.currentConnections(); for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
}
} }
if (current_connections) if (current_connections)
@ -1709,7 +1730,7 @@ try
{ {
std::lock_guard lock(servers_lock); std::lock_guard lock(servers_lock);
createServers(config(), listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers); createServers(config(), listen_hosts, listen_try, server_pool, async_metrics, servers);
if (servers.empty()) if (servers.empty())
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"No servers started (add valid listen_host and 'tcp_port' or 'http_port' " "No servers started (add valid listen_host and 'tcp_port' or 'http_port' "
@ -1967,7 +1988,6 @@ HTTPContextPtr Server::httpContext() const
void Server::createServers( void Server::createServers(
Poco::Util::AbstractConfiguration & config, Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts, const Strings & listen_hosts,
const Strings & interserver_listen_hosts,
bool listen_try, bool listen_try,
Poco::ThreadPool & server_pool, Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics, AsynchronousMetrics & async_metrics,
@ -2189,6 +2209,23 @@ void Server::createServers(
httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params)); httpContext(), createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"), server_pool, socket, http_params));
}); });
} }
}
void Server::createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers)
{
const Settings & settings = global_context->getSettingsRef();
Poco::Timespan keep_alive_timeout(config.getUInt("keep_alive_timeout", 10), 0);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
/// Now iterate over interserver_listen_hosts /// Now iterate over interserver_listen_hosts
for (const auto & interserver_listen_host : interserver_listen_hosts) for (const auto & interserver_listen_host : interserver_listen_hosts)
@ -2237,14 +2274,14 @@ void Server::createServers(
#endif #endif
}); });
} }
} }
void Server::updateServers( void Server::updateServers(
Poco::Util::AbstractConfiguration & config, Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool, Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics, AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers) std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables)
{ {
Poco::Logger * log = &logger(); Poco::Logger * log = &logger();
@ -2270,11 +2307,19 @@ void Server::updateServers(
Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : this->config(); Poco::Util::AbstractConfiguration & previous_config = latest_config ? *latest_config : this->config();
std::vector<ProtocolServerAdapter *> all_servers;
all_servers.reserve(servers.size() + servers_to_start_before_tables.size());
for (auto & server : servers) for (auto & server : servers)
all_servers.push_back(&server);
for (auto & server : servers_to_start_before_tables)
all_servers.push_back(&server);
for (auto * server : all_servers)
{ {
if (!server.isStopping()) if (!server->isStopping())
{ {
std::string port_name = server.getPortName(); std::string port_name = server->getPortName();
bool has_host = false; bool has_host = false;
bool is_http = false; bool is_http = false;
if (port_name.starts_with("protocols.")) if (port_name.starts_with("protocols."))
@ -2312,27 +2357,29 @@ void Server::updateServers(
/// NOTE: better to compare using getPortName() over using /// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and /// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications. /// internal replication communications.
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port"; is_http = server->getPortName() == "http_port" || server->getPortName() == "https_port";
} }
if (!has_host) if (!has_host)
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end(); has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server->getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(port_name, "").empty(); bool has_port = !config.getString(port_name, "").empty();
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers"); bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
if (force_restart) if (force_restart)
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server.getDescription()); LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server->getDescription());
if (!has_host || !has_port || config.getInt(server.getPortName()) != server.portNumber() || force_restart) if (!has_host || !has_port || config.getInt(server->getPortName()) != server->portNumber() || force_restart)
{ {
server.stop(); server->stop();
LOG_INFO(log, "Stopped listening for {}", server.getDescription()); LOG_INFO(log, "Stopped listening for {}", server->getDescription());
} }
} }
} }
createServers(config, listen_hosts, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true); createServers(config, listen_hosts, listen_try, server_pool, async_metrics, servers, /* start_servers= */ true);
createInterserverServers(config, interserver_listen_hosts, listen_try, server_pool, async_metrics, servers_to_start_before_tables, /* start_servers= */ true);
std::erase_if(servers, std::bind_front(check_server, "")); std::erase_if(servers, std::bind_front(check_server, ""));
std::erase_if(servers_to_start_before_tables, std::bind_front(check_server, ""));
} }
} }

View File

@ -102,6 +102,14 @@ private:
void createServers( void createServers(
Poco::Util::AbstractConfiguration & config, Poco::Util::AbstractConfiguration & config,
const Strings & listen_hosts, const Strings & listen_hosts,
bool listen_try,
Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers,
bool start_servers = false);
void createInterserverServers(
Poco::Util::AbstractConfiguration & config,
const Strings & interserver_listen_hosts, const Strings & interserver_listen_hosts,
bool listen_try, bool listen_try,
Poco::ThreadPool & server_pool, Poco::ThreadPool & server_pool,
@ -113,7 +121,8 @@ private:
Poco::Util::AbstractConfiguration & config, Poco::Util::AbstractConfiguration & config,
Poco::ThreadPool & server_pool, Poco::ThreadPool & server_pool,
AsynchronousMetrics & async_metrics, AsynchronousMetrics & async_metrics,
std::vector<ProtocolServerAdapter> & servers); std::vector<ProtocolServerAdapter> & servers,
std::vector<ProtocolServerAdapter> & servers_to_start_before_tables);
}; };
} }