Merge remote-tracking branch 'origin/retriable-http' into disk-async-read

kssenii 2021-10-10 20:23:44 +03:00
commit fde6f0507d
10 changed files with 137 additions and 71 deletions

View File

@ -257,6 +257,7 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
0,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings(),
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});
auto input_stream = getContext()->getInputFormat(LibraryBridgeHelper::DEFAULT_FORMAT, *read_buf_ptr, sample_block, DEFAULT_BLOCK_SIZE);

View File

@ -75,7 +75,6 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, http_max_single_read_retries, 4, "The maximum number of retries during single http read.", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
@ -521,9 +520,14 @@ class IColumn;
M(Milliseconds, async_insert_busy_timeout_ms, 200, "Maximum time to wait before dumping collected data per query since the first data appeared", 0) \
M(Milliseconds, async_insert_stale_timeout_ms, 0, "Maximum time to wait before dumping collected data per query since the last data appeared. Zero means no timeout at all", 0) \
\
M(Int64, remote_fs_read_backoff_threshold, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
\
M(Int64, http_max_tries, 1, "Max attempts to read via HTTP.", 0) \
M(Int64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff when retrying a read via HTTP", 0) \
M(Int64, http_retry_max_backoff_ms, 10000, "Max milliseconds for backoff when retrying a read via HTTP", 0) \
M(Bool, http_retriable_read, true, "Allow resuming reading via HTTP if an error occurs. Reading will continue from the last read byte (using the `Range` header)", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
M(Bool, allow_experimental_nlp_functions, false, "Enable experimental functions for natural language processing.", 0) \
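
For reference, the three backoff settings above combine as follows. This is a standalone sketch rather than ClickHouse code, with http_max_tries raised from its default of 1 so that the schedule is visible (with the default of 1 there are no retries at all):

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main()
    {
        /// Values mirror the settings declared above; the real default for http_max_tries is 1.
        const size_t http_max_tries = 4;
        const size_t http_retry_initial_backoff_ms = 100;
        const size_t http_retry_max_backoff_ms = 10000;

        size_t backoff_ms = http_retry_initial_backoff_ms;
        for (size_t attempt = 1; attempt <= http_max_tries; ++attempt)
        {
            std::printf("attempt %zu would wait %zu ms before retrying\n", attempt, backoff_ms);
            /// Backoff doubles after every failed attempt, capped by http_retry_max_backoff_ms.
            backoff_ms = std::min(backoff_ms * 2, http_retry_max_backoff_ms);
        }
    }
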

View File

@ -103,6 +103,7 @@ Pipe HTTPDictionarySource::loadAll()
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
@ -121,6 +122,7 @@ Pipe HTTPDictionarySource::loadUpdatedAll()
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
@ -148,6 +150,7 @@ Pipe HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));
@ -175,6 +178,7 @@ Pipe HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vect
0,
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
return createWrappedBuffer(std::move(in_ptr));

View File

@ -169,8 +169,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, meta, getContext(),
read_settings.remote_fs_buffer_size,
read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries,
read_settings,
threadpool_read);
if (threadpool_read)

View File

@ -17,22 +17,17 @@ namespace ErrorCodes
{
extern const int CANNOT_SEEK_THROUGH_FILE;
extern const int SEEK_POSITION_OUT_OF_BOUND;
extern const int NETWORK_ERROR;
}
static const auto WAIT_MS = 10;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_, ContextPtr context_, size_t buf_size_,
size_t backoff_threshold_, size_t max_tries_, bool use_external_buffer_)
const String & url_, ContextPtr context_, const ReadSettings & settings_, bool use_external_buffer_)
: SeekableReadBuffer(nullptr, 0)
, log(&Poco::Logger::get("ReadBufferFromWebServer"))
, context(context_)
, url(url_)
, buf_size(buf_size_)
, backoff_threshold_ms(backoff_threshold_)
, max_tries(max_tries_)
, buf_size(settings_.remote_fs_buffer_size)
, read_settings(settings_)
, use_external_buffer(use_external_buffer_)
{
}
@ -60,15 +55,16 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
http_keep_alive_timeout),
0,
Poco::Net::HTTPBasicCredentials{},
buf_size, headers, context->getRemoteHostFilter(), use_external_buffer);
buf_size,
read_settings,
headers,
context->getRemoteHostFilter(),
use_external_buffer);
}
bool ReadBufferFromWebServer::nextImpl()
{
bool next_result = false, successful_read = false;
UInt16 milliseconds_to_wait = WAIT_MS;
if (impl)
{
if (use_external_buffer)
@ -95,56 +91,19 @@ bool ReadBufferFromWebServer::nextImpl()
assert(!impl->hasPendingData());
}
}
WriteBufferFromOwnString error_msg;
for (size_t i = 0; (i < max_tries) && !successful_read && !next_result; ++i)
else
{
while (milliseconds_to_wait < backoff_threshold_ms)
{
try
{
if (!impl)
{
impl = initialize();
if (use_external_buffer)
{
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
next_result = impl->hasPendingData();
if (next_result)
break;
}
next_result = impl->next();
successful_read = true;
break;
}
catch (const Poco::Exception & e)
{
LOG_WARNING(log, "Read attempt failed for url: {}. Error: {}", url, e.what());
error_msg << fmt::format("Error: {}\n", e.what());
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
impl.reset();
}
}
milliseconds_to_wait = WAIT_MS;
impl = initialize();
}
if (!successful_read)
throw Exception(ErrorCodes::NETWORK_ERROR,
"All read attempts failed for url: {}. Reason:\n{}", url, error_msg.str());
if (next_result)
auto result = impl->next();
if (result)
{
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
offset += working_buffer.size();
}
return next_result;
return result;
}
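
With the retry loop gone, error handling and backoff for this buffer now live entirely in ReadWriteBufferFromHTTP (driven by the ReadSettings passed through the constructor), and nextImpl() is reduced to lazy initialization plus forwarding. Below is a standalone toy sketch of that forwarding pattern, with stand-in types instead of the real buffers:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <vector>

    /// Stand-in for the inner HTTP buffer: hands out its data in fixed-size chunks.
    struct Inner
    {
        std::vector<char> data = std::vector<char>(40, 'x');
        size_t pos = 0;

        bool next(size_t & chunk_size)
        {
            if (pos == data.size())
                return false;
            chunk_size = std::min<size_t>(16, data.size() - pos);
            pos += chunk_size;
            return true;
        }
    };

    /// Stand-in for the simplified nextImpl() above: initialize lazily, forward next(),
    /// and track the absolute offset (mirrors `offset += working_buffer.size();`).
    struct Outer
    {
        std::unique_ptr<Inner> impl;
        size_t offset = 0;

        bool next()
        {
            if (!impl)
                impl = std::make_unique<Inner>();
            size_t chunk_size = 0;
            if (!impl->next(chunk_size))
                return false;
            offset += chunk_size;
            return true;
        }
    };

    int main()
    {
        Outer buf;
        while (buf.next()) {}
        assert(buf.offset == 40);
    }
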

View File

@ -2,6 +2,7 @@
#include <IO/SeekableReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadSettings.h>
#include <Interpreters/Context.h>
@ -18,8 +19,7 @@ class ReadBufferFromWebServer : public SeekableReadBuffer
public:
explicit ReadBufferFromWebServer(
const String & url_, ContextPtr context_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t backoff_threshold_ = 10000, size_t max_tries_ = 4,
const ReadSettings & settings_ = {},
bool use_external_buffer_ = false);
bool nextImpl() override;
@ -41,10 +41,9 @@ private:
off_t offset = 0;
size_t backoff_threshold_ms;
size_t max_tries;
bool use_external_buffer;
ReadSettings read_settings;
};
}

View File

@ -74,8 +74,13 @@ struct ReadSettings
/// For 'pread_threadpool' method. Lower value means higher priority.
size_t priority = 0;
size_t remote_fs_backoff_threshold = 10000;
size_t remote_fs_backoff_max_tries = 4;
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
bool http_retriable_read = true;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 10000;
ReadSettings adjustBufferSize(size_t file_size) const
{

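A hypothetical call-site sketch (not part of the commit) of how the new fields are meant to be used: take the context-level defaults, which Context::getReadSettings() below fills from the user settings, and tighten them for a particular read. Only the field names come from the struct above; the values and the `context` variable are illustrative.

    /// Hypothetical adjustment before handing the settings to an HTTP read buffer.
    ReadSettings read_settings = context->getReadSettings();
    read_settings.http_max_tries = 10;                    /// default is 1, i.e. no retries
    read_settings.http_retry_initial_backoff_ms = 50;
    read_settings.http_retry_max_backoff_ms = 5000;
    read_settings.http_retriable_read = true;             /// resume with a Range header on failure
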
View File

@ -2,10 +2,12 @@
#include <functional>
#include <base/types.h>
#include <base/sleep.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadSettings.h>
#include <Poco/Any.h>
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPClientSession.h>
@ -17,6 +19,7 @@
#include <Common/RemoteHostFilter.h>
#include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h>
#include <charconv>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -101,6 +104,14 @@ namespace detail
RemoteHostFilter remote_host_filter;
std::function<void(size_t)> next_callback;
size_t buffer_size;
size_t bytes_read = 0;
size_t start_byte = 0;
std::optional<size_t> total_bytes_to_read;
ReadSettings settings;
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response)
{
// With an empty path, Poco will send "POST  HTTP/1.1", which is a bug in Poco.
@ -118,6 +129,9 @@ namespace detail
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
}
if (bytes_read && settings.http_retriable_read)
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
if (!credentials.getUsername().empty())
credentials.authenticate(request);
@ -162,6 +176,7 @@ namespace detail
OutStreamCallback out_stream_callback_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
HTTPHeaderEntries http_header_entries_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool use_external_buffer_ = false)
@ -173,10 +188,32 @@ namespace detail
, credentials {credentials_}
, http_header_entries {http_header_entries_}
, remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_}
, settings {settings_}
, use_external_buffer {use_external_buffer_}
{
Poco::Net::HTTPResponse response;
/**
* Get the first byte from a `bytes=offset-` or `bytes=offset-end` Range header, if one was passed in.
* There are currently two places where it can be set: 1. in DiskWeb (offset), 2. via config as part of a named collection.
* The header can also be `bytes=-k` (read the last k bytes); in that case retries in the middle of reading are disabled.
*/
auto range_header = std::find_if(http_header_entries_.begin(), http_header_entries_.end(),
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
if (range_header != http_header_entries_.end())
{
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
auto [ptr, ec] = std::from_chars(range.data(), range.data() + range.size(), start_byte);
if (ec != std::errc())
settings.http_retriable_read = false;
}
initialize();
}
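
Taken together, the constructor above and call() implement resumable reads: the first byte is remembered from an existing `Range: bytes=<start>-...` header (if any), and a retried request asks for `bytes=<start + bytes_read>-`; the `bytes=-k` suffix form has no parsable first byte, so mid-read retries are disabled for it. A standalone sketch of just that arithmetic, not the commit's code:

    #include <charconv>
    #include <cstddef>
    #include <cstring>
    #include <iostream>
    #include <optional>
    #include <string>
    #include <string_view>

    /// Returns the Range header value for a resumed request,
    /// or nullopt if the request should simply be repeated from the start.
    std::optional<std::string> resumeRange(std::string_view original_range, size_t bytes_read)
    {
        size_t start_byte = 0;
        bool retriable_read = true;

        if (!original_range.empty())
        {
            std::string_view range = original_range.substr(std::strlen("bytes="));
            auto [parse_end, ec] = std::from_chars(range.data(), range.data() + range.size(), start_byte);
            (void)parse_end;
            if (ec != std::errc())   /// e.g. "bytes=-100": suffix form, no first byte to resume from
                retriable_read = false;
        }

        if (bytes_read == 0 || !retriable_read)
            return std::nullopt;

        return "bytes=" + std::to_string(start_byte + bytes_read) + "-";
    }

    int main()
    {
        std::cout << resumeRange("bytes=1000-", 500).value_or("<no range>") << '\n';   /// bytes=1500-
        std::cout << resumeRange("", 4096).value_or("<no range>") << '\n';             /// bytes=4096-
        std::cout << resumeRange("bytes=-100", 42).value_or("<no range>") << '\n';     /// <no range>
    }
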
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(uri, response);
while (isRedirect(response.getStatus()))
@ -189,9 +226,19 @@ namespace detail
istr = call(uri_redirect, response);
}
/// If it is the very first initialization.
if (!bytes_read && !total_bytes_to_read)
{
/// If we do not know total size, disable retries in the middle of reading.
if (response.hasContentLength())
total_bytes_to_read = response.getContentLength();
else
settings.http_retriable_read = false;
}
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size_);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
}
catch (const Poco::Exception & e)
{
@ -232,11 +279,52 @@ namespace detail
impl->position() = position();
}
if (!impl->next())
if (total_bytes_to_read && bytes_read == total_bytes_to_read.value())
return false;
if (impl && !working_buffer.empty())
impl->position() = position();
bool result = false;
bool successful_read = false;
size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
/// Default http_max_tries = 1.
for (size_t i = 0; (i < settings.http_max_tries) && !successful_read; ++i)
{
while (milliseconds_to_wait < settings.http_retry_max_backoff_ms)
{
try
{
if (!impl)
initialize();
result = impl->next();
successful_read = true;
break;
}
catch (const Poco::Exception &)
{
if (i == settings.http_max_tries - 1
|| (bytes_read && !settings.http_retriable_read))
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
}
milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
}
if (!result)
return false;
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
bytes_read += working_buffer.size();
return true;
}
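
The retry loop above, reduced to a standalone sketch: a flaky stand-in function replaces initialize()/impl->next(), and the nested backoff loop is flattened, but the shape is the same — at most http_max_tries attempts, exponential backoff capped by http_retry_max_backoff_ms, and an immediate rethrow when the read is not resumable.

    #include <chrono>
    #include <cstddef>
    #include <cstdio>
    #include <stdexcept>
    #include <thread>

    /// Stand-in for initialize() + impl->next(): fails twice, then succeeds.
    static bool flakyRead()
    {
        static int calls = 0;
        if (++calls < 3)
            throw std::runtime_error("connection reset by peer");
        return true;
    }

    int main()
    {
        /// Mirrors the new ReadSettings fields; the default http_max_tries = 1 disables retries.
        const size_t http_max_tries = 5;
        const size_t http_retry_initial_backoff_ms = 100;
        const size_t http_retry_max_backoff_ms = 10000;
        const bool retriable_read = true;   /// false: rethrow as soon as something was already read

        bool result = false;
        size_t milliseconds_to_wait = http_retry_initial_backoff_ms;

        for (size_t i = 0; i < http_max_tries && !result; ++i)
        {
            try
            {
                result = flakyRead();
            }
            catch (const std::exception & e)
            {
                if (i + 1 == http_max_tries || !retriable_read)
                    throw;

                std::printf("attempt %zu failed (%s), retrying in %zu ms\n", i + 1, e.what(), milliseconds_to_wait);
                std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds_to_wait));
                if (milliseconds_to_wait < http_retry_max_backoff_ms)
                    milliseconds_to_wait *= 2;
            }
        }

        std::printf("final result: %s\n", result ? "success" : "failure");
    }
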
@ -299,12 +387,13 @@ public:
const UInt64 max_redirects = 0,
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
const HTTPHeaderEntries & http_header_entries_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool use_external_buffer_ = false)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, method_, out_stream_callback_, credentials_, buffer_size_,
http_header_entries_, remote_host_filter_, use_external_buffer_)
settings_, http_header_entries_, remote_host_filter_, use_external_buffer_)
{
}
};

View File

@ -2936,14 +2936,19 @@ ReadSettings Context::getReadSettings() const
res.local_fs_prefetch = settings.local_filesystem_read_prefetch;
res.remote_fs_prefetch = settings.remote_filesystem_read_prefetch;
res.remote_fs_backoff_threshold = settings.remote_fs_read_backoff_threshold;
res.remote_fs_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.remote_fs_read_max_backoff_ms = settings.remote_fs_read_max_backoff_ms;
res.remote_fs_read_backoff_max_tries = settings.remote_fs_read_backoff_max_tries;
res.local_fs_buffer_size = settings.max_read_buffer_size;
res.direct_io_threshold = settings.min_bytes_to_use_direct_io;
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.http_retriable_read = settings.http_retriable_read;
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.mmap_cache = getMMappedFileCache().get();
return res;

View File

@ -124,6 +124,7 @@ namespace
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
context->getRemoteHostFilter()),
chooseCompressionMethod(request_uri.getPath(), compression_method));