Wait for connections to special servers

This commit is contained in:
alesapin 2020-12-16 13:04:46 +03:00
parent dff71850a8
commit ea4d11cb73

View File

@ -139,6 +139,28 @@ void setupTmpPath(Poco::Logger * log, const std::string & path)
}
}
int waitServersToFinish(std::vector<DB::ProtocolServerAdapter> & servers, size_t seconds_to_wait)
{
const int sleep_max_ms = 1000 * seconds_to_wait;
const int sleep_one_ms = 100;
int sleep_current_ms = 0;
int current_connections = 0;
while (sleep_current_ms < sleep_max_ms)
{
current_connections = 0;
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
}
return current_connections;
}
}
namespace DB
@ -794,8 +816,29 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Shut down storages.");
for (auto & server : servers_to_start_before_tables)
server.stop();
if (!servers_to_start_before_tables.empty())
{
LOG_DEBUG(log, "Waiting for current connections to servers for tables to finish.");
int current_connections = 0;
for (auto & server : servers_to_start_before_tables)
{
server.stop();
current_connections += server.currentConnections();
}
if (current_connections)
LOG_INFO(log, "Closed all listening sockets. Waiting for {} outstanding connections.", current_connections);
else
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
else
LOG_INFO(log, "Closed connections to servers for tables.");
}
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
@ -1167,24 +1210,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getProcessList().killAllQueries();
if (current_connections)
{
const int sleep_max_ms = 1000 * config().getInt("shutdown_wait_unfinished", 5);
const int sleep_one_ms = 100;
int sleep_current_ms = 0;
while (sleep_current_ms < sleep_max_ms)
{
current_connections = 0;
for (auto & server : servers)
{
server.stop();
current_connections += server.currentConnections();
}
if (!current_connections)
break;
sleep_current_ms += sleep_one_ms;
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_one_ms));
}
}
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections. But {} remain."