2022-03-04 10:20:57 +00:00
|
|
|
#include <IO/ParallelReadBuffer.h>
|
2022-03-08 11:11:17 +00:00
|
|
|
#include <base/logger_useful.h>
|
|
|
|
#include <Poco/Logger.h>
|
2022-03-04 10:20:57 +00:00
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by this translation unit; they are only declared here and defined elsewhere.
namespace ErrorCodes
{
    extern const int CANNOT_SEEK_THROUGH_FILE;
    extern const int LOGICAL_ERROR;
    extern const int SEEK_POSITION_OUT_OF_BOUND;
}
|
|
|
|
|
2022-03-30 08:15:20 +00:00
|
|
|
/// Creates the buffer and immediately starts the initial set of readers.
///
/// @param reader_factory_       Source of readers; ownership is moved into the buffer.
/// @param pool                  Thread pool used to build the scheduling callback. It is dereferenced
///                              unconditionally, so it must not be null.
/// @param max_working_readers_  Upper bound on the number of queued read workers (see addReaders()).
ParallelReadBuffer::ParallelReadBuffer(
    std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPool * pool, size_t max_working_readers_)
    : SeekableReadBufferWithSize(nullptr, 0)
    , max_working_readers(max_working_readers_)
    , schedule(threadPoolCallbackRunner(*pool, getWorkerSetup(), getWorkerCleanup()))
    , reader_factory(std::move(reader_factory_))
{
    /// addReaders() takes the buffer lock by reference, so acquire it before filling the worker queue.
    std::unique_lock buffer_lock(mutex);
    addReaders(buffer_lock);
}
|
|
|
|
|
2022-03-30 08:15:20 +00:00
|
|
|
|
|
|
|
/// Returns the setup callback handed to threadPoolCallbackRunner() in the constructor.
/// The callback registers one more active worker by incrementing `active_working_reader` under the mutex.
ParallelReadBuffer::WorkerSetup ParallelReadBuffer::getWorkerSetup()
{
    auto register_worker = [this]
    {
        std::lock_guard<std::mutex> guard(mutex);
        ++active_working_reader;
    };
    return register_worker;
}
|
|
|
|
|
|
|
|
/// Returns the cleanup callback handed to threadPoolCallbackRunner() in the constructor.
/// The callback decrements `active_working_reader` under the mutex and, once the counter
/// reaches zero, wakes everybody waiting on `readers_done` (see finishAndWait()).
ParallelReadBuffer::WorkerCleanup ParallelReadBuffer::getWorkerCleanup()
{
    auto unregister_worker = [this]
    {
        std::lock_guard<std::mutex> guard(mutex);
        if (--active_working_reader == 0)
            readers_done.notify_all();
    };
    return unregister_worker;
}
|
|
|
|
|
2022-03-10 11:06:50 +00:00
|
|
|
bool ParallelReadBuffer::addReaderToPool(std::unique_lock<std::mutex> & /*buffer_lock*/)
|
2022-03-08 11:11:17 +00:00
|
|
|
{
|
2022-03-10 11:06:50 +00:00
|
|
|
auto reader = reader_factory->getReader();
|
|
|
|
if (!reader)
|
|
|
|
{
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2022-03-16 14:59:06 +00:00
|
|
|
auto worker = read_workers.emplace_back(std::make_shared<ReadWorker>(std::move(reader)));
|
2022-03-16 09:26:01 +00:00
|
|
|
|
2022-03-30 08:15:20 +00:00
|
|
|
schedule([this, worker = std::move(worker)]() mutable { readerThreadFunction(std::move(worker)); });
|
2022-03-16 09:26:01 +00:00
|
|
|
|
2022-03-10 11:06:50 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
void ParallelReadBuffer::addReaders(std::unique_lock<std::mutex> & buffer_lock)
|
|
|
|
{
|
|
|
|
while (read_workers.size() < max_working_readers && addReaderToPool(buffer_lock))
|
|
|
|
;
|
2022-03-04 10:20:57 +00:00
|
|
|
}
|
|
|
|
|
2022-03-08 11:11:17 +00:00
|
|
|
off_t ParallelReadBuffer::seek(off_t offset, int whence)
|
|
|
|
{
|
|
|
|
if (whence != SEEK_SET)
|
|
|
|
throw Exception("Only SEEK_SET mode is allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
|
|
|
|
|
|
|
|
if (offset < 0)
|
|
|
|
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
|
|
|
|
|
2022-03-09 09:25:07 +00:00
|
|
|
if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
|
2022-03-08 11:11:17 +00:00
|
|
|
{
|
|
|
|
pos = working_buffer.end() - (current_position - offset);
|
|
|
|
assert(pos >= working_buffer.begin());
|
|
|
|
assert(pos <= working_buffer.end());
|
|
|
|
|
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
2022-03-11 13:38:19 +00:00
|
|
|
std::unique_lock lock{mutex};
|
2022-03-08 11:11:17 +00:00
|
|
|
const auto offset_is_in_range
|
2022-03-16 14:59:06 +00:00
|
|
|
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.left && static_cast<size_t>(offset) <= *range.right; };
|
2022-03-08 11:11:17 +00:00
|
|
|
|
|
|
|
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
|
|
|
|
{
|
2022-03-18 09:28:56 +00:00
|
|
|
read_workers.front()->cancel = true;
|
2022-03-08 11:11:17 +00:00
|
|
|
read_workers.pop_front();
|
|
|
|
}
|
|
|
|
|
|
|
|
if (!read_workers.empty())
|
|
|
|
{
|
|
|
|
auto & front_worker = read_workers.front();
|
|
|
|
auto & segments = front_worker->segments;
|
2022-03-16 14:59:06 +00:00
|
|
|
current_position = front_worker->range.left;
|
2022-03-08 11:11:17 +00:00
|
|
|
while (true)
|
|
|
|
{
|
2022-03-09 13:26:40 +00:00
|
|
|
next_condvar.wait(lock, [&] { return emergency_stop || !segments.empty(); });
|
|
|
|
|
|
|
|
if (emergency_stop)
|
|
|
|
handleEmergencyStop();
|
2022-03-08 11:11:17 +00:00
|
|
|
|
2022-03-09 09:25:07 +00:00
|
|
|
auto next_segment = front_worker->nextSegment();
|
|
|
|
if (static_cast<size_t>(offset) < current_position + next_segment.size())
|
2022-03-08 11:11:17 +00:00
|
|
|
{
|
2022-03-09 09:25:07 +00:00
|
|
|
current_segment = std::move(next_segment);
|
|
|
|
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
|
|
|
|
current_position += current_segment.size();
|
2022-03-08 11:11:17 +00:00
|
|
|
pos = working_buffer.end() - (current_position - offset);
|
2022-03-16 09:26:01 +00:00
|
|
|
addReaders(lock);
|
2022-03-08 11:11:17 +00:00
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
2022-03-09 09:25:07 +00:00
|
|
|
current_position += next_segment.size();
|
2022-03-08 11:11:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
lock.unlock();
|
|
|
|
finishAndWait();
|
|
|
|
|
|
|
|
reader_factory->seek(offset, whence);
|
|
|
|
all_completed = false;
|
2022-03-08 14:17:19 +00:00
|
|
|
read_workers.clear();
|
2022-03-08 11:11:17 +00:00
|
|
|
|
|
|
|
current_position = offset;
|
|
|
|
resetWorkingBuffer();
|
|
|
|
|
|
|
|
emergency_stop = false;
|
2022-03-11 13:38:19 +00:00
|
|
|
|
2022-03-10 11:06:50 +00:00
|
|
|
lock.lock();
|
|
|
|
addReaders(lock);
|
2022-03-08 11:11:17 +00:00
|
|
|
return offset;
|
|
|
|
}
|
|
|
|
|
|
|
|
std::optional<size_t> ParallelReadBuffer::getTotalSize()
|
|
|
|
{
|
|
|
|
std::lock_guard lock{mutex};
|
|
|
|
return reader_factory->getTotalSize();
|
|
|
|
}
|
|
|
|
|
|
|
|
off_t ParallelReadBuffer::getPosition()
|
|
|
|
{
|
|
|
|
return current_position - available();
|
|
|
|
}
|
|
|
|
|
2022-03-11 13:38:19 +00:00
|
|
|
bool ParallelReadBuffer::currentWorkerReady() const
|
|
|
|
{
|
2022-03-16 14:59:06 +00:00
|
|
|
assert(!read_workers.empty());
|
|
|
|
return read_workers.front()->finished || !read_workers.front()->segments.empty();
|
2022-03-11 13:38:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
bool ParallelReadBuffer::currentWorkerCompleted() const
|
|
|
|
{
|
2022-03-16 14:59:06 +00:00
|
|
|
assert(!read_workers.empty());
|
|
|
|
return read_workers.front()->finished && read_workers.front()->segments.empty();
|
2022-03-11 13:38:19 +00:00
|
|
|
}
|
|
|
|
|
2022-03-09 13:26:40 +00:00
|
|
|
void ParallelReadBuffer::handleEmergencyStop()
|
|
|
|
{
|
2022-03-16 14:59:06 +00:00
|
|
|
// this can only be called from the main thread when there is an exception
|
|
|
|
assert(background_exception);
|
2022-03-09 13:26:40 +00:00
|
|
|
if (background_exception)
|
|
|
|
std::rethrow_exception(background_exception);
|
|
|
|
}
|
|
|
|
|
2022-03-04 10:20:57 +00:00
|
|
|
bool ParallelReadBuffer::nextImpl()
|
|
|
|
{
|
|
|
|
if (all_completed)
|
|
|
|
return false;
|
|
|
|
|
|
|
|
while (true)
|
|
|
|
{
|
2022-03-08 11:11:17 +00:00
|
|
|
std::unique_lock lock(mutex);
|
|
|
|
next_condvar.wait(
|
|
|
|
lock,
|
|
|
|
[this]()
|
|
|
|
{
|
|
|
|
/// Check if no more readers left or current reader can be processed
|
2022-03-10 11:06:50 +00:00
|
|
|
return emergency_stop || currentWorkerReady();
|
2022-03-08 11:11:17 +00:00
|
|
|
});
|
2022-03-04 10:20:57 +00:00
|
|
|
|
2022-03-09 09:25:07 +00:00
|
|
|
bool worker_removed = false;
|
2022-03-04 10:20:57 +00:00
|
|
|
/// Remove completed units
|
2022-03-16 14:59:06 +00:00
|
|
|
while (!read_workers.empty() && currentWorkerCompleted() && !emergency_stop)
|
2022-03-09 09:25:07 +00:00
|
|
|
{
|
2022-03-04 10:20:57 +00:00
|
|
|
read_workers.pop_front();
|
2022-03-09 09:25:07 +00:00
|
|
|
worker_removed = true;
|
|
|
|
}
|
|
|
|
|
2022-03-16 14:59:06 +00:00
|
|
|
if (emergency_stop)
|
|
|
|
handleEmergencyStop();
|
|
|
|
|
2022-03-09 09:25:07 +00:00
|
|
|
if (worker_removed)
|
2022-03-10 11:06:50 +00:00
|
|
|
addReaders(lock);
|
2022-03-04 10:20:57 +00:00
|
|
|
|
|
|
|
/// All readers processed, stop
|
2022-03-10 11:06:50 +00:00
|
|
|
if (read_workers.empty())
|
2022-03-04 10:20:57 +00:00
|
|
|
{
|
|
|
|
all_completed = true;
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
2022-03-08 11:11:17 +00:00
|
|
|
auto & front_worker = read_workers.front();
|
2022-03-04 10:20:57 +00:00
|
|
|
/// Read data from first segment of the first reader
|
2022-03-08 11:11:17 +00:00
|
|
|
if (!front_worker->segments.empty())
|
2022-03-04 10:20:57 +00:00
|
|
|
{
|
2022-03-09 09:25:07 +00:00
|
|
|
current_segment = front_worker->nextSegment();
|
2022-03-16 11:44:07 +00:00
|
|
|
if (currentWorkerCompleted())
|
|
|
|
{
|
|
|
|
read_workers.pop_front();
|
|
|
|
all_completed = !addReaderToPool(lock) && read_workers.empty();
|
|
|
|
}
|
2022-03-04 10:20:57 +00:00
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
2022-03-09 09:25:07 +00:00
|
|
|
working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
|
2022-03-08 11:11:17 +00:00
|
|
|
current_position += working_buffer.size();
|
2022-03-04 10:20:57 +00:00
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2022-03-10 11:06:50 +00:00
|
|
|
/// Body of a reading task: pulls data from the worker's reader and appends it, segment by segment,
/// to the worker's queue until `bytes_left` reaches zero, the worker is cancelled or an emergency
/// stop is requested. Any exception is reported through onBackgroundException() instead of escaping.
void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
{
    const auto should_stop = [&] { return emergency_stop || read_worker->cancel; };

    try
    {
        while (!should_stop())
        {
            if (!read_worker->reader->next())
            {
                throw Exception(
                    ErrorCodes::LOGICAL_ERROR, "Failed to read all the data from the reader, missing {} bytes", read_worker->bytes_left);
            }

            /// next() may have taken a while; re-check before doing any more work.
            if (should_stop())
                break;

            Buffer chunk = read_worker->reader->buffer();
            size_t copy_size = std::min(chunk.size(), read_worker->bytes_left);

            Segment segment(copy_size, &arena);
            memcpy(segment.data(), chunk.begin(), copy_size);

            read_worker->reader->ignore(copy_size);
            read_worker->bytes_left -= copy_size;

            bool worker_done = false;
            {
                /// New data ready to be read
                std::lock_guard<std::mutex> guard(mutex);
                read_worker->segments.emplace_back(std::move(segment));
                worker_done = read_worker->bytes_left == 0;
                read_worker->finished = worker_done;
                next_condvar.notify_all();
            }

            if (worker_done)
                break;
        }
    }
    catch (...)
    {
        onBackgroundException();
    }
}
|
|
|
|
|
|
|
|
void ParallelReadBuffer::onBackgroundException()
|
|
|
|
{
|
2022-03-08 11:11:17 +00:00
|
|
|
std::lock_guard lock(mutex);
|
2022-03-04 10:20:57 +00:00
|
|
|
if (!background_exception)
|
|
|
|
{
|
|
|
|
background_exception = std::current_exception();
|
|
|
|
}
|
|
|
|
emergency_stop = true;
|
2022-03-09 13:57:33 +00:00
|
|
|
next_condvar.notify_all();
|
2022-03-04 10:20:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
void ParallelReadBuffer::finishAndWait()
|
|
|
|
{
|
|
|
|
emergency_stop = true;
|
2022-03-09 13:57:33 +00:00
|
|
|
|
|
|
|
std::unique_lock lock{mutex};
|
|
|
|
readers_done.wait(lock, [&] { return active_working_reader == 0; });
|
2022-03-04 10:20:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
}
|