diff --git a/src/Dictionaries/CacheDictionaryUpdateQueue.cpp b/src/Dictionaries/CacheDictionaryUpdateQueue.cpp index aee1f0de2f6..1fdaf10c57c 100644 --- a/src/Dictionaries/CacheDictionaryUpdateQueue.cpp +++ b/src/Dictionaries/CacheDictionaryUpdateQueue.cpp @@ -68,9 +68,9 @@ void CacheDictionaryUpdateQueue::waitForCurrentUpdateFinish if (update_queue.isFinished()) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "CacheDictionaryUpdateQueue finished"); - std::unique_lock update_lock(update_mutex); + std::unique_lock update_lock(update_unit_ptr->update_mutex); - bool result = is_update_finished.wait_for( + bool result = update_unit_ptr->is_update_finished.wait_for( update_lock, std::chrono::milliseconds(configuration.query_wait_timeout_milliseconds), [&] @@ -133,19 +133,23 @@ void CacheDictionaryUpdateQueue::updateThreadFunction() /// Update update_func(unit_to_update); - /// Notify thread about finished updating the bunch of ids - /// where their own ids were included. - std::lock_guard lock(update_mutex); + { + /// Notify thread about finished updating the bunch of ids + /// where their own ids were included. + std::lock_guard lock(unit_to_update->update_mutex); + unit_to_update->is_done = true; + } - unit_to_update->is_done = true; - is_update_finished.notify_all(); + unit_to_update->is_update_finished.notify_all(); } catch (...) { - std::lock_guard lock(update_mutex); + { + std::lock_guard lock(unit_to_update->update_mutex); + unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing) + } - unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing) - is_update_finished.notify_all(); + unit_to_update->is_update_finished.notify_all(); } } } diff --git a/src/Dictionaries/CacheDictionaryUpdateQueue.h b/src/Dictionaries/CacheDictionaryUpdateQueue.h index d6a195ca7b8..48598cb4548 100644 --- a/src/Dictionaries/CacheDictionaryUpdateQueue.h +++ b/src/Dictionaries/CacheDictionaryUpdateQueue.h @@ -77,6 +77,9 @@ private: std::atomic is_done{false}; std::exception_ptr current_exception{nullptr}; /// NOLINT + mutable std::mutex update_mutex; + mutable std::condition_variable is_update_finished; + /// While UpdateUnit is alive, it is accounted in update_queue size. CurrentMetrics::Increment alive_batch{CurrentMetrics::CacheDictionaryUpdateQueueBatches}; CurrentMetrics::Increment alive_keys; @@ -159,9 +162,6 @@ private: UpdateQueue update_queue; ThreadPool update_pool; - - mutable std::mutex update_mutex; - mutable std::condition_variable is_update_finished; }; extern template class CacheDictionaryUpdateQueue;