#include <IO/ParallelReadBuffer.h>

#include <base/logger_useful.h>
#include <Poco/Logger.h>
namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
/// Construct the buffer and immediately start `max_working_readers` background
/// workers; each worker pulls readers from the factory and produces data segments
/// that nextImpl() consumes in order.
ParallelReadBuffer::ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, size_t max_working_readers)
    : SeekableReadBufferWithSize(nullptr, 0), pool(max_working_readers), reader_factory(std::move(reader_factory_))
{
    initializeWorkers();
}
void ParallelReadBuffer::initializeWorkers()
{
for (size_t i = 0; i < pool.getMaxThreads(); ++i)
pool.scheduleOrThrow([this] { processor(); });
}
2022-03-08 11:11:17 +00:00
off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
if (whence != SEEK_SET)
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
if (offset < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
if (!working_buffer.empty() && size_t(offset) >= current_position - working_buffer.size() && offset < current_position)
{
pos = working_buffer.end() - (current_position - offset);
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
return offset;
}
const auto offset_is_in_range
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.from && static_cast<size_t>(offset) < range.to; };
std::unique_lock lock{mutex};
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
{
read_workers.pop_front();
}
if (!read_workers.empty())
{
auto & front_worker = read_workers.front();
auto & segments = front_worker->segments;
current_position = front_worker->range.from;
while (true)
{
next_condvar.wait(lock, [&] { return !segments.empty(); });
if (static_cast<size_t>(offset) < current_position + segments.front().size())
{
segment = std::move(segments.front());
segments.pop_front();
working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size());
current_position += segment.size();
front_worker->range.from += segment.size();
pos = working_buffer.end() - (current_position - offset);
return offset;
}
current_position += segments.front().size();
front_worker->range.from += segments.front().size();
segments.pop_front();
}
}
lock.unlock();
finishAndWait();
reader_factory->seek(offset, whence);
all_created = false;
all_completed = false;
2022-03-08 14:17:19 +00:00
read_workers.clear();
2022-03-08 11:11:17 +00:00
current_position = offset;
resetWorkingBuffer();
emergency_stop = false;
initializeWorkers();
return offset;
}
/// Total stream size as reported by the reader factory, if it is known.
std::optional<size_t> ParallelReadBuffer::getTotalSize()
{
    /// The factory is not thread safe, so query it under the mutex.
    const std::lock_guard guard{mutex};
    return reader_factory->getTotalSize();
}
/// Absolute position of the next byte the caller will read: the end of the
/// last delivered segment minus whatever is still unread in the working buffer.
off_t ParallelReadBuffer::getPosition()
{
    const size_t unread_in_working_buffer = available();
    return current_position - unread_in_working_buffer;
}
bool ParallelReadBuffer::nextImpl()
{
if (all_completed)
return false;
while (true)
{
2022-03-08 11:11:17 +00:00
std::unique_lock lock(mutex);
next_condvar.wait(
lock,
[this]()
{
/// Check if no more readers left or current reader can be processed
return emergency_stop || (all_created && read_workers.empty()) || currentWorkerReady();
});
if (emergency_stop)
{
if (background_exception)
std::rethrow_exception(background_exception);
else
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Emergency stop");
}
/// Remove completed units
while (!read_workers.empty() && currentWorkerCompleted())
read_workers.pop_front();
/// All readers processed, stop
2022-03-08 11:11:17 +00:00
if (read_workers.empty() && all_created)
{
all_completed = true;
return false;
}
2022-03-08 11:11:17 +00:00
auto & front_worker = read_workers.front();
/// Read data from first segment of the first reader
2022-03-08 11:11:17 +00:00
if (!front_worker->segments.empty())
{
2022-03-08 11:11:17 +00:00
segment = std::move(front_worker->segments.front());
front_worker->range.from += segment.size();
front_worker->segments.pop_front();
break;
}
}
working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size());
2022-03-08 11:11:17 +00:00
current_position += working_buffer.size();
return true;
}
void ParallelReadBuffer::processor()
{
2022-03-08 11:11:17 +00:00
while (!emergency_stop)
{
ReadWorkerPtr worker;
{
/// Create new read worker and put in into end of queue
/// reader_factory is not thread safe, so we call getReader under lock
2022-03-08 11:11:17 +00:00
std::lock_guard lock(mutex);
auto reader = reader_factory->getReader();
2022-03-08 11:11:17 +00:00
if (!reader)
{
all_created = true;
2022-03-08 14:17:19 +00:00
next_condvar.notify_all();
break;
2022-03-08 11:11:17 +00:00
}
2022-03-08 11:11:17 +00:00
worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader->first), reader->second));
}
/// Start processing
2022-03-04 14:21:52 +00:00
readerThreadFunction(std::move(worker));
}
}
/// Drain one underlying reader: copy every chunk it produces into an owned
/// segment and publish it to the worker's queue under the mutex. Any exception
/// is captured and forwarded to the consumer via onBackgroundException().
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
    try
    {
        while (!emergency_stop)
        {
            if (!read_worker->reader->next())
            {
                /// Reader exhausted: mark the worker finished and wake the consumer.
                std::lock_guard lock(mutex);
                read_worker->finished = true;
                next_condvar.notify_all();
                break;
            }

            if (emergency_stop)
                break;

            /// Copy the data out of the reader's internal buffer so it stays
            /// valid after the next call to next().
            Buffer buffer = read_worker->reader->buffer();
            Memory<> new_segment(buffer.size());
            memcpy(new_segment.data(), buffer.begin(), buffer.size());
            {
                /// New data ready to be read
                std::lock_guard lock(mutex);
                read_worker->segments.emplace_back(std::move(new_segment));
                next_condvar.notify_all();
            }
        }
    }
    catch (...)
    {
        onBackgroundException();
    }
}
void ParallelReadBuffer::onBackgroundException()
{
2022-03-08 11:11:17 +00:00
std::lock_guard lock(mutex);
if (!background_exception)
{
background_exception = std::current_exception();
}
emergency_stop = true;
}
/// Ask all background workers to stop and block until the thread pool drains.
/// Exceptions from the pool are logged and swallowed so this is safe to call
/// during teardown (e.g. from seek()'s slow path).
void ParallelReadBuffer::finishAndWait()
{
    /// Raise the flag first so in-flight workers exit their loops promptly.
    emergency_stop = true;
    try
    {
        pool.wait();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}