From df0f5e20d0afd2f7e45fd6bccc6c84cd9ffd4fa5 Mon Sep 17 00:00:00 2001 From: Antonio Andelic Date: Wed, 9 Mar 2022 09:25:07 +0000 Subject: [PATCH] Refactor --- src/Common/ArenaWithFreeLists.h | 36 ++++++++++++++++++-- src/IO/ParallelReadBuffer.cpp | 45 +++++++++++++------------ src/IO/ParallelReadBuffer.h | 60 ++++++++++++++++++++++++++++++--- src/Storages/StorageURL.cpp | 2 +- 4 files changed, 113 insertions(+), 30 deletions(-) diff --git a/src/Common/ArenaWithFreeLists.h b/src/Common/ArenaWithFreeLists.h index 1284c3586c0..8bb8fd4b130 100644 --- a/src/Common/ArenaWithFreeLists.h +++ b/src/Common/ArenaWithFreeLists.h @@ -49,7 +49,7 @@ private: Block * free_lists[16] {}; public: - ArenaWithFreeLists( + explicit ArenaWithFreeLists( const size_t initial_size = 4096, const size_t growth_factor = 2, const size_t linear_growth_threshold = 128 * 1024 * 1024) : pool{initial_size, growth_factor, linear_growth_threshold} @@ -74,7 +74,7 @@ public: ASAN_UNPOISON_MEMORY_REGION(free_block_ptr, std::max(size, sizeof(Block))); - const auto res = free_block_ptr->data; + auto * res = free_block_ptr->data; free_block_ptr = free_block_ptr->next; return res; } @@ -93,7 +93,7 @@ public: /// Insert the released block into the head of the list. auto & free_block_ptr = free_lists[list_idx]; - const auto old_head = free_block_ptr; + auto * old_head = free_block_ptr; free_block_ptr = reinterpret_cast(ptr); free_block_ptr->next = old_head; @@ -113,5 +113,35 @@ public: } }; +class SynchronizedArenaWithFreeLists : private ArenaWithFreeLists +{ +public: + explicit SynchronizedArenaWithFreeLists( + const size_t initial_size = 4096, const size_t growth_factor = 2, + const size_t linear_growth_threshold = 128 * 1024 * 1024) + : ArenaWithFreeLists{initial_size, growth_factor, linear_growth_threshold} + {} + + char * alloc(const size_t size) + { + std::lock_guard lock{mutex}; + return ArenaWithFreeLists::alloc(size); + } + + void free(char * ptr, const size_t size) + { + std::lock_guard lock{mutex}; + return ArenaWithFreeLists::free(ptr, size); + } + + /// Size of the allocated pool in bytes + size_t size() const + { + std::lock_guard lock{mutex}; + return ArenaWithFreeLists::size(); + } +private: + mutable std::mutex mutex; +}; } diff --git a/src/IO/ParallelReadBuffer.cpp b/src/IO/ParallelReadBuffer.cpp index e2610c094e9..f50fb9417cd 100644 --- a/src/IO/ParallelReadBuffer.cpp +++ b/src/IO/ParallelReadBuffer.cpp @@ -33,7 +33,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence) if (offset < 0) throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND); - if (!working_buffer.empty() && size_t(offset) >= current_position - working_buffer.size() && offset < current_position) + if (!working_buffer.empty() && static_cast(offset) >= current_position - working_buffer.size() && offset < current_position) { pos = working_buffer.end() - (current_position - offset); assert(pos >= working_buffer.begin()); @@ -47,11 +47,16 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence) = [&](const auto & range) { return static_cast(offset) >= range.from && static_cast(offset) < range.to; }; std::unique_lock lock{mutex}; + bool worker_removed = false; while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range))) { read_workers.pop_front(); + worker_removed = true; } + if (worker_removed) + reader_condvar.notify_all(); + if (!read_workers.empty()) { auto & front_worker = read_workers.front(); @@ -61,21 +66,17 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence) { next_condvar.wait(lock, [&] { return !segments.empty(); }); - if (static_cast(offset) < current_position + segments.front().size()) + auto next_segment = front_worker->nextSegment(); + if (static_cast(offset) < current_position + next_segment.size()) { - arena.free(segment->data(), segment->size()); - segment = std::move(segments.front()); - segments.pop_front(); - working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size()); - current_position += segment->size(); - front_worker->range.from += segment->size(); + current_segment = std::move(next_segment); + working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size()); + current_position += current_segment.size(); pos = working_buffer.end() - (current_position - offset); return offset; } - current_position += segments.front().size(); - front_worker->range.from += segments.front().size(); - segments.pop_front(); + current_position += next_segment.size(); } } @@ -130,9 +131,16 @@ bool ParallelReadBuffer::nextImpl() throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Emergency stop"); } + bool worker_removed = false; /// Remove completed units while (!read_workers.empty() && currentWorkerCompleted()) + { read_workers.pop_front(); + worker_removed = true; + } + + if (worker_removed) + reader_condvar.notify_all(); /// All readers processed, stop if (read_workers.empty() && all_created) @@ -145,17 +153,11 @@ bool ParallelReadBuffer::nextImpl() /// Read data from first segment of the first reader if (!front_worker->segments.empty()) { - if (segment) - { - arena.free(segment->data(), segment->size()); - } - segment = std::move(front_worker->segments.front()); - front_worker->range.from += segment->size(); - front_worker->segments.pop_front(); + current_segment = front_worker->nextSegment(); break; } } - working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size()); + working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size()); current_position += working_buffer.size(); return true; } @@ -168,7 +170,8 @@ void ParallelReadBuffer::processor() { /// Create new read worker and put in into end of queue /// reader_factory is not thread safe, so we call getReader under lock - std::lock_guard lock(mutex); + std::unique_lock lock(mutex); + reader_condvar.wait(lock, [this] { return read_workers.size() < pool.getMaxThreads(); }); auto reader = reader_factory->getReader(); if (!reader) { @@ -203,7 +206,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker) break; Buffer buffer = read_worker->reader->buffer(); - std::span new_segment(arena.alloc(buffer.size()), buffer.size()); + Segment new_segment(buffer.size(), &arena); memcpy(new_segment.data(), buffer.begin(), buffer.size()); { /// New data ready to be read diff --git a/src/IO/ParallelReadBuffer.h b/src/IO/ParallelReadBuffer.h index 13808a8b29c..6662b1f4f5a 100644 --- a/src/IO/ParallelReadBuffer.h +++ b/src/IO/ParallelReadBuffer.h @@ -6,8 +6,6 @@ #include "Common/ArenaWithFreeLists.h" #include -#include - namespace DB { @@ -32,6 +30,47 @@ private: void initializeWorkers(); + class Segment + { + public: + Segment(size_t size_, SynchronizedArenaWithFreeLists * arena_) : arena(arena_), m_data(arena->alloc(size_)), m_size(size_) { } + + Segment() = default; + + Segment(const Segment &) = delete; + Segment & operator=(const Segment &) = delete; + + Segment(Segment && other) noexcept : arena(other.arena) + { + std::swap(m_data, other.m_data); + std::swap(m_size, other.m_size); + } + + Segment & operator=(Segment && other) noexcept + { + arena = other.arena; + std::swap(m_data, other.m_data); + std::swap(m_size, other.m_size); + return *this; + } + + ~Segment() + { + if (m_data) + { + arena->free(m_data, m_size); + } + } + + auto data() const noexcept { return m_data; } + auto size() const noexcept { return m_size; } + + private: + SynchronizedArenaWithFreeLists * arena{nullptr}; + char * m_data{nullptr}; + size_t m_size{0}; + }; + public: struct Range { @@ -64,8 +103,17 @@ private: { explicit ReadWorker(ReadBufferPtr reader_, const Range & range_) : reader(reader_), range(range_) { } + Segment nextSegment() + { + assert(!segments.empty()); + auto next_segment = std::move(segments.front()); + segments.pop_front(); + range.from += next_segment.size(); + return next_segment; + } + ReadBufferPtr reader; - std::deque> segments; + std::deque segments; bool finished{false}; Range range; }; @@ -93,8 +141,9 @@ private: void onBackgroundException(); void finishAndWait(); - ArenaWithFreeLists arena; - std::optional> segment; + SynchronizedArenaWithFreeLists arena; + + Segment current_segment; ThreadPool pool; @@ -107,6 +156,7 @@ private: * deque and data from next reader will be consumed to user. */ std::deque read_workers; + std::condition_variable reader_condvar; std::mutex mutex; /// Triggered when new data available diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 5182d2ce68d..c90cf65bc6f 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -306,7 +306,7 @@ namespace /* use_external_buffer */ false, /* skip_url_not_found_error */ skip_url_not_found_error); return wrapReadBufferWithCompressionMethod( - std::make_unique(std::move(read_buffer_factory), 4), + std::make_unique(std::move(read_buffer_factory), 8), chooseCompressionMethod(request_uri.getPath(), compression_method)); }