diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index fa5895fd9ec..fe5ce057309 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -126,10 +126,6 @@ private: Poco::Logger * log; bool startup_restore_finished = false; - /** - * Get list of file segments which intesect with `range`. - * If `key` is not in cache or there is not such range, return std::nullopt. - */ FileSegments getImpl( const Key & key, const FileSegment::Range & range, [[maybe_unused]] std::lock_guard & cache_lock); diff --git a/src/Common/tests/gtest_lru_file_cache.cpp b/src/Common/tests/gtest_lru_file_cache.cpp index f73dd77738e..4d41cce8ab9 100644 --- a/src/Common/tests/gtest_lru_file_cache.cpp +++ b/src/Common/tests/gtest_lru_file_cache.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -95,10 +96,7 @@ TEST(LRUFileCache, get) DB::ThreadStatus thread_status; /// To work with cache need query_id and query context. - auto shared_context = DB::Context::createShared(); - auto context = DB::Context::createGlobal(shared_context.get()); - context->makeGlobalContext(); - auto query_context = DB::Context::createCopy(context); + auto query_context = DB::Context::createCopy(getContext().context); query_context->makeQueryContext(); query_context->setCurrentQueryId("query_id"); DB::CurrentThread::QueryScope query_scope_holder(query_context); @@ -343,7 +341,7 @@ TEST(LRUFileCache, get) std::thread other_1([&] { DB::ThreadStatus thread_status_1; - auto query_context_1 = DB::Context::createCopy(context); + auto query_context_1 = DB::Context::createCopy(getContext().context); query_context_1->makeQueryContext(); query_context_1->setCurrentQueryId("query_id_1"); DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1); @@ -410,7 +408,7 @@ TEST(LRUFileCache, get) std::thread other_1([&] { DB::ThreadStatus thread_status_1; - auto query_context_1 = DB::Context::createCopy(context); + auto query_context_1 = DB::Context::createCopy(getContext().context); query_context_1->makeQueryContext(); query_context_1->setCurrentQueryId("query_id_1"); DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1); diff --git a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp index 14aa34690ea..17912c9efbb 100644 --- a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp @@ -34,6 +34,7 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS( , reader(reader_) , settings(settings_) , read_until_position(read_until_position_) + , use_external_buffer(settings_.remote_fs_method == RemoteFSReadMethod::threadpool) { } @@ -227,9 +228,17 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() assert(current_read_range.left <= file_offset_of_buffer_end); assert(current_read_range.right >= file_offset_of_buffer_end); - assert(!internal_buffer.empty()); - swap(*impl); + if (use_external_buffer) + { + assert(!internal_buffer.empty()); + swap(*impl); + } + else + { + impl->position() = position(); + assert(!impl->hasPendingData()); + } bool result; auto & file_segment = *current_file_segment_it; @@ -292,7 +301,10 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() } } - swap(*impl); + if (use_external_buffer) + swap(*impl); + else + BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); if (file_offset_of_buffer_end > current_read_range.right) completeFileSegmentAndGetNext(); diff --git a/src/Disks/IO/CacheableReadBufferFromRemoteFS.h b/src/Disks/IO/CacheableReadBufferFromRemoteFS.h index c3813a73729..aeed5098971 100644 --- a/src/Disks/IO/CacheableReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CacheableReadBufferFromRemoteFS.h @@ -52,6 +52,9 @@ private: SeekableReadBufferPtr impl; bool initialized = false; + /// Flag to identify usage of threadpool reads + bool use_external_buffer; + enum class ReadType { CACHED, diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 315772eedf6..4c5392c3818 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -138,8 +138,6 @@ void ReadBufferFromRemoteFSGather::initialize() bool ReadBufferFromRemoteFSGather::nextImpl() { - assert(!internal_buffer.empty()); - /// Find first available buffer that fits to given offset. if (!current_buf) initialize(); diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index 064168553a8..cbea047dfa9 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -14,8 +14,7 @@ namespace ErrorCodes ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( std::shared_ptr impl_) - : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) - , impl(std::move(impl_)) + : impl(std::move(impl_)) { } @@ -74,8 +73,6 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) bool ReadIndirectBufferFromRemoteFS::nextImpl() { - assert(!internal_buffer.empty()); - /// Transfer current position and working_buffer to actual ReadBuffer swap(*impl); /// Position and working_buffer will be updated in next() call