From efddc03246e02c2dbcf35cbe0643ea436c0780f0 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 26 Sep 2021 22:49:28 +0300 Subject: [PATCH] Async read for disk web --- src/Disks/DiskWebServer.cpp | 17 +++++++++++++---- src/Disks/ReadIndirectBufferFromWebServer.cpp | 15 ++++++++++++--- src/Disks/ReadIndirectBufferFromWebServer.h | 10 ++++++---- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index 439d59f21a9..9c4747aefdb 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -116,19 +116,22 @@ public: ContextPtr context_, size_t buf_size_, size_t backoff_threshold_, - size_t max_tries_) + size_t max_tries_, + size_t threadpool_read_) : ReadBufferFromRemoteFS(metadata_) , uri(uri_) , context(context_) , buf_size(buf_size_) , backoff_threshold(backoff_threshold_) , max_tries(max_tries_) + , threadpool_read(threadpool_read_) { } SeekableReadBufferPtr createReadBuffer(const String & path) const override { - return std::make_unique(fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries); + return std::make_unique( + fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries, threadpool_read); } private: @@ -137,6 +140,7 @@ private: size_t buf_size; size_t backoff_threshold; size_t max_tries; + bool threadpool_read; }; @@ -198,9 +202,14 @@ std::unique_ptr DiskWebServer::readFile(const String & p RemoteMetadata meta(path, remote_path); meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size)); + bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool; + auto web_impl = std::make_unique(url, meta, getContext(), - read_settings.remote_fs_buffer_size, read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries); - if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool) + read_settings.remote_fs_buffer_size, + read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries, + threadpool_read); + + if (threadpool_read) { auto reader = IDiskRemote::getThreadPoolReader(); auto buf = std::make_unique(reader, read_settings.priority, std::move(web_impl)); diff --git a/src/Disks/ReadIndirectBufferFromWebServer.cpp b/src/Disks/ReadIndirectBufferFromWebServer.cpp index 809e6d67107..c0d41bb006a 100644 --- a/src/Disks/ReadIndirectBufferFromWebServer.cpp +++ b/src/Disks/ReadIndirectBufferFromWebServer.cpp @@ -24,7 +24,8 @@ static const auto WAIT_MS = 10; ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer( - const String & url_, ContextPtr context_, size_t buf_size_, size_t backoff_threshold_, size_t max_tries_) + const String & url_, ContextPtr context_, size_t buf_size_, + size_t backoff_threshold_, size_t max_tries_, bool use_external_buffer_) : BufferWithOwnMemory(buf_size_) , log(&Poco::Logger::get("ReadIndirectBufferFromWebServer")) , context(context_) @@ -32,6 +33,7 @@ ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer( , buf_size(buf_size_) , backoff_threshold_ms(backoff_threshold_) , max_tries(max_tries_) + , use_external_buffer(use_external_buffer_) { } @@ -70,8 +72,15 @@ bool ReadIndirectBufferFromWebServer::nextImpl() if (impl) { - /// Restore correct position at the needed offset. - impl->position() = position(); + if (use_external_buffer) + { + impl->set(working_buffer.begin(), working_buffer.size()); + } + else + { + impl->position() = position(); + } + assert(!impl->hasPendingData()); } diff --git a/src/Disks/ReadIndirectBufferFromWebServer.h b/src/Disks/ReadIndirectBufferFromWebServer.h index 04bb155f83b..55885427887 100644 --- a/src/Disks/ReadIndirectBufferFromWebServer.h +++ b/src/Disks/ReadIndirectBufferFromWebServer.h @@ -16,10 +16,10 @@ namespace DB class ReadIndirectBufferFromWebServer : public BufferWithOwnMemory { public: - explicit ReadIndirectBufferFromWebServer(const String & url_, - ContextPtr context_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, - size_t backoff_threshold_ = 10000, size_t max_tries_ = 4); + explicit ReadIndirectBufferFromWebServer( + const String & url_, ContextPtr context_, + size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, + size_t backoff_threshold_ = 10000, size_t max_tries_ = 4, bool use_external_buffer_ = false); bool nextImpl() override; @@ -42,6 +42,8 @@ private: size_t backoff_threshold_ms; size_t max_tries; + + bool use_external_buffer; }; }