mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Better
This commit is contained in:
parent
e7dbfff5d2
commit
1c8b1b1133
@ -588,6 +588,7 @@
|
||||
M(618, LZ4_DECODER_FAILED) \
|
||||
M(619, POSTGRESQL_REPLICATION_INTERNAL_ERROR) \
|
||||
M(620, QUERY_NOT_ALLOWED) \
|
||||
M(621, HTTP_RANGE_NOT_SATISFIABLE) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -525,7 +525,6 @@ class IColumn;
|
||||
M(Int64, http_max_tries, 1, "Max attempts to read via http.", 0) \
|
||||
M(Int64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
|
||||
M(Int64, http_retry_max_backoff_ms, 10000, "Max milliseconds for backoff, when retrying read via http", 0) \
|
||||
M(Bool, http_retriable_read, true, "Allow to resume reading via http if some error occurred. Reading will continue starting from last read byte (with `range` header)", 0) \
|
||||
\
|
||||
/** Experimental functions */ \
|
||||
M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \
|
||||
|
@ -17,8 +17,11 @@ namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_SEEK_THROUGH_FILE;
|
||||
extern const int SEEK_POSITION_OUT_OF_BOUND;
|
||||
extern const int NETWORK_ERROR;
|
||||
}
|
||||
|
||||
static constexpr size_t HTTP_MAX_TRIES = 10;
|
||||
|
||||
ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(
|
||||
const String & url_, ContextPtr context_, size_t buf_size_, const ReadSettings & settings_)
|
||||
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
|
||||
@ -59,6 +62,37 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
}
|
||||
|
||||
|
||||
void ReadIndirectBufferFromWebServer::initializeWithRetry()
|
||||
{
|
||||
/// Initialize impl with retry.
|
||||
auto num_tries = std::max(read_settings.http_max_tries, HTTP_MAX_TRIES);
|
||||
size_t milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
|
||||
bool initialized = false;
|
||||
for (size_t i = 0; (i < num_tries) && !initialized; ++i)
|
||||
{
|
||||
while (milliseconds_to_wait < read_settings.http_retry_max_backoff_ms)
|
||||
{
|
||||
try
|
||||
{
|
||||
impl = initialize();
|
||||
initialized = true;
|
||||
break;
|
||||
}
|
||||
catch (Poco::Exception & e)
|
||||
{
|
||||
if (i == num_tries - 1)
|
||||
throw;
|
||||
|
||||
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), "Error: {}, code: {}", e.what(), e.code());
|
||||
sleepForMilliseconds(milliseconds_to_wait);
|
||||
milliseconds_to_wait *= 2;
|
||||
}
|
||||
}
|
||||
milliseconds_to_wait = read_settings.http_retry_initial_backoff_ms;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
bool ReadIndirectBufferFromWebServer::nextImpl()
|
||||
{
|
||||
if (impl)
|
||||
@ -69,7 +103,7 @@ bool ReadIndirectBufferFromWebServer::nextImpl()
|
||||
}
|
||||
else
|
||||
{
|
||||
impl = initialize();
|
||||
initializeWithRetry();
|
||||
}
|
||||
|
||||
auto result = impl->next();
|
||||
|
@ -31,6 +31,8 @@ public:
|
||||
private:
|
||||
std::unique_ptr<ReadBuffer> initialize();
|
||||
|
||||
void initializeWithRetry();
|
||||
|
||||
Poco::Logger * log;
|
||||
ContextPtr context;
|
||||
|
||||
|
@ -69,7 +69,6 @@ struct ReadSettings
|
||||
size_t remote_fs_read_max_backoff_ms = 10000;
|
||||
size_t remote_fs_read_backoff_max_tries = 4;
|
||||
|
||||
bool http_retriable_read = true;
|
||||
size_t http_max_tries = 1;
|
||||
size_t http_retry_initial_backoff_ms = 100;
|
||||
size_t http_retry_max_backoff_ms = 10000;
|
||||
|
@ -34,6 +34,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TOO_MANY_REDIRECTS;
|
||||
extern const int HTTP_RANGE_NOT_SATISFIABLE;
|
||||
}
|
||||
|
||||
template <typename SessionPtr>
|
||||
@ -108,8 +109,13 @@ namespace detail
|
||||
|
||||
size_t bytes_read = 0;
|
||||
size_t start_byte = 0;
|
||||
|
||||
bool resumable_read = true;
|
||||
std::optional<size_t> total_bytes_to_read;
|
||||
|
||||
/// Delayed exception in case retries with partial content are not satisfiable
|
||||
std::optional<Poco::Exception> exception;
|
||||
|
||||
ReadSettings settings;
|
||||
|
||||
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response)
|
||||
@ -129,8 +135,12 @@ namespace detail
|
||||
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
|
||||
}
|
||||
|
||||
if (bytes_read && settings.http_retriable_read)
|
||||
bool partial_content = false;
|
||||
if (bytes_read && resumable_read)
|
||||
{
|
||||
partial_content = true;
|
||||
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
|
||||
}
|
||||
|
||||
if (!credentials.getUsername().empty())
|
||||
credentials.authenticate(request);
|
||||
@ -149,9 +159,16 @@ namespace detail
|
||||
istr = receiveResponse(*sess, request, response, true);
|
||||
response.getCookies(cookies);
|
||||
|
||||
if (partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
|
||||
{
|
||||
/// If we retries some request, throw error from that request.
|
||||
if (exception)
|
||||
exception->rethrow();
|
||||
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
|
||||
}
|
||||
|
||||
content_encoding = response.get("Content-Encoding", "");
|
||||
return istr;
|
||||
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
@ -196,10 +213,13 @@ namespace detail
|
||||
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
|
||||
if (range_header != http_header_entries_.end())
|
||||
{
|
||||
if (method == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "POST request cannot have range headers");
|
||||
|
||||
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
|
||||
auto [ptr, ec] = std::from_chars(range.data(), range.data() + range.size(), start_byte);
|
||||
if (ec != std::errc())
|
||||
settings.http_retriable_read = false;
|
||||
resumable_read = false;
|
||||
}
|
||||
|
||||
initialize();
|
||||
@ -228,7 +248,7 @@ namespace detail
|
||||
if (response.hasContentLength())
|
||||
total_bytes_to_read = response.getContentLength();
|
||||
else
|
||||
settings.http_retriable_read = false;
|
||||
resumable_read = false;
|
||||
}
|
||||
|
||||
try
|
||||
@ -274,15 +294,18 @@ namespace detail
|
||||
successful_read = true;
|
||||
break;
|
||||
}
|
||||
catch (const Poco::Exception &)
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
if (i == settings.http_max_tries - 1
|
||||
|| (bytes_read && !settings.http_retriable_read))
|
||||
|| (bytes_read && !resumable_read))
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(__PRETTY_FUNCTION__);
|
||||
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
|
||||
impl.reset();
|
||||
|
||||
exception.reset();
|
||||
exception.emplace(e);
|
||||
|
||||
sleepForMilliseconds(milliseconds_to_wait);
|
||||
milliseconds_to_wait *= 2;
|
||||
}
|
||||
|
@ -2937,7 +2937,6 @@ ReadSettings Context::getReadSettings() const
|
||||
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
|
||||
res.priority = settings.read_priority;
|
||||
|
||||
res.http_retriable_read = settings.http_retriable_read;
|
||||
res.http_max_tries = settings.http_max_tries;
|
||||
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
|
||||
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
|
||||
|
Loading…
Reference in New Issue
Block a user