Retry if sessions not closed because missing leader

This commit is contained in:
Antonio Andelic 2023-07-20 14:02:55 +00:00
parent f53ff5d4f2
commit f997adfe27
2 changed files with 54 additions and 25 deletions

View File

@ -473,23 +473,30 @@ void KeeperDispatcher::shutdown()
session_to_response_callback.clear();
}
// if there is no leader, there is no reason to do CLOSE because it's a write request
if (server && hasLeader() && !close_requests.empty())
if (server && !close_requests.empty())
{
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
// if there is no leader, there is no reason to do CLOSE because it's a write request
if (hasLeader())
{
LOG_INFO(log, "Trying to close {} session(s)", close_requests.size());
const auto raft_result = server->putRequestBatch(close_requests);
auto sessions_closing_done_promise = std::make_shared<std::promise<void>>();
auto sessions_closing_done = sessions_closing_done_promise->get_future();
raft_result->when_ready([my_sessions_closing_done_promise = std::move(sessions_closing_done_promise)](
nuraft::cmd_result<nuraft::ptr<nuraft::buffer>> & /*result*/,
nuraft::ptr<std::exception> & /*exception*/) { my_sessions_closing_done_promise->set_value(); });
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
LOG_WARNING(
log,
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
session_shutdown_timeout);
auto session_shutdown_timeout = configuration_and_settings->coordination_settings->session_shutdown_timeout.totalMilliseconds();
if (sessions_closing_done.wait_for(std::chrono::milliseconds(session_shutdown_timeout)) != std::future_status::ready)
LOG_WARNING(
log,
"Failed to close sessions in {}ms. If they are not closed, they will be closed after session timeout.",
session_shutdown_timeout);
}
else
{
LOG_INFO(log, "Sessions cannot be closed during shutdown because there is no active leader");
}
}
if (server)

View File

@ -6,6 +6,7 @@ import socket
import struct
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
# from kazoo.protocol.serialization import Connect, read_buffer, write_buffer
@ -162,17 +163,38 @@ def test_session_timeout(started_cluster):
def test_session_close_shutdown(started_cluster):
wait_nodes()
node1_zk = get_fake_zk(node1.name)
node2_zk = get_fake_zk(node2.name)
node1_zk = None
node2_zk = None
for i in range(20):
node1_zk = get_fake_zk(node1.name)
node2_zk = get_fake_zk(node2.name)
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
node1_zk.sync(eph_node)
assert node1_zk.exists(eph_node) != None
eph_node = "/test_node"
node2_zk.create(eph_node, ephemeral=True)
node1_zk.sync(eph_node)
# shutdown while session is active
node2.stop_clickhouse()
node1_zk.exists(eph_node) != None
assert node1_zk.exists(eph_node) == None
# restart while session is active so it's closed during shutdown
node2.restart_clickhouse()
node2.start_clickhouse()
if node1_zk.exists(eph_node) == None:
break
assert node2.contains_in_log("Sessions cannot be closed during shutdown because there is no active leader")
try:
node1_zk.delete(eph_node)
except NoNodeError:
pass
assert node1_zk.exists(eph_node) == None
destroy_zk_client(node1_zk)
node1_zk = None
destroy_zk_client(node2_zk)
node2_zk = None
time.sleep(1)
else:
assert False, "Session wasn't properly cleaned up on shutdown"