This commit is contained in:
Nikita Mikhaylov 2020-08-11 22:50:48 +03:00
parent 519c5c500e
commit 4c89fb8f84

View File

@ -750,53 +750,29 @@ void CacheDictionary::updateThreadFunction()
setThreadName("AsyncUpdater");
while (!finished)
{
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
UpdateUnitPtr popped;
update_queue.pop(popped);
if (finished)
break;
/// Here we pop as many unit pointers from update queue as we can.
/// We fix current size to avoid livelock (or too long waiting),
/// when this thread pops from the queue and other threads push to the queue.
const size_t current_queue_size = update_queue.size();
if (current_queue_size > 0)
LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with {} keys", current_queue_size + 1);
std::vector<UpdateUnitPtr> update_request;
update_request.reserve(current_queue_size + 1);
update_request.emplace_back(first_popped);
UpdateUnitPtr current_unit_ptr;
while (update_request.size() < current_queue_size + 1 && update_queue.tryPop(current_unit_ptr))
update_request.emplace_back(std::move(current_unit_ptr));
BunchUpdateUnit bunch_update_unit(update_request);
try
{
/// Update a bunch of ids.
update(bunch_update_unit);
update(popped);
/// Notify all threads about finished updating the bunch of ids
/// Notify thread about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
for (auto & unit_ptr: update_request)
unit_ptr->is_done = true;
popped->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
/// It is a big trouble, because one bad query can make other threads fail with not relative exception.
/// So at this point all threads (and queries) will receive the same exception.
for (auto & unit_ptr: update_request)
unit_ptr->current_exception = std::current_exception();
popped->current_exception = std::current_exception();
is_update_finished.notify_all();
}
}