Initial implementation with Arena

This commit is contained in:
Antonio Andelic 2022-03-08 16:04:19 +00:00
parent 16c20332cf
commit 7b526b25a8
2 changed files with 17 additions and 8 deletions

View File

@ -63,11 +63,12 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
if (static_cast<size_t>(offset) < current_position + segments.front().size()) if (static_cast<size_t>(offset) < current_position + segments.front().size())
{ {
arena.free(segment->data(), segment->size());
segment = std::move(segments.front()); segment = std::move(segments.front());
segments.pop_front(); segments.pop_front();
working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size()); working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size());
current_position += segment.size(); current_position += segment->size();
front_worker->range.from += segment.size(); front_worker->range.from += segment->size();
pos = working_buffer.end() - (current_position - offset); pos = working_buffer.end() - (current_position - offset);
return offset; return offset;
} }
@ -144,13 +145,17 @@ bool ParallelReadBuffer::nextImpl()
/// Read data from first segment of the first reader /// Read data from first segment of the first reader
if (!front_worker->segments.empty()) if (!front_worker->segments.empty())
{ {
if (segment)
{
arena.free(segment->data(), segment->size());
}
segment = std::move(front_worker->segments.front()); segment = std::move(front_worker->segments.front());
front_worker->range.from += segment.size(); front_worker->range.from += segment->size();
front_worker->segments.pop_front(); front_worker->segments.pop_front();
break; break;
} }
} }
working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size()); working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size());
current_position += working_buffer.size(); current_position += working_buffer.size();
return true; return true;
} }
@ -198,7 +203,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
break; break;
Buffer buffer = read_worker->reader->buffer(); Buffer buffer = read_worker->reader->buffer();
Memory<> new_segment(buffer.size()); std::span new_segment(arena.alloc(buffer.size()), buffer.size());
memcpy(new_segment.data(), buffer.begin(), buffer.size()); memcpy(new_segment.data(), buffer.begin(), buffer.size());
{ {
/// New data ready to be read /// New data ready to be read

View File

@ -3,8 +3,11 @@
#include <IO/BufferWithOwnMemory.h> #include <IO/BufferWithOwnMemory.h>
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include "Common/ArenaWithFreeLists.h"
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <span>
namespace DB namespace DB
{ {
@ -62,7 +65,7 @@ private:
explicit ReadWorker(ReadBufferPtr reader_, const Range & range_) : reader(reader_), range(range_) { } explicit ReadWorker(ReadBufferPtr reader_, const Range & range_) : reader(reader_), range(range_) { }
ReadBufferPtr reader; ReadBufferPtr reader;
std::deque<Memory<>> segments; std::deque<std::span<char>> segments;
bool finished{false}; bool finished{false};
Range range; Range range;
}; };
@ -90,7 +93,8 @@ private:
void onBackgroundException(); void onBackgroundException();
void finishAndWait(); void finishAndWait();
Memory<> segment; ArenaWithFreeLists arena;
std::optional<std::span<char>> segment;
ThreadPool pool; ThreadPool pool;