CacheDictionary simplify update queue

This commit is contained in:
Maksim Kita 2022-06-29 19:19:47 +02:00
parent 83375465eb
commit f443cf66f0
2 changed files with 17 additions and 13 deletions

View File

@ -68,9 +68,9 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::waitForCurrentUpdateFinish
if (update_queue.isFinished())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "CacheDictionaryUpdateQueue finished");
std::unique_lock<std::mutex> update_lock(update_mutex);
std::unique_lock<std::mutex> update_lock(update_unit_ptr->update_mutex);
bool result = is_update_finished.wait_for(
bool result = update_unit_ptr->is_update_finished.wait_for(
update_lock,
std::chrono::milliseconds(configuration.query_wait_timeout_milliseconds),
[&]
@ -133,19 +133,23 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::updateThreadFunction()
/// Update
update_func(unit_to_update);
/// Notify thread about finished updating the bunch of ids
/// where their own ids were included.
std::lock_guard lock(update_mutex);
{
/// Notify thread about finished updating the bunch of ids
/// where their own ids were included.
std::lock_guard lock(unit_to_update->update_mutex);
unit_to_update->is_done = true;
}
unit_to_update->is_done = true;
is_update_finished.notify_all();
unit_to_update->is_update_finished.notify_all();
}
catch (...)
{
std::lock_guard lock(update_mutex);
{
std::lock_guard lock(unit_to_update->update_mutex);
unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
}
unit_to_update->current_exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
is_update_finished.notify_all();
unit_to_update->is_update_finished.notify_all();
}
}
}

View File

@ -77,6 +77,9 @@ private:
std::atomic<bool> is_done{false};
std::exception_ptr current_exception{nullptr}; /// NOLINT
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
/// While UpdateUnit is alive, it is accounted in update_queue size.
CurrentMetrics::Increment alive_batch{CurrentMetrics::CacheDictionaryUpdateQueueBatches};
CurrentMetrics::Increment alive_keys;
@ -159,9 +162,6 @@ private:
UpdateQueue update_queue;
ThreadPool update_pool;
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
};
extern template class CacheDictionaryUpdateQueue<DictionaryKeyType::Simple>;