Merge pull request #66061 from vitlibar/fix-shutdown-in-grpc-server

Fix shutdown in GRPCServer
Vitaly Baranov 2024-07-04 14:33:59 +00:00 committed by GitHub
commit 6bf1320870
2 changed files with 66 additions and 43 deletions

src/Server/GRPCServer.cpp

@@ -1735,10 +1735,19 @@ namespace
 class GRPCServer::Runner
 {
 public:
-    explicit Runner(GRPCServer & owner_) : owner(owner_) {}
+    explicit Runner(GRPCServer & owner_) : owner(owner_), log(owner.log) {}

     ~Runner()
     {
+        try
+        {
+            stop();
+        }
+        catch (...)
+        {
+            tryLogCurrentException(log, "~Runner");
+        }
         if (queue_thread.joinable())
             queue_thread.join();
     }
@@ -1756,13 +1765,27 @@ public:
             }
             catch (...)
             {
-                tryLogCurrentException("GRPCServer");
+                tryLogCurrentException(log, "run");
             }
         };
         queue_thread = ThreadFromGlobalPool{runner_function};
     }

-    void stop() { stopReceivingNewCalls(); }
+    void stop()
+    {
+        std::lock_guard lock{mutex};
+        should_stop = true;
+
+        if (current_calls.empty())
+        {
+            /// If there are no current calls then we call shutdownQueue() to signal the queue to stop waiting for next events.
+            /// The following line will make CompletionQueue::Next() stop waiting if the queue is empty and return false instead.
+            shutdownQueue();
+        }
+
+        /// If there are some current calls then we can't call shutdownQueue() right now because we want to let the current calls finish.
+        /// In this case function shutdownQueue() will be called later in run().
+    }

     size_t getNumCurrentCalls() const
     {
@@ -1789,12 +1812,6 @@ private:
             [this, call_type](bool ok) { onNewCall(call_type, ok); });
     }

-    void stopReceivingNewCalls()
-    {
-        std::lock_guard lock{mutex};
-        should_stop = true;
-    }
-
     void onNewCall(CallType call_type, bool responder_started_ok)
     {
         std::lock_guard lock{mutex};
@@ -1827,38 +1844,47 @@ private:
     void run()
     {
         setThreadName("GRPCServerQueue");
-        while (true)
-        {
-            {
-                std::lock_guard lock{mutex};
-                finished_calls.clear(); /// Destroy finished calls.
-
-                /// If (should_stop == true) we continue processing until there is no active calls.
-                if (should_stop && current_calls.empty())
-                {
-                    bool all_responders_gone = std::all_of(
-                        responders_for_new_calls.begin(), responders_for_new_calls.end(),
-                        [](std::unique_ptr<BaseResponder> & responder) { return !responder; });
-                    if (all_responders_gone)
-                        break;
-                }
-            }
-
-            bool ok = false;
-            void * tag = nullptr;
-            if (!owner.queue->Next(&tag, &ok))
-            {
-                /// Queue shutted down.
-                break;
-            }
+
+        bool ok = false;
+        void * tag = nullptr;
+
+        while (owner.queue->Next(&tag, &ok))
+        {
             auto & callback = *static_cast<CompletionCallback *>(tag);
             callback(ok);
+
+            std::lock_guard lock{mutex};
+            finished_calls.clear(); /// Destroy finished calls.
+
+            /// If (should_stop == true) we continue processing while there are current calls.
+            if (should_stop && current_calls.empty())
+                shutdownQueue();
         }
+
+        /// CompletionQueue::Next() returns false if the queue is fully drained and shut down.
     }

+    /// Shutdown the queue if that isn't done yet.
+    void shutdownQueue()
+    {
+        chassert(should_stop);
+        if (queue_is_shut_down)
+            return;
+
+        queue_is_shut_down = true;
+
+        /// Server should be shut down before CompletionQueue.
+        if (owner.grpc_server)
+            owner.grpc_server->Shutdown();
+
+        if (owner.queue)
+            owner.queue->Shutdown();
+    }
+
     GRPCServer & owner;
+    LoggerRawPtr log;
     ThreadFromGlobalPool queue_thread;
+    bool queue_is_shut_down = false;
     std::vector<std::unique_ptr<BaseResponder>> responders_for_new_calls;
     std::map<Call *, std::unique_ptr<Call>> current_calls;
     std::vector<std::unique_ptr<Call>> finished_calls;
@@ -1876,16 +1902,6 @@ GRPCServer::GRPCServer(IServer & iserver_, const Poco::Net::SocketAddress & addr
 GRPCServer::~GRPCServer()
 {
-    /// Server should be shutdown before CompletionQueue.
-    if (grpc_server)
-        grpc_server->Shutdown();
-
-    /// Completion Queue should be shutdown before destroying the runner,
-    /// because the runner is now probably executing CompletionQueue::Next() on queue_thread
-    /// which is blocked until an event is available or the queue is shutting down.
-    if (queue)
-        queue->Shutdown();
-
     runner.reset();
 }
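
With this change GRPCServer::~GRPCServer() only resets the runner: Runner::~Runner() calls stop(), which shuts down the gRPC server and, once no calls remain, the completion queue, and then joins queue_thread, whose loop keeps calling CompletionQueue::Next() until the queue is drained. For reference, below is a minimal standalone sketch of the same shutdown-and-drain pattern against the raw gRPC API; the function shutdownAndDrain and its parameters are illustrative and not part of this patch.

#include <grpcpp/grpcpp.h>

/// Illustrative sketch, not ClickHouse code: the ordering the patch relies on.
/// grpc::Server::Shutdown() must be called before CompletionQueue::Shutdown();
/// afterwards Next() still hands out every pending tag (typically with ok == false)
/// and returns false only once the queue is fully drained.
void shutdownAndDrain(grpc::Server & server, grpc::ServerCompletionQueue & queue)
{
    server.Shutdown();
    queue.Shutdown();

    void * tag = nullptr;
    bool ok = false;
    while (queue.Next(&tag, &ok))
    {
        /// Every tag that was ever put on the queue must be received here;
        /// otherwise the objects it refers to would never be released.
    }
}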

tests/integration/test_grpc_protocol/test.py

@@ -39,6 +39,7 @@ node = cluster.add_instance(
         "TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
     },
     ipv6_address=IPV6_ADDRESS,
+    stay_alive=True,
 )

 main_channel = None
@@ -763,3 +764,9 @@ def test_opentelemetry_context_propagation():
         )
         == "SELECT 1\tsome custom state\n"
     )
+
+
+def test_restart():
+    assert query("SELECT 1") == "1\n"
+    node.restart_clickhouse()
+    assert query("SELECT 2") == "2\n"