From 7c570f2e48f6f7a360810c493f03597fd5c5e066 Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 10 Oct 2021 22:13:49 +0300 Subject: [PATCH] Add retries for http read init --- src/IO/ReadBufferFromWebServer.cpp | 64 ++++++++++++++++++++++-------- src/IO/ReadBufferFromWebServer.h | 2 + 2 files changed, 49 insertions(+), 17 deletions(-) diff --git a/src/IO/ReadBufferFromWebServer.cpp b/src/IO/ReadBufferFromWebServer.cpp index 9a7874997ef..31b8f7da1c7 100644 --- a/src/IO/ReadBufferFromWebServer.cpp +++ b/src/IO/ReadBufferFromWebServer.cpp @@ -40,9 +40,12 @@ std::unique_ptr ReadBufferFromWebServer::initialize() Poco::URI uri(url); ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; + + // read_settings.remote_read_max_bytes = 100000000; headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset))); - const auto & settings = context->getSettingsRef(); LOG_DEBUG(log, "Reading from offset: {}", offset); + + const auto & settings = context->getSettingsRef(); const auto & config = context->getConfigRef(); Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0}; @@ -65,6 +68,48 @@ std::unique_ptr ReadBufferFromWebServer::initialize() } +void ReadBufferFromWebServer::initializeWithRetry() +{ + /// Initialize impl with retry. + auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES); + size_t milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms; + bool initialized = false; + for (size_t i = 0; (i < num_tries) && !initialized; ++i) + { + while (milliseconds_to_wait < read_settings.http_retry_max_backoff_ms) + { + try + { + impl = initialize(); + + if (use_external_buffer) + { + /** + * See comment at line 120. + */ + impl->set(internal_buffer.begin(), internal_buffer.size()); + assert(working_buffer.begin() != nullptr); + assert(!internal_buffer.empty()); + } + + initialized = true; + break; + } + catch (Poco::Exception & e) + { + if (i == num_tries - 1) + throw; + + LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), e.what()); + sleepForMilliseconds(milliseconds_to_wait); + milliseconds_to_wait *= 2; + } + } + milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms; + } +} + + bool ReadBufferFromWebServer::nextImpl() { if (impl) @@ -95,22 +140,7 @@ bool ReadBufferFromWebServer::nextImpl() } else { - /// Initialize impl with retry. - auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES); - for (size_t i = 0; i < num_tries; ++i) - { - try - { - impl = initialize(); - } - catch (Poco::Exception &) - { - if (i == num_tries - 1) - throw; - - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } + initializeWithRetry(); } auto result = impl->next(); diff --git a/src/IO/ReadBufferFromWebServer.h b/src/IO/ReadBufferFromWebServer.h index 61e5ebfbb51..780a4b16442 100644 --- a/src/IO/ReadBufferFromWebServer.h +++ b/src/IO/ReadBufferFromWebServer.h @@ -31,6 +31,8 @@ public: private: std::unique_ptr initialize(); + void initializeWithRetry(); + Poco::Logger * log; ContextPtr context;