Use mutex per worker

This commit is contained in:
Antonio Andelic 2022-04-29 14:21:53 +00:00
parent a1a22b0007
commit bfdb4566c6
2 changed files with 24 additions and 27 deletions

View File

@ -21,10 +21,7 @@ struct ParallelReadBuffer::ReadWorker
bytes_left = *range.right - range.left + 1;
}
auto hasSegment() const
{
return current_segment_index < segments.size();
}
auto hasSegment() const { return current_segment_index < segments.size(); }
auto nextSegment()
{
@ -42,19 +39,20 @@ struct ParallelReadBuffer::ReadWorker
SeekableReadBuffer::Range range;
size_t bytes_left{0};
std::atomic_bool cancel{false};
std::mutex worker_mutex;
};
ParallelReadBuffer::ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, CallbackRunner schedule_, size_t max_working_readers_)
ParallelReadBuffer::ParallelReadBuffer(
std::unique_ptr<ReadBufferFactory> reader_factory_, CallbackRunner schedule_, size_t max_working_readers_)
: SeekableReadBuffer(nullptr, 0)
, max_working_readers(max_working_readers_)
, schedule(std::move(schedule_))
, reader_factory(std::move(reader_factory_))
{
std::unique_lock<std::mutex> lock{mutex};
addReaders(lock);
addReaders();
}
bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer_lock*/)
bool ParallelReadBuffer::addReaderToPool()
{
auto reader = reader_factory->getReader();
if (!reader)
@ -70,9 +68,9 @@ bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer
return true;
}
void ParallelReadBuffer::addReaders(std::unique_lock<std::mutex> & buffer_lock)
void ParallelReadBuffer::addReaders()
{
while (read_workers.size() < max_working_readers && addReaderToPool(buffer_lock))
while (read_workers.size() < max_working_readers && addReaderToPool())
;
}
@ -93,7 +91,6 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
return offset;
}
std::unique_lock lock{mutex};
const auto offset_is_in_range
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };
@ -109,6 +106,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
current_position = front_worker->range.left;
while (true)
{
std::unique_lock lock{front_worker->worker_mutex};
next_condvar.wait(lock, [&] { return emergency_stop || front_worker->hasSegment(); });
if (emergency_stop)
@ -121,7 +119,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
current_position += current_segment.size();
pos = working_buffer.end() - (current_position - offset);
addReaders(lock);
addReaders();
return offset;
}
@ -129,7 +127,6 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
}
}
lock.unlock();
finishAndWait();
reader_factory->seek(offset, whence);
@ -141,14 +138,12 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
emergency_stop = false;
lock.lock();
addReaders(lock);
addReaders();
return offset;
}
std::optional<size_t> ParallelReadBuffer::getFileSize()
{
std::lock_guard lock{mutex};
return reader_factory->getFileSize();
}
@ -165,7 +160,6 @@ bool ParallelReadBuffer::currentWorkerReady() const
bool ParallelReadBuffer::currentWorkerCompleted() const
{
assert(!read_workers.empty());
return read_workers.front()->finished && !read_workers.front()->hasSegment();
}
@ -184,7 +178,7 @@ bool ParallelReadBuffer::nextImpl()
while (true)
{
std::unique_lock lock(mutex);
std::unique_lock lock{read_workers.front()->worker_mutex};
next_condvar.wait(
lock,
[this]()
@ -195,17 +189,22 @@ bool ParallelReadBuffer::nextImpl()
bool worker_removed = false;
/// Remove completed units
while (!read_workers.empty() && currentWorkerCompleted() && !emergency_stop)
while (currentWorkerCompleted() && !emergency_stop)
{
read_workers.pop_front();
worker_removed = true;
if (read_workers.empty())
break;
lock = std::unique_lock{read_workers.front()->worker_mutex};
}
if (emergency_stop)
handleEmergencyStop();
if (worker_removed)
addReaders(lock);
addReaders();
/// All readers processed, stop
if (read_workers.empty())
@ -222,7 +221,7 @@ bool ParallelReadBuffer::nextImpl()
if (currentWorkerCompleted())
{
read_workers.pop_front();
all_completed = !addReaderToPool(lock) && read_workers.empty();
all_completed = !addReaderToPool() && read_workers.empty();
}
break;
}
@ -258,7 +257,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
read_worker->bytes_left -= bytes_to_copy;
{
/// New data ready to be read
std::lock_guard lock(mutex);
std::lock_guard lock(read_worker->worker_mutex);
read_worker->segments.emplace_back(std::move(new_segment));
read_worker->finished = read_worker->bytes_left == 0;
next_condvar.notify_all();
@ -278,7 +277,6 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
void ParallelReadBuffer::onBackgroundException()
{
std::lock_guard lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();

View File

@ -30,7 +30,7 @@ private:
bool nextImpl() override;
public:
class ReadBufferFactory : public WithFileSize
class ReadBufferFactory : public WithFileSize
{
public:
virtual SeekableReadBufferPtr getReader() = 0;
@ -60,8 +60,8 @@ private:
void handleEmergencyStop();
void addReaders(std::unique_lock<std::mutex> & buffer_lock);
bool addReaderToPool(std::unique_lock<std::mutex> & buffer_lock);
void addReaders();
bool addReaderToPool();
/// Process read_worker, read data and save into internal segments queue
void readerThreadFunction(ReadWorkerPtr read_worker);
@ -86,7 +86,6 @@ private:
*/
std::deque<ReadWorkerPtr> read_workers;
std::mutex mutex;
/// Triggered when new data available
std::condition_variable next_condvar;