Merge pull request #41120 from azat/async-insert-queue-term

Terminate AsynchronousInsertQueue w/o waiting for timeout
This commit is contained in:
Alexander Tokmakov 2022-09-13 14:52:50 +03:00 committed by GitHub
commit 2845494624
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 26 additions and 9 deletions

View File

@ -141,7 +141,13 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
{ {
/// TODO: add a setting for graceful shutdown. /// TODO: add a setting for graceful shutdown.
shutdown = true; LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
{
std::lock_guard lock(shutdown_mutex);
shutdown = true;
shutdown_cv.notify_all();
}
assert(dump_by_first_update_thread.joinable()); assert(dump_by_first_update_thread.joinable());
dump_by_first_update_thread.join(); dump_by_first_update_thread.join();
@ -162,6 +168,8 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
ErrorCodes::TIMEOUT_EXCEEDED, ErrorCodes::TIMEOUT_EXCEEDED,
"Wait for async insert timeout exceeded)"))); "Wait for async insert timeout exceeded)")));
} }
LOG_TRACE(log, "Asynchronous insertion queue finished");
} }
void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context) void AsynchronousInsertQueue::scheduleDataProcessingJob(const InsertQuery & key, InsertDataPtr data, ContextPtr global_context)
@ -276,10 +284,8 @@ void AsynchronousInsertQueue::busyCheck()
{ {
auto timeout = busy_timeout; auto timeout = busy_timeout;
while (!shutdown) while (!waitForShutdown(timeout))
{ {
std::this_thread::sleep_for(timeout);
/// TODO: use priority queue instead of raw unsorted queue. /// TODO: use priority queue instead of raw unsorted queue.
timeout = busy_timeout; timeout = busy_timeout;
std::shared_lock read_lock(rwlock); std::shared_lock read_lock(rwlock);
@ -301,9 +307,8 @@ void AsynchronousInsertQueue::busyCheck()
void AsynchronousInsertQueue::staleCheck() void AsynchronousInsertQueue::staleCheck()
{ {
while (!shutdown) while (!waitForShutdown(stale_timeout))
{ {
std::this_thread::sleep_for(stale_timeout);
std::shared_lock read_lock(rwlock); std::shared_lock read_lock(rwlock);
for (auto & [key, elem] : queue) for (auto & [key, elem] : queue)
@ -325,9 +330,8 @@ void AsynchronousInsertQueue::cleanup()
/// because it holds exclusive lock. /// because it holds exclusive lock.
auto timeout = busy_timeout * 5; auto timeout = busy_timeout * 5;
while (!shutdown) while (!waitForShutdown(timeout))
{ {
std::this_thread::sleep_for(timeout);
std::vector<InsertQuery> keys_to_remove; std::vector<InsertQuery> keys_to_remove;
{ {
@ -379,6 +383,12 @@ void AsynchronousInsertQueue::cleanup()
} }
} }
bool AsynchronousInsertQueue::waitForShutdown(const Milliseconds & timeout)
{
std::unique_lock shutdown_lock(shutdown_mutex);
return shutdown_cv.wait_for(shutdown_lock, timeout, [this]() { return shutdown; });
}
// static // static
void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context) void AsynchronousInsertQueue::processData(InsertQuery key, InsertDataPtr data, ContextPtr global_context)
try try

View File

@ -115,7 +115,10 @@ private:
const Milliseconds busy_timeout; const Milliseconds busy_timeout;
const Milliseconds stale_timeout; const Milliseconds stale_timeout;
std::atomic<bool> shutdown{false}; std::mutex shutdown_mutex;
std::condition_variable shutdown_cv;
bool shutdown{false};
ThreadPool pool; /// dump the data only inside this pool. ThreadPool pool; /// dump the data only inside this pool.
ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck() ThreadFromGlobalPool dump_by_first_update_thread; /// uses busy_timeout and busyCheck()
ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck() ThreadFromGlobalPool dump_by_last_update_thread; /// uses stale_timeout and staleCheck()
@ -136,6 +139,10 @@ private:
template <typename E> template <typename E>
static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception); static void finishWithException(const ASTPtr & query, const std::list<InsertData::EntryPtr> & entries, const E & exception);
/// @param timeout - time to wait
/// @return true if shutdown requested
bool waitForShutdown(const Milliseconds & timeout);
public: public:
auto getQueueLocked() const auto getQueueLocked() const
{ {