mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #41215 from ClickHouse/keeper-close-connection-on-shutdown
Close sessions on Keeper shutdown
This commit is contained in:
commit
36f46891dd
@ -30,6 +30,7 @@ struct Settings;
|
||||
M(UInt64, snapshot_distance, 100000, "How many log items we have to collect to write new snapshot", 0) \
|
||||
M(Bool, auto_forwarding, true, "Allow to forward write requests from followers to leader", 0) \
|
||||
M(Milliseconds, shutdown_timeout, 5000, "How much time we will wait until RAFT shutdown", 0) \
|
||||
M(Milliseconds, session_shutdown_timeout, 10000, "How much time we will wait until sessions are closed during shutdown", 0) \
|
||||
M(Milliseconds, startup_timeout, 180000, "How much time we will wait until RAFT to start.", 0) \
|
||||
M(LogsLevel, raft_logs_level, LogsLevel::information, "Log internal RAFT logs into main server log level. Valid values: 'trace', 'debug', 'information', 'warning', 'error', 'fatal', 'none'", 0) \
|
||||
M(UInt64, rotate_log_storage_interval, 100000, "How many records will be stored in one log storage file", 0) \
|
||||
|
@ -354,9 +354,6 @@ void KeeperDispatcher::shutdown()
|
||||
update_configuration_thread.join();
|
||||
}
|
||||
|
||||
if (server)
|
||||
server->shutdown();
|
||||
|
||||
KeeperStorage::RequestForSession request_for_session;
|
||||
|
||||
/// Set session expired for all pending requests
|
||||
@ -368,10 +365,58 @@ void KeeperDispatcher::shutdown()
|
||||
setResponse(request_for_session.session_id, response);
|
||||
}
|
||||
|
||||
/// Clear all registered sessions
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
session_to_response_callback.clear();
|
||||
KeeperStorage::RequestsForSessions close_requests;
|
||||
{
|
||||
/// Clear all registered sessions
|
||||
std::lock_guard lock(session_to_response_callback_mutex);
|
||||
|
||||
if (hasLeader())
|
||||
{
|
||||
close_requests.reserve(session_to_response_callback.size());
|
||||
// send to leader CLOSE requests for active sessions
|
||||
for (const auto & [session, response] : session_to_response_callback)
|
||||
{
|
||||
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
|
||||
request->xid = Coordination::CLOSE_XID;
|
||||
using namespace std::chrono;
|
||||
KeeperStorage::RequestForSession request_info
|
||||
{
|
||||
.session_id = session,
|
||||
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
||||
.request = std::move(request),
|
||||
};
|
||||
|
||||
close_requests.push_back(std::move(request_info));
|
||||
}
|
||||
}
|
||||
|
||||
session_to_response_callback.clear();
|
||||
}
|
||||
|
||||
// if there is no leader, there is no reason to do CLOSE because it's a write request
|
||||
if (hasLeader() && !close_requests.empty())
|
||||
{
|
||||
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
|
||||
const auto raft_result = server->putRequestBatch(close_requests);
|
||||
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
|
||||
auto sessions_closing_done = sessions_closing_done_promise->get_future();
|
||||
raft_result->when_ready([sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
|
||||
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
|
||||
nuraft::ptr<std::exception> & /*exception*/) { sessions_closing_done_promise->set_value(); });
|
||||
|
||||
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
|
||||
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
|
||||
session_shutdown_timeout);
|
||||
}
|
||||
|
||||
if (server)
|
||||
server->shutdown();
|
||||
|
||||
CurrentMetrics::set(CurrentMetrics::KeeperAliveConnections, 0);
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -418,13 +463,15 @@ void KeeperDispatcher::sessionCleanerTask()
|
||||
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
|
||||
|
||||
/// Close session == send close request to raft server
|
||||
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
|
||||
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
|
||||
request->xid = Coordination::CLOSE_XID;
|
||||
KeeperStorage::RequestForSession request_info;
|
||||
request_info.request = request;
|
||||
using namespace std::chrono;
|
||||
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
|
||||
request_info.session_id = dead_session;
|
||||
KeeperStorage::RequestForSession request_info
|
||||
{
|
||||
.session_id = dead_session,
|
||||
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
|
||||
.request = std::move(request),
|
||||
};
|
||||
{
|
||||
std::lock_guard lock(push_request_mutex);
|
||||
if (!requests_queue->push(std::move(request_info)))
|
||||
|
@ -1,4 +1,4 @@
|
||||
<yandex>
|
||||
<clickhouse>
|
||||
<keeper_server>
|
||||
<tcp_port>9181</tcp_port>
|
||||
<server_id>1</server_id>
|
||||
@ -19,9 +19,19 @@
|
||||
<id>1</id>
|
||||
<hostname>node1</hostname>
|
||||
<port>9234</port>
|
||||
<can_become_leader>true</can_become_leader>
|
||||
<priority>3</priority>
|
||||
</server>
|
||||
<server>
|
||||
<id>2</id>
|
||||
<hostname>node2</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
<server>
|
||||
<id>3</id>
|
||||
<hostname>node3</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</keeper_server>
|
||||
</yandex>
|
||||
</clickhouse>
|
@ -0,0 +1,37 @@
|
||||
<clickhouse>
|
||||
<keeper_server>
|
||||
<tcp_port>9181</tcp_port>
|
||||
<server_id>2</server_id>
|
||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
|
||||
<four_letter_word_white_list>*</four_letter_word_white_list>
|
||||
|
||||
<coordination_settings>
|
||||
<operation_timeout_ms>5000</operation_timeout_ms>
|
||||
<session_timeout_ms>10000</session_timeout_ms>
|
||||
<min_session_timeout_ms>5000</min_session_timeout_ms>
|
||||
<snapshot_distance>75</snapshot_distance>
|
||||
<raft_logs_level>trace</raft_logs_level>
|
||||
</coordination_settings>
|
||||
|
||||
<raft_configuration>
|
||||
<server>
|
||||
<id>1</id>
|
||||
<hostname>node1</hostname>
|
||||
<port>9234</port>
|
||||
</server>
|
||||
<server>
|
||||
<id>2</id>
|
||||
<hostname>node2</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
<server>
|
||||
<id>3</id>
|
||||
<hostname>node3</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</keeper_server>
|
||||
</clickhouse>
|
@ -0,0 +1,37 @@
|
||||
<clickhouse>
|
||||
<keeper_server>
|
||||
<tcp_port>9181</tcp_port>
|
||||
<server_id>3</server_id>
|
||||
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
|
||||
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
|
||||
<four_letter_word_white_list>*</four_letter_word_white_list>
|
||||
|
||||
<coordination_settings>
|
||||
<operation_timeout_ms>5000</operation_timeout_ms>
|
||||
<session_timeout_ms>10000</session_timeout_ms>
|
||||
<min_session_timeout_ms>5000</min_session_timeout_ms>
|
||||
<snapshot_distance>75</snapshot_distance>
|
||||
<raft_logs_level>trace</raft_logs_level>
|
||||
</coordination_settings>
|
||||
|
||||
<raft_configuration>
|
||||
<server>
|
||||
<id>1</id>
|
||||
<hostname>node1</hostname>
|
||||
<port>9234</port>
|
||||
</server>
|
||||
<server>
|
||||
<id>2</id>
|
||||
<hostname>node2</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
<server>
|
||||
<id>3</id>
|
||||
<hostname>node3</hostname>
|
||||
<port>9234</port>
|
||||
<start_as_follower>true</start_as_follower>
|
||||
</server>
|
||||
</raft_configuration>
|
||||
</keeper_server>
|
||||
</clickhouse>
|
@ -10,7 +10,15 @@ from kazoo.client import KazooClient
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node1 = cluster.add_instance(
|
||||
"node1", main_configs=["configs/keeper_config.xml"], stay_alive=True
|
||||
"node1", main_configs=["configs/keeper_config1.xml"], stay_alive=True
|
||||
)
|
||||
|
||||
node2 = cluster.add_instance(
|
||||
"node2", main_configs=["configs/keeper_config2.xml"], stay_alive=True
|
||||
)
|
||||
|
||||
node3 = cluster.add_instance(
|
||||
"node3", main_configs=["configs/keeper_config3.xml"], stay_alive=True
|
||||
)
|
||||
|
||||
bool_struct = struct.Struct("B")
|
||||
@ -61,7 +69,7 @@ def wait_node(node):
|
||||
|
||||
|
||||
def wait_nodes():
|
||||
for n in [node1]:
|
||||
for n in [node1, node2, node3]:
|
||||
wait_node(n)
|
||||
|
||||
|
||||
@ -165,3 +173,21 @@ def test_session_timeout(started_cluster):
|
||||
|
||||
negotiated_timeout, _ = handshake(node1.name, session_timeout=20000, session_id=0)
|
||||
assert negotiated_timeout == 10000
|
||||
|
||||
|
||||
def test_session_close_shutdown(started_cluster):
|
||||
wait_nodes()
|
||||
|
||||
node1_zk = get_fake_zk(node1.name)
|
||||
node2_zk = get_fake_zk(node2.name)
|
||||
|
||||
eph_node = "/test_node"
|
||||
node2_zk.create(eph_node, ephemeral=True)
|
||||
assert node1_zk.exists(eph_node) != None
|
||||
|
||||
# shutdown while session is active
|
||||
node2.stop_clickhouse()
|
||||
|
||||
assert node1_zk.exists(eph_node) == None
|
||||
|
||||
node2.start_clickhouse()
|
||||
|
Loading…
Reference in New Issue
Block a user