Squashed commit of the following:

commit a1acc7ed3485bd158cf42f26b2d2a1bca84b7269
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Sun Oct 31 14:47:25 2021 +0000

    Fix

commit 12a27d445b
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 29 09:24:53 2021 +0000

    Adjustments after merge with master

commit f17e321a73
Merge: 55f1ba857d 11b70a285c
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 29 08:03:25 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 55f1ba857d
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 29 07:38:42 2021 +0000

    Small fixes

commit aeba8104d1
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Thu Oct 28 10:28:05 2021 +0000

    Some more fixes

commit 82f3754b8a
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Wed Oct 27 20:20:15 2021 +0000

    Slightly better

commit 2647b88a66
Merge: 7024f51b55 2bb586bed3
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Wed Oct 27 19:38:19 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 7024f51b55
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Wed Oct 27 18:30:25 2021 +0000

    Review fixes and add test

commit 7de5fca075
Merge: 1ecb6ad05d 855b10261c
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Sun Oct 24 17:04:24 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 1ecb6ad05d
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Sat Oct 23 18:53:59 2021 +0000

    Better

commit 710ee96878
Merge: 28d2485716 158b4c26b7
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Mon Oct 18 08:31:25 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 28d2485716
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 15 17:27:26 2021 +0000

    Some fixes

commit 7c20ca07b7
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 15 12:59:24 2021 +0000

    Fix style check

commit ccb02cac56
Merge: 9e4fe0f3c8 d1138a8a25
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Fri Oct 15 12:36:30 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 9e4fe0f3c8
Merge: 9814cb1b45 daed77038b
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Tue Oct 12 09:58:15 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 9814cb1b45
Merge: 1c8b1b1133 969999ff10
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Tue Oct 12 09:49:08 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http

commit 1c8b1b1133
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Tue Oct 12 09:33:07 2021 +0000

    Better

commit e7dbfff5d2
Merge: 3329b668d6 198adc7ecd
Author: kssenii <sumarokovakseniia@mail.ru>
Date:   Tue Oct 12 06:19:15 2021 +0000

    Merge branch 'master' of https://github.com/ClickHouse/ClickHouse into retriable-http
kssenii 2021-10-31 17:53:08 +03:00
parent c936fa93c3
commit 2940d9fd19
18 changed files with 307 additions and 134 deletions

View File

@ -75,7 +75,7 @@ bool LibraryBridgeHelper::bridgeHandShake()
String result;
try
{
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts);
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts, credentials);
readString(result, buf);
}
catch (...)
@ -240,7 +240,7 @@ bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferF
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
http_timeouts);
http_timeouts, credentials);
bool res;
readBoolText(res, buf);
@ -255,8 +255,8 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
http_timeouts,
credentials,
0,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings(),
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});

View File

@ -101,6 +101,7 @@ private:
size_t bridge_port;
bool library_initialized = false;
ConnectionTimeouts http_timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -76,7 +76,7 @@ protected:
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
@ -135,6 +135,8 @@ private:
std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;
Poco::Net::HTTPBasicCredentials credentials{};
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
@ -166,7 +168,7 @@ protected:
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
bool res;
readBoolText(res, buf);
@ -186,7 +188,7 @@ protected:
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
std::string character;
readStringBinary(character, buf);

View File

@ -593,6 +593,7 @@
M(623, CAPN_PROTO_BAD_CAST) \
M(624, BAD_FILE_TYPE) \
M(625, IO_SETUP_ERROR) \
M(626, HTTP_RANGE_NOT_SATISFIABLE) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -523,6 +523,10 @@ class IColumn;
M(Int64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(Int64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
\
M(UInt64, http_max_tries, 1, "Max attempts to read via http.", 0) \
M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
M(UInt64, http_retry_max_backoff_ms, 1600, "Max milliseconds for backoff, when retrying read via http", 0) \
\
M(Bool, force_remove_data_recursively_on_drop, false, "Recursively remove data on DROP query. Avoids 'Directory not empty' error, but may silently remove detached data", 0) \
\
/** Experimental functions */ \
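
For illustration, a minimal standalone sketch of the wait schedule these three settings imply, assuming the doubling-with-cap retry loop added to ReadWriteBufferFromHTTP later in this diff (the values below are illustrative; http_max_tries defaults to 1 in the patch and is raised explicitly in the test at the end):

#include <algorithm>
#include <cstddef>
#include <cstdio>

int main()
{
    /// Illustrative values only (the test below uses e.g. http_max_tries = 20).
    const size_t http_max_tries = 10;
    const size_t http_retry_initial_backoff_ms = 100;
    const size_t http_retry_max_backoff_ms = 1600;

    size_t milliseconds_to_wait = http_retry_initial_backoff_ms;
    for (size_t i = 0; i < http_max_tries; ++i)
    {
        /// On a failed attempt the buffer sleeps for milliseconds_to_wait, then doubles it up to the cap.
        printf("attempt %zu: wait %zu ms on failure\n", i + 1, milliseconds_to_wait);
        milliseconds_to_wait = std::min(milliseconds_to_wait * 2, http_retry_max_backoff_ms);
    }
    /// Resulting waits: 100, 200, 400, 800, 1600, 1600, ... ms (capped at http_retry_max_backoff_ms).
}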

View File

@ -98,11 +98,13 @@ Pipe HTTPDictionarySource::loadAll()
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -117,11 +119,13 @@ Pipe HTTPDictionarySource::loadUpdatedAll()
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -145,11 +149,13 @@ Pipe HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -173,11 +179,13 @@ Pipe HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vect
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
ReadWriteBufferFromHTTP::Range{},
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}

View File

@ -214,7 +214,8 @@ Pipe XDBCDictionarySource::loadFromQuery(const Poco::URI & url, const Block & re
os << "query=" << escapeForFileName(query);
};
auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts);
auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(
url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts, credentials);
auto format = getContext()->getInputFormat(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, required_sample_block, max_block_size);
format->addBuffer(std::move(read_buf));

View File

@ -89,6 +89,7 @@ private:
BridgeHelperPtr bridge_helper;
Poco::URI bridge_url;
ConnectionTimeouts timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -38,10 +38,12 @@ void DiskWebServer::initialize(const String & uri_path) const
LOG_TRACE(log, "Loading metadata for directory: {}", uri_path);
try
{
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(getContext()));
ConnectionTimeouts::getHTTPTimeouts(getContext()),
credentials);
String file_name;
FileData file_data{};

View File

@ -21,15 +21,12 @@ namespace ErrorCodes
}
static constexpr size_t HTTP_MAX_TRIES = 10;
static constexpr size_t WAIT_INIT = 100;
ReadBufferFromWebServer::ReadBufferFromWebServer(
const String & url_,
ContextPtr context_,
const ReadSettings & settings_,
bool use_external_buffer_,
size_t last_offset_)
size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
, log(&Poco::Logger::get("ReadBufferFromWebServer"))
, context(context_)
@ -37,7 +34,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
, buf_size(settings_.remote_fs_buffer_size)
, read_settings(settings_)
, use_external_buffer(use_external_buffer_)
, last_offset(last_offset_)
, read_until_position(read_until_position_)
{
}
@ -45,20 +42,18 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
{
Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
if (last_offset)
ReadWriteBufferFromHTTP::Range range;
if (read_until_position)
{
if (last_offset < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-{}", offset, last_offset - 1)));
LOG_DEBUG(log, "Reading with range: {}-{}", offset, last_offset);
range = { .begin = static_cast<size_t>(offset), .end = read_until_position - 1 };
LOG_DEBUG(log, "Reading with range: {}-{}", offset, read_until_position);
}
else
{
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
range = { .begin = static_cast<size_t>(offset) };
LOG_DEBUG(log, "Reading from offset: {}", offset);
}
@ -75,78 +70,32 @@ std::unique_ptr<ReadBuffer> ReadBufferFromWebServer::initialize()
std::max(Poco::Timespan(settings.http_receive_timeout.totalSeconds(), 0), Poco::Timespan(20, 0)),
settings.tcp_keep_alive_timeout,
http_keep_alive_timeout),
credentials,
0,
Poco::Net::HTTPBasicCredentials{},
buf_size,
read_settings,
headers,
ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
range,
context->getRemoteHostFilter(),
/* delay_initialization */true,
use_external_buffer);
}
void ReadBufferFromWebServer::initializeWithRetry()
{
/// Initialize impl with retry.
size_t milliseconds_to_wait = WAIT_INIT;
for (size_t i = 0; i < HTTP_MAX_TRIES; ++i)
{
try
{
impl = initialize();
if (use_external_buffer)
{
/**
* See comment 30 lines lower.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
break;
}
catch (Poco::Exception & e)
{
if (i == HTTP_MAX_TRIES - 1)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromWeb"), "Error: {}, code: {}", e.what(), e.code());
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
}
}
bool ReadBufferFromWebServer::nextImpl()
{
if (last_offset)
if (read_until_position)
{
if (last_offset == offset)
if (read_until_position == offset)
return false;
if (last_offset < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, last_offset - 1);
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
}
if (impl)
{
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
if (!use_external_buffer)
{
/**
* impl was initialized before, pass position() to it to make
@ -159,7 +108,21 @@ bool ReadBufferFromWebServer::nextImpl()
}
else
{
initializeWithRetry();
impl = initialize();
}
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not, because this branch
* means we are prefetching data, each nextImpl() call we can fill
* a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
auto result = impl->next();

View File

@ -4,6 +4,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadSettings.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPBasicCredentials.h>
namespace DB
@ -21,7 +22,7 @@ public:
const String & url_, ContextPtr context_,
const ReadSettings & settings_ = {},
bool use_external_buffer_ = false,
size_t last_offset = 0);
size_t read_until_position = 0);
bool nextImpl() override;
@ -32,8 +33,6 @@ public:
private:
std::unique_ptr<ReadBuffer> initialize();
void initializeWithRetry();
Poco::Logger * log;
ContextPtr context;
@ -42,13 +41,14 @@ private:
std::unique_ptr<ReadBuffer> impl;
off_t offset = 0;
ReadSettings read_settings;
Poco::Net::HTTPBasicCredentials credentials{};
bool use_external_buffer;
off_t last_offset = 0;
off_t offset = 0;
off_t read_until_position = 0;
};
}

View File

@ -77,6 +77,10 @@ struct ReadSettings
size_t remote_fs_read_max_backoff_ms = 10000;
size_t remote_fs_read_backoff_max_tries = 4;
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;
/// Set to true for MergeTree tables to make sure
/// that last position (offset in compressed file) is always passed.
/// (Otherwise asynchronous reading from remote fs is not efficient).

View File

@ -7,6 +7,7 @@
#include <IO/HTTPCommon.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadSettings.h>
#include <Poco/Any.h>
#include <Poco/Net/HTTPBasicCredentials.h>
@ -31,6 +32,8 @@ namespace DB
namespace ErrorCodes
{
extern const int TOO_MANY_REDIRECTS;
extern const int HTTP_RANGE_NOT_SATISFIABLE;
extern const int BAD_ARGUMENTS;
}
template <typename SessionPtr>
@ -40,7 +43,7 @@ protected:
SessionPtr session;
UInt64 redirects { 0 };
Poco::URI initial_uri;
const ConnectionTimeouts & timeouts;
ConnectionTimeouts timeouts;
UInt64 max_redirects;
public:
@ -86,6 +89,13 @@ namespace detail
using HTTPHeaderEntry = std::tuple<std::string, std::string>;
using HTTPHeaderEntries = std::vector<HTTPHeaderEntry>;
/// HTTP range, including right bound [begin, end].
struct Range
{
size_t begin = 0;
std::optional<size_t> end;
};
protected:
Poco::URI uri;
std::string method;
@ -102,7 +112,17 @@ namespace detail
std::function<void(size_t)> next_callback;
size_t buffer_size;
bool use_external_buffer;
size_t bytes_read = 0;
Range read_range;
/// Delayed exception in case retries with partial content are not satisfiable.
std::exception_ptr exception;
bool retry_with_range_header = false;
ReadSettings settings;
Poco::Logger * log;
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response)
{
@ -121,10 +141,23 @@ namespace detail
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
}
/**
* Add range header if we have some passed range (for disk web)
* or if we want to retry GET request on purpose.
*/
bool with_partial_content = (read_range.begin || read_range.end) || retry_with_range_header;
if (with_partial_content)
{
if (read_range.end)
request.set("Range", fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end));
else
request.set("Range", fmt::format("bytes={}-", read_range.begin + bytes_read));
}
if (!credentials.getUsername().empty())
credentials.authenticate(request);
LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri_.toString());
LOG_TRACE(log, "Sending request to {}", uri_.toString());
auto sess = session->getSession();
@ -138,9 +171,16 @@ namespace detail
istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
/// If we retried some request, throw error from that request.
if (exception)
std::rethrow_exception(exception);
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
}
content_encoding = response.get("Content-Encoding", "");
return istr;
}
catch (const Poco::Exception & e)
{
@ -151,9 +191,6 @@ namespace detail
}
}
private:
bool use_external_buffer;
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -161,13 +198,15 @@ namespace detail
explicit ReadWriteBufferFromHTTPBase(
UpdatableSessionPtr session_,
Poco::URI uri_,
const Poco::Net::HTTPBasicCredentials & credentials_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
HTTPHeaderEntries http_header_entries_ = {},
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false,
bool use_external_buffer_ = false)
: ReadBuffer(nullptr, 0)
, uri {uri_}
@ -178,15 +217,25 @@ namespace detail
, http_header_entries {http_header_entries_}
, remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_}
, settings {settings_}
, use_external_buffer {use_external_buffer_}
, read_range(read_range_)
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{
initialize();
if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0
|| settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid setting for http backoff, "
"must be http_max_tries >= 1 (current is {}) and "
"0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {})",
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
if (!delay_initialization)
initialize();
}
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(uri, response);
@ -200,6 +249,9 @@ namespace detail
istr = call(uri_redirect, response);
}
if (!bytes_read && !read_range.end && response.hasContentLength())
read_range.end = response.getContentLength();
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
@ -229,30 +281,93 @@ namespace detail
if (next_callback)
next_callback(count());
if (use_external_buffer)
if (read_range.end && bytes_read == read_range.end.value())
return false;
if (impl)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read.
*/
if (!working_buffer.empty())
impl->position() = position();
if (use_external_buffer)
{
/**
* use_external_buffer -- means we read into the buffer which
* was passed to us from somewhere else. We do not check whether
* previously returned buffer was read or not (no hasPendingData() check is needed),
* because this branch means we are prefetching data,
* each nextImpl() call we can fill a different buffer.
*/
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
else
{
/**
* impl was initialized before, pass position() to it to make
* sure there is no pending data which was not read.
*/
if (!working_buffer.empty())
impl->position() = position();
}
}
if (!impl->next())
bool result = false;
bool successful_read = false;
size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
/// Default http_max_tries = 1.
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
try
{
if (!impl)
{
initialize();
if (use_external_buffer)
{
/// See comment 40 lines above.
impl->set(internal_buffer.begin(), internal_buffer.size());
assert(working_buffer.begin() != nullptr);
assert(!internal_buffer.empty());
}
}
result = impl->next();
successful_read = true;
break;
}
catch (const Poco::Exception & e)
{
/**
* Retry the request unconditionally if nothing has been read yet.
* Otherwise, if it is a GET request, retry with a Range header starting from bytes_read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, bytes_read, e.what(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
exception = std::current_exception();
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);
}
if (successful_read)
break;
milliseconds_to_wait = std::min(milliseconds_to_wait * 2, settings.http_retry_max_backoff_ms);
}
if (!successful_read && exception)
std::rethrow_exception(exception);
if (!result)
return false;
internal_buffer = impl->buffer();
@ -311,21 +426,24 @@ class ReadWriteBufferFromHTTP : public detail::ReadWriteBufferFromHTTPBase<std::
using Parent = detail::ReadWriteBufferFromHTTPBase<std::shared_ptr<UpdatableSession>>;
public:
explicit ReadWriteBufferFromHTTP(
explicit ReadWriteBufferFromHTTP(
Poco::URI uri_,
const std::string & method_,
OutStreamCallback out_stream_callback_,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials_,
const UInt64 max_redirects = 0,
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
const HTTPHeaderEntries & http_header_entries_ = {},
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization_ = true,
bool use_external_buffer_ = false)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, method_, out_stream_callback_, credentials_, buffer_size_,
settings_, http_header_entries_, remote_host_filter_, use_external_buffer_)
uri_, credentials_, method_, out_stream_callback_, buffer_size_,
settings_, http_header_entries_, read_range_, remote_host_filter_,
delay_initialization_, use_external_buffer_)
{
}
};
@ -369,9 +487,9 @@ public:
size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
: Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,
credentials_,
method_,
out_stream_callback_,
credentials_,
buffer_size_)
{
}
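
For illustration, a hedged sketch of a call site under the new ReadWriteBufferFromHTTP constructor above: credentials are now a required argument right after the timeouts, and the byte range, delayed initialization and external-buffer mode come as trailing defaulted parameters. The URL is made up and `context` stands for whatever ContextPtr is available at the call site; this is an assumption, not code from the patch:

/// Call-site sketch only; assumes a ContextPtr named `context` is in scope.
Poco::Net::HTTPBasicCredentials credentials;
ReadWriteBufferFromHTTP buf(
    Poco::URI("http://example.com/data.tsv"),
    Poco::Net::HTTPRequest::HTTP_GET,
    /* out_stream_callback */ {},
    ConnectionTimeouts::getHTTPTimeouts(context),
    credentials,
    /* max_redirects */ 0,
    DBMS_DEFAULT_BUFFER_SIZE,
    context->getReadSettings(),
    ReadWriteBufferFromHTTP::HTTPHeaderEntries{},
    ReadWriteBufferFromHTTP::Range{ .begin = 0, .end = 1023 },  /// inclusive range: the first 1 KiB
    context->getRemoteHostFilter(),
    /* delay_initialization */ true);

String result;
readString(result, buf);  /// read as in LibraryBridgeHelper::bridgeHandShake() above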

View File

@ -3088,6 +3088,10 @@ ReadSettings Context::getReadSettings() const
res.mmap_threshold = settings.min_bytes_to_use_mmap_io;
res.priority = settings.read_priority;
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.mmap_cache = getMMappedFileCache().get();
return res;

View File

@ -121,11 +121,12 @@ namespace
method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter()),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
@ -189,6 +190,8 @@ namespace
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
Poco::Net::HTTPBasicCredentials credentials{};
};
}
@ -450,6 +453,8 @@ void registerStorageURL(StorageFactory & factory)
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}

View File

@ -82,7 +82,8 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
columns_info_uri.addQueryParameter("external_table_functions_use_nulls",
Poco::NumberFormatter::format(use_nulls));
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context), credentials);
std::string columns_info;
readStringBinary(columns_info, buf);

View File

@ -2,10 +2,6 @@
<named_collections>
<url1>
<headers>
<header>
<name>Range</name>
<value>bytes=0-1</value>
</header>
<header>
<name>Access-Control-Request-Method</name>
<value>PUT</value>

View File

@ -1,6 +1,10 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import threading
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_zookeeper=False, with_hdfs=True)
@ -95,3 +99,61 @@ def test_predefined_connection_configuration(started_cluster):
result = node1.query("SET max_http_get_redirects=1; select * from url(url1, url='http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', format='TSV', structure='id UInt32, name String, weight Float64')")
assert(result == "1\tMark\t72.53\n")
node1.query("drop table WebHDFSStorageWithRedirect")
result = ''
def test_url_reconnect_at_start(started_cluster):
hdfs_api = started_cluster.hdfs_api
with PartitionManager() as pm:
node1.query(
"insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)")
pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
def select():
global result
print("reading")
result = node1.query(
"select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32') settings http_max_tries = 20, http_retry_max_backoff_ms=10000")
print(result)
thread = threading.Thread(target=select)
thread.start()
time.sleep(1)
print("delete rule")
pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.join()
#assert node1.contains_in_log("Error: Timeout, code:")
print(result)
result = ''
def test_url_reconnect_in_the_middle(started_cluster):
hdfs_api = started_cluster.hdfs_api
with PartitionManager() as pm:
node1.query(
"insert into table function hdfs('hdfs://hdfs1:9000/storage_big2', 'TSV', 'id Int32') select number from numbers(10000000)")
def select():
global result
print("reading")
result = node1.query(
"select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big2?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')")
print(result)
thread = threading.Thread(target=select)
print("add rule")
pm._add_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.start()
time.sleep(0.5)
pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
time.sleep(3)
print("delete rule")
pm._delete_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'})
thread.join()
assert node1.contains_in_log("Error: Timeout, code:")
print(result)