diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index e47ce054a35..e2610c094e9 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -63,11 +63,12 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence) if (static_cast(offset) < current_position + segments.front().size()) { + arena.free(segment->data(), segment->size()); segment = std::move(segments.front()); segments.pop_front(); - working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size()); - current_position += segment.size(); - front_worker->range.from += segment.size(); + working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size()); + current_position += segment->size(); + front_worker->range.from += segment->size(); pos = working_buffer.end() - (current_position - offset); return offset; } @@ -144,13 +145,17 @@ bool ParallelReadBuffer::nextImpl() /// Read data from first segment of the first reader if (!front_worker->segments.empty()) { + if (segment) + { + arena.free(segment->data(), segment->size()); + } segment = std::move(front_worker->segments.front()); - front_worker->range.from += segment.size(); + front_worker->range.from += segment->size(); front_worker->segments.pop_front(); break; } } - working_buffer = internal_buffer = Buffer(segment.data(), segment.data() + segment.size()); + working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size()); current_position += working_buffer.size(); return true; } @@ -198,7 +203,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker) break; Buffer buffer = read_worker->reader->buffer(); - Memory<> new_segment(buffer.size()); + std::span new_segment(arena.alloc(buffer.size()), buffer.size()); memcpy(new_segment.data(), buffer.begin(), buffer.size()); { /// New data ready to be read diff --git a/src/IO/ParallelReadBuffer.h b/src/IO/ParallelReadBuffer.h index 203f7413f4b..13808a8b29c 100644 --- a/src/IO/ParallelReadBuffer.h +++ b/src/IO/ParallelReadBuffer.h @@ -3,8 +3,11 @@ #include #include #include +#include "Common/ArenaWithFreeLists.h" #include +#include + namespace DB { @@ -62,7 +65,7 @@ private: explicit ReadWorker(ReadBufferPtr reader_, const Range & range_) : reader(reader_), range(range_) { } ReadBufferPtr reader; - std::deque> segments; + std::deque> segments; bool finished{false}; Range range; }; @@ -90,7 +93,8 @@ private: void onBackgroundException(); void finishAndWait(); - Memory<> segment; + ArenaWithFreeLists arena; + std::optional> segment; ThreadPool pool;