ClickHouse/src/IO/ParallelReadBuffer.cpp

310 lines
8.7 KiB
C++
Raw Normal View History

#include <IO/ParallelReadBuffer.h>
2022-03-08 11:11:17 +00:00
#include <Poco/Logger.h>
2022-05-03 06:34:03 +00:00
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
    /// Error codes are defined centrally; only the ones thrown from this file are declared here.
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
2022-04-25 12:10:41 +00:00
/// Per-reader state shared between one background reader task and the consumer
/// (the main thread). The background task appends to `segments` under
/// `worker_mutex`; the consumer pops them, in order, via nextSegment().
struct ParallelReadBuffer::ReadWorker
{
    explicit ReadWorker(SeekableReadBufferPtr reader_) : reader(std::move(reader_)), range(reader->getRemainingReadRange())
    {
        /// A worker only makes sense for a bounded range: the right end must be known.
        assert(range.right);
        /// Range is inclusive on both ends, hence the +1.
        bytes_left = *range.right - range.left + 1;
    }

    /// True while there is at least one produced-but-unconsumed segment.
    auto hasSegment() const { return current_segment_index < segments.size(); }

    /// Hands out the next unconsumed segment and advances `range.left`
    /// so that `range` always describes the not-yet-consumed part.
    /// Precondition: hasSegment(). Caller must hold `worker_mutex`
    /// if the background task may still be producing.
    auto nextSegment()
    {
        assert(hasSegment());
        auto next_segment = std::move(segments[current_segment_index]);
        ++current_segment_index;
        range.left += next_segment.size();
        return next_segment;
    }

    SeekableReadBufferPtr reader;
    /// Produced segments; consumed ones are left moved-from (index advances, no erase).
    std::vector<Memory<>> segments;
    size_t current_segment_index = 0;
    /// Set by the producer when the whole range has been read (bytes_left == 0).
    bool finished{false};
    /// Remaining (unconsumed) part of this worker's byte range.
    SeekableReadBuffer::Range range;
    /// Bytes the producer still has to read; only touched by the producer.
    size_t bytes_left{0};
    /// Set by seek() to tell the producer to stop early.
    std::atomic_bool cancel{false};

    /// Guards `segments`, `current_segment_index` and `finished`.
    std::mutex worker_mutex;
};
2022-04-29 14:21:53 +00:00
/// Starts up to `max_working_readers_` background readers immediately.
/// `schedule_` is the runner used to launch each reader task.
ParallelReadBuffer::ParallelReadBuffer(
    std::unique_ptr<ReadBufferFactory> reader_factory_, CallbackRunner schedule_, size_t max_working_readers_)
    : SeekableReadBuffer(nullptr, 0)
    , max_working_readers(max_working_readers_)
    , schedule(std::move(schedule_))
    , reader_factory(std::move(reader_factory_))
{
    try
    {
        addReaders();
    }
    catch (...)
    {
        /// Catch everything, not only DB::Exception: the scheduler or an
        /// allocation may throw std::exception, and letting any exception
        /// escape without joining the already-scheduled tasks would leave
        /// them running against a destroyed object (use-after-free).
        finishAndWait();
        throw;
    }
}
2022-04-29 14:21:53 +00:00
bool ParallelReadBuffer::addReaderToPool()
2022-03-08 11:11:17 +00:00
{
2022-03-10 11:06:50 +00:00
auto reader = reader_factory->getReader();
if (!reader)
{
return false;
}
2022-03-16 14:59:06 +00:00
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
2022-03-16 09:26:01 +00:00
2022-04-12 13:54:53 +00:00
++active_working_reader;
2022-03-30 08:15:20 +00:00
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });
2022-03-16 09:26:01 +00:00
2022-03-10 11:06:50 +00:00
return true;
}
2022-04-29 14:21:53 +00:00
void ParallelReadBuffer::addReaders()
2022-03-10 11:06:50 +00:00
{
2022-04-29 14:21:53 +00:00
while (read_workers.size() < max_working_readers && addReaderToPool())
2022-03-10 11:06:50 +00:00
;
}
2022-03-08 11:11:17 +00:00
off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
    if (whence != SEEK_SET)
        throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);

    if (offset < 0)
        throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);

    /// Fast path: target is still inside the current working buffer — just move `pos`.
    if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
    {
        pos = working_buffer.end() - (current_position - offset);
        assert(pos >= working_buffer.begin());
        assert(pos <= working_buffer.end());

        return offset;
    }

    /// `range` is inclusive on both ends; `range.right` is known to be set (see ReadWorker ctor).
    const auto offset_is_in_range
        = [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };

    /// Cancel and drop every worker whose range cannot contain the target offset.
    while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
    {
        read_workers.front()->cancel = true;
        read_workers.pop_front();
    }

    if (!read_workers.empty())
    {
        /// Target lies inside the front worker's range: consume its segments
        /// until we reach the one containing `offset`.
        auto & front_worker = read_workers.front();
        current_position = front_worker->range.left;
        while (true)
        {
            std::unique_lock lock{front_worker->worker_mutex};
            next_condvar.wait(lock, [&] { return emergency_stop || front_worker->hasSegment(); });

            if (emergency_stop)
                handleEmergencyStop();

            auto next_segment = front_worker->nextSegment();
            if (static_cast<size_t>(offset) < current_position + next_segment.size())
            {
                /// Found the segment containing `offset`: make it the working buffer.
                current_segment = std::move(next_segment);
                working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
                current_position += current_segment.size();
                pos = working_buffer.end() - (current_position - offset);
                /// Refill the pool since the cancellation loop above may have shrunk it.
                addReaders();
                return offset;
            }

            /// Segment entirely before `offset` — discard and keep scanning.
            current_position += next_segment.size();
        }
    }

    /// Target is outside every live worker's range: stop everything and
    /// restart the pipeline from scratch at `offset`.
    finishAndWait();

    reader_factory->seek(offset, whence);
    all_completed = false;
    read_workers.clear();

    current_position = offset;
    resetWorkingBuffer();

    /// Was set by finishAndWait(); clear it before spawning fresh readers.
    emergency_stop = false;

    addReaders();

    return offset;
}
2022-05-25 14:49:40 +00:00
/// The total file size is known by the factory, not by this buffer.
size_t ParallelReadBuffer::getFileSize()
{
    const size_t file_size = reader_factory->getFileSize();
    return file_size;
}
/// `current_position` points past the end of the working buffer, so subtract
/// the bytes that are buffered but not yet consumed.
off_t ParallelReadBuffer::getPosition()
{
    const off_t bytes_not_yet_read = available();
    return current_position - bytes_not_yet_read;
}
2022-03-11 13:38:19 +00:00
bool ParallelReadBuffer::currentWorkerReady() const
{
2022-03-16 14:59:06 +00:00
assert(!read_workers.empty());
2022-04-25 12:10:41 +00:00
return read_workers.front()->finished || read_workers.front()->hasSegment();
2022-03-11 13:38:19 +00:00
}
bool ParallelReadBuffer::currentWorkerCompleted() const
{
2022-04-25 12:10:41 +00:00
return read_workers.front()->finished && !read_workers.front()->hasSegment();
2022-03-11 13:38:19 +00:00
}
2022-03-09 13:26:40 +00:00
/// Rethrows the exception captured by a failed background reader task.
/// Never returns normally.
void ParallelReadBuffer::handleEmergencyStop()
{
    // this can only be called from the main thread when there is an exception
    assert(background_exception);
    std::rethrow_exception(background_exception);
}
bool ParallelReadBuffer::nextImpl()
{
    if (all_completed)
        return false;

    while (true)
    {
        /// The condition variable is shared across workers, but each wait uses
        /// the mutex of the worker whose segments we are about to consume.
        std::unique_lock lock{read_workers.front()->worker_mutex};
        next_condvar.wait(
            lock,
            [this]()
            {
                /// Check if no more readers left or current reader can be processed
                return emergency_stop || currentWorkerReady();
            });

        bool worker_removed = false;
        /// Remove completed units
        while (currentWorkerCompleted() && !emergency_stop)
        {
            /// Must unlock before pop_front(): the mutex we hold lives inside
            /// the worker that pop_front() destroys.
            lock.unlock();
            read_workers.pop_front();
            worker_removed = true;

            if (read_workers.empty())
                break;

            /// Hand-over-hand: re-lock on the new front worker.
            lock = std::unique_lock{read_workers.front()->worker_mutex};
        }

        if (emergency_stop)
            handleEmergencyStop();

        /// Popping completed workers freed pool slots — refill them.
        if (worker_removed)
            addReaders();

        /// All readers processed, stop
        if (read_workers.empty())
        {
            all_completed = true;
            return false;
        }

        auto & front_worker = read_workers.front();
        /// Read data from first segment of the first reader
        if (front_worker->hasSegment())
        {
            current_segment = front_worker->nextSegment();
            if (currentWorkerCompleted())
            {
                /// Same unlock-before-pop rule as above.
                lock.unlock();
                read_workers.pop_front();
                /// Done only if the factory has nothing more AND no workers remain.
                all_completed = !addReaderToPool() && read_workers.empty();
            }
            break;
        }
    }
    /// Expose the consumed segment as the working buffer.
    working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
    current_position += working_buffer.size();
    return true;
}
2022-03-10 11:06:50 +00:00
/// Body of one background reader task: reads the worker's byte range,
/// publishing data segment by segment under the worker's mutex.
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
    /// Whatever happens, decrement the task counter on exit so that
    /// finishAndWait() (which waits on this atomic) can unblock.
    SCOPE_EXIT({
        if (active_working_reader.fetch_sub(1) == 1)
            active_working_reader.notify_all();
    });

    try
    {
        while (!emergency_stop && !read_worker->cancel)
        {
            /// next() returning false before the range is exhausted means the
            /// underlying reader delivered fewer bytes than promised.
            if (!read_worker->reader->next())
                throw Exception(
                    ErrorCodes::LOGICAL_ERROR, "Failed to read all the data from the reader, missing {} bytes", read_worker->bytes_left);

            /// Re-check after the (potentially long) blocking read.
            if (emergency_stop || read_worker->cancel)
                break;

            Buffer buffer = read_worker->reader->buffer();
            /// Never copy past the end of this worker's assigned range.
            size_t bytes_to_copy = std::min(buffer.size(), read_worker->bytes_left);
            Memory<> new_segment(bytes_to_copy);
            memcpy(new_segment.data(), buffer.begin(), bytes_to_copy);
            read_worker->reader->ignore(bytes_to_copy);
            read_worker->bytes_left -= bytes_to_copy;
            {
                /// New data ready to be read
                std::lock_guard lock(read_worker->worker_mutex);
                read_worker->segments.emplace_back(std::move(new_segment));
                read_worker->finished = read_worker->bytes_left == 0;
                next_condvar.notify_all();
            }

            if (read_worker->finished)
            {
                break;
            }
        }
    }
    catch (...)
    {
        /// Record the failure for the main thread; never let an exception escape the task.
        onBackgroundException();
    }
}
void ParallelReadBuffer::onBackgroundException()
{
2022-05-03 06:34:03 +00:00
std::lock_guard lock{exception_mutex};
if (!background_exception)
background_exception = std::current_exception();
2022-05-03 06:34:03 +00:00
emergency_stop = true;
2022-03-09 13:57:33 +00:00
next_condvar.notify_all();
}
/// Tells every background reader task to stop and blocks until all of them
/// have exited (the counter is decremented in each task's SCOPE_EXIT).
void ParallelReadBuffer::finishAndWait()
{
    emergency_stop = true;

    /// C++20 atomic wait: block while the observed value stays unchanged,
    /// re-loading after each wakeup until the counter drops to zero.
    for (auto readers_left = active_working_reader.load(); readers_left != 0; readers_left = active_working_reader.load())
        active_working_reader.wait(readers_left);
}
}