From e3d2942aa60b99ef7c81fb7bc88dc6cae6652340 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 10 Oct 2021 20:24:36 +0300 Subject: [PATCH] Correct merge --- src/Disks/DiskWebServer.cpp | 4 +- src/Disks/ReadBufferFromRemoteFSGather.cpp | 2 +- src/Disks/ReadBufferFromRemoteFSGather.h | 15 ++---- src/IO/ReadBufferFromWebServer.cpp | 19 +++++++- src/IO/ReadBufferFromWebServer.h | 4 +- src/IO/ReadWriteBufferFromHTTP.h | 55 +++++++++++++--------- 6 files changed, 61 insertions(+), 38 deletions(-) diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index f663229f689..4c98df561f1 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -168,9 +168,7 @@ std::unique_ptr DiskWebServer::readFile(const String & p bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool; - auto web_impl = std::make_unique(url, meta, getContext(), - read_settings, - threadpool_read); + auto web_impl = std::make_unique(url, meta, getContext(), threadpool_read, read_settings); if (threadpool_read) { diff --git a/src/Disks/ReadBufferFromRemoteFSGather.cpp b/src/Disks/ReadBufferFromRemoteFSGather.cpp index c923e484ee0..5038698d1e7 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/ReadBufferFromRemoteFSGather.cpp @@ -36,7 +36,7 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const { - return std::make_unique(fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries, threadpool_read); + return std::make_unique(fs::path(uri) / path, context, settings, threadpool_read); } diff --git a/src/Disks/ReadBufferFromRemoteFSGather.h b/src/Disks/ReadBufferFromRemoteFSGather.h index 80bdfe4775d..28ea347352a 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/ReadBufferFromRemoteFSGather.h @@ -6,6 +6,7 @@ #include #include +#include namespace Aws { @@ 
-92,17 +93,13 @@ public: const String & uri_, RemoteMetadata metadata_, ContextPtr context_, - size_t buf_size_, - size_t backoff_threshold_, - size_t max_tries_, - size_t threadpool_read_) + size_t threadpool_read_, + const ReadSettings & settings_) : ReadBufferFromRemoteFSGather(metadata_) , uri(uri_) , context(context_) - , buf_size(buf_size_) - , backoff_threshold(backoff_threshold_) - , max_tries(max_tries_) , threadpool_read(threadpool_read_) + , settings(settings_) { } @@ -111,10 +108,8 @@ public: private: String uri; ContextPtr context; - size_t buf_size; - size_t backoff_threshold; - size_t max_tries; bool threadpool_read; + ReadSettings settings; }; diff --git a/src/IO/ReadBufferFromWebServer.cpp b/src/IO/ReadBufferFromWebServer.cpp index 693a2842a08..9a7874997ef 100644 --- a/src/IO/ReadBufferFromWebServer.cpp +++ b/src/IO/ReadBufferFromWebServer.cpp @@ -20,6 +20,8 @@ namespace ErrorCodes } +static constexpr size_t HTTP_MAX_TRIES = 10; + ReadBufferFromWebServer::ReadBufferFromWebServer( const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_) : SeekableReadBuffer(nullptr, 0) @@ -93,7 +95,22 @@ bool ReadBufferFromWebServer::nextImpl() } else { - impl = initialize(); + /// Initialize impl with retry. 
+ auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES); + for (size_t i = 0; i < num_tries; ++i) + { + try + { + impl = initialize(); + } + catch (Poco::Exception &) + { + if (i == num_tries - 1) + throw; + + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } } auto result = impl->next(); diff --git a/src/IO/ReadBufferFromWebServer.h b/src/IO/ReadBufferFromWebServer.h index c7970f9f856..61e5ebfbb51 100644 --- a/src/IO/ReadBufferFromWebServer.h +++ b/src/IO/ReadBufferFromWebServer.h @@ -41,9 +41,9 @@ private: off_t offset = 0; - bool use_external_buffer; - ReadSettings read_settings; + + bool use_external_buffer; }; } diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 8f3fde11eaa..c61aebd040c 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -239,6 +239,16 @@ namespace detail try { impl = std::make_unique(*istr, buffer_size); + + if (use_external_buffer) + { + /** + * See comment 30 lines lower. + */ + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } } catch (const Poco::Exception & e) { @@ -255,28 +265,31 @@ namespace detail if (next_callback) next_callback(count()); - if (use_external_buffer) + if (impl) { - /** - * use_external_buffer -- means we read into the buffer which - * was passed to us from somewhere else. We do not check whether - * previously returned buffer was read or not, because this branch - * means we are prefetching data, each nextImpl() call we can fill - * a different buffer. - */ - impl->set(internal_buffer.begin(), internal_buffer.size()); - assert(working_buffer.begin() != nullptr); - assert(!internal_buffer.empty()); - } - else - { - /** - * impl was initialized before, pass position() to it to make - * sure there is no pending data which was not read, becuase - * this branch means we read sequentially. 
- */ - if (!working_buffer.empty()) - impl->position() = position(); + if (use_external_buffer) + { + /** + * use_external_buffer -- means we read into the buffer which + * was passed to us from somewhere else. We do not check whether + * previously returned buffer was read or not, because this branch + * means we are prefetching data, each nextImpl() call we can fill + * a different buffer. + */ + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } + else + { + /** + * impl was initialized before, pass position() to it to make + * sure there is no pending data which was not read, because + * this branch means we read sequentially. + */ + if (!working_buffer.empty()) + impl->position() = position(); + } } if (total_bytes_to_read && bytes_read == total_bytes_to_read.value())