From 4ec7b10c735794469004ef8bafcb808acd2dbc2a Mon Sep 17 00:00:00 2001 From: Yarik Briukhovetskyi <114298166+yariks5s@users.noreply.github.com> Date: Fri, 4 Oct 2024 20:46:28 +0200 Subject: [PATCH] add proper thread termination --- programs/server/Server.cpp | 2 ++ src/Interpreters/CancellationChecker.cpp | 13 ++++++++----- src/Interpreters/CancellationChecker.h | 3 ++- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 6ec357a9927..2feba9dfc46 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -2318,6 +2318,8 @@ try if (current_connections) current_connections = waitServersToFinish(servers, servers_lock, server_settings.shutdown_wait_unfinished); + CancellationChecker::getInstance().terminateThread(); + if (current_connections) LOG_WARNING(log, "Closed connections. But {} remain." " Tip: To increase wait time add to config: 60", current_connections); diff --git a/src/Interpreters/CancellationChecker.cpp b/src/Interpreters/CancellationChecker.cpp index 651efa73584..a7fbdf24497 100644 --- a/src/Interpreters/CancellationChecker.cpp +++ b/src/Interpreters/CancellationChecker.cpp @@ -30,17 +30,20 @@ CancellationChecker::CancellationChecker() : stop_thread(false) { } -CancellationChecker::~CancellationChecker() -{ - stop_thread = true; -} - CancellationChecker& CancellationChecker::getInstance() { static CancellationChecker instance; return instance; } +void CancellationChecker::terminateThread() +{ + LOG_TRACE(getLogger("CancellationChecker"), "Stopping CancellationChecker"); + stop_thread = true; + cond_var.notify_all(); +} + + void CancellationChecker::cancelTask(std::shared_ptr query, CancelReason reason) { query->cancelQuery(/*kill=*/false, /*reason=*/reason); diff --git a/src/Interpreters/CancellationChecker.h b/src/Interpreters/CancellationChecker.h index 252cc0e7420..2def3537253 100644 --- a/src/Interpreters/CancellationChecker.h +++ b/src/Interpreters/CancellationChecker.h @@ -34,7 +34,6 @@ class CancellationChecker { private: CancellationChecker(); - ~CancellationChecker(); // Priority queue to manage tasks based on endTime std::multiset querySet; @@ -55,6 +54,8 @@ public: CancellationChecker(const CancellationChecker&) = delete; CancellationChecker& operator=(const CancellationChecker&) = delete; + void terminateThread(); + // Method to add a new task to the multiset void appendTask(const std::shared_ptr & query, const Int64 & timeout);