Merge pull request #36192 from Avogar/fix-parallel-read-buffer

Use atomic instead of mutex + condvar in ParallelReadBuffer
This commit is contained in:
Kruglov Pavel 2022-04-14 18:36:16 +02:00 committed by GitHub
commit 4bad2cd364
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 11 deletions

View File

@ -205,12 +205,8 @@ bool ParallelReadBuffer::nextImpl()
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
SCOPE_EXIT({
std::lock_guard lock{mutex};
--active_working_reader;
if (active_working_reader == 0)
{
readers_done.notify_all();
}
if (active_working_reader.fetch_sub(1) == 1)
active_working_reader.notify_all();
});
try
@ -265,8 +261,12 @@ void ParallelReadBuffer::finishAndWait()
{
emergency_stop = true;
std::unique_lock lock{mutex};
readers_done.wait(lock, [&] { return active_working_reader == 0; });
size_t active_readers = active_working_reader.load();
while (active_readers != 0)
{
active_working_reader.wait(active_readers);
active_readers = active_working_reader.load();
}
}
}

View File

@ -135,9 +135,7 @@ private:
Segment current_segment;
size_t max_working_readers;
size_t active_working_reader{0};
// Triggered when all reader workers are done
std::condition_variable readers_done;
std::atomic_size_t active_working_reader{0};
CallbackRunner schedule;