Add retries for http read init

This commit is contained in:
kssenii 2021-10-10 22:13:49 +03:00
parent e3d2942aa6
commit 7c570f2e48
2 changed files with 49 additions and 17 deletions

View File

@ -40,9 +40,12 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
// read_settings.remote_read_max_bytes = 100000000;
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
const auto & settings = context->getSettingsRef();
LOG_DEBUG(log, "Reading from offset: {}", offset);
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
@ -65,6 +68,48 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
}
void ReadBufferFromWebServer::initializeWithRetry()
{
/// Initialize impl with retry.
auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES);
size_t milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
bool initialized = false;
for (size_t i = 0; (i < num_tries) && !initialized; ++i)
{
while (milliseconds_to_wait < read_settings.http_retry_max_backoff_ms)
{
try
{
impl = initialize();
if (use_external_buffer)
{
/**
* See comment at line 120.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
initialized = true;
break;
}
catch (Poco::Exception & e)
{
if (i == num_tries - 1)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), e.what());
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
}
milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
}
}
bool ReadBufferFromWebServer::nextImpl()
{
if (impl)
@ -95,22 +140,7 @@ bool ReadBufferFromWebServer::nextImpl()
}
else
{
/// Initialize impl with retry.
auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES);
for (size_t i = 0; i < num_tries; ++i)
{
try
{
impl = initialize();
}
catch (Poco::Exception &)
{
if (i == num_tries - 1)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
initializeWithRetry();
}
auto result = impl->next();

View File

@ -31,6 +31,8 @@ public:
private:
std::unique_ptr<ReadBuffer> initialize();
void initializeWithRetry();
Poco::Logger * log;
ContextPtr context;