Some more fixes

This commit is contained in:
kssenii 2021-10-28 10:28:05 +00:00
parent 82f3754b8a
commit aeba8104d1
10 changed files with 83 additions and 66 deletions

View File

@ -75,7 +75,7 @@ bool LibraryBridgeHelper::bridgeHandShake()
String result;
try
{
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts);
ReadWriteBufferFromHTTP buf(createRequestURI(PING), Poco::Net::HTTPRequest::HTTP_GET, {}, http_timeouts, credentials);
readString(result, buf);
}
catch (...)
@ -240,7 +240,7 @@ bool LibraryBridgeHelper::executeRequest(const Poco::URI & uri, ReadWriteBufferF
uri,
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
http_timeouts);
http_timeouts, credentials);
bool res;
readBoolText(res, buf);
@ -255,8 +255,8 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
Poco::Net::HTTPRequest::HTTP_POST,
std::move(out_stream_callback),
http_timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings(),
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});

View File

@ -78,7 +78,7 @@ protected:
{
try
{
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(getPingURI(), Poco::Net::HTTPRequest::HTTP_GET, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
return checkString(PING_OK_ANSWER, buf);
}
catch (...)
@ -137,6 +137,8 @@ private:
std::optional<IdentifierQuotingStyle> quote_style;
std::optional<bool> is_schema_allowed;
Poco::Net::HTTPBasicCredentials credentials{};
protected:
using URLParams = std::vector<std::pair<std::string, std::string>>;
@ -168,7 +170,7 @@ protected:
uri.setPath(SCHEMA_ALLOWED_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
bool res;
readBoolText(res, buf);
@ -188,7 +190,7 @@ protected:
uri.setPath(IDENTIFIER_QUOTE_HANDLER);
uri.addQueryParameter("connection_string", getConnectionString());
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()));
ReadWriteBufferFromHTTP buf(uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(getContext()), credentials);
std::string character;
readStringBinary(character, buf);

View File

@ -98,11 +98,12 @@ Pipe HTTPDictionarySource::loadAll()
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -117,11 +118,12 @@ Pipe HTTPDictionarySource::loadUpdatedAll()
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -145,11 +147,12 @@ Pipe HTTPDictionarySource::loadIds(const std::vector<UInt64> & ids)
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}
@ -173,11 +176,12 @@ Pipe HTTPDictionarySource::loadKeys(const Columns & key_columns, const std::vect
Poco::Net::HTTPRequest::HTTP_POST,
out_stream_callback,
timeouts,
0,
credentials,
0,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
configuration.header_entries);
configuration.header_entries,
RemoteHostFilter{}, false);
return createWrappedBuffer(std::move(in_ptr));
}

View File

@ -214,7 +214,8 @@ Pipe XDBCDictionarySource::loadFromQuery(const Poco::URI & url, const Block & re
os << "query=" << escapeForFileName(query);
};
auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts);
auto read_buf = std::make_unique<ReadWriteBufferFromHTTP>(
url, Poco::Net::HTTPRequest::HTTP_POST, write_body_callback, timeouts, credentials);
auto format = getContext()->getInputFormat(IXDBCBridgeHelper::DEFAULT_FORMAT, *read_buf, required_sample_block, max_block_size);
format->addBuffer(std::move(read_buf));

View File

@ -89,6 +89,7 @@ private:
BridgeHelperPtr bridge_helper;
Poco::URI bridge_url;
ConnectionTimeouts timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -35,10 +35,12 @@ void DiskWebServer::initialize(const String & uri_path) const
LOG_TRACE(log, "Loading metadata for directory: {}", uri_path);
try
{
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP metadata_buf(Poco::URI(fs::path(uri_path) / ".index"),
Poco::Net::HTTPRequest::HTTP_GET,
ReadWriteBufferFromHTTP::OutStreamCallback(),
ConnectionTimeouts::getHTTPTimeouts(getContext()));
ConnectionTimeouts::getHTTPTimeouts(getContext()),
credentials);
String file_name;
FileData file_data{};

View File

@ -53,8 +53,8 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
std::max(Poco::Timespan(settings.http_receive_timeout.totalSeconds(), 0), Poco::Timespan(20, 0)),
settings.tcp_keep_alive_timeout,
http_keep_alive_timeout),
0,
credentials,
0,
buf_size,
read_settings);
}

View File

@ -190,13 +190,14 @@ namespace detail
explicit ReadWriteBufferFromHTTPBase(
UpdatableSessionPtr session_,
Poco::URI uri_,
const Poco::Net::HTTPBasicCredentials & credentials_,
const std::string & method_ = {},
OutStreamCallback out_stream_callback_ = {},
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
HTTPHeaderEntries http_header_entries_ = {},
const RemoteHostFilter & remote_host_filter_ = {})
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false)
: ReadBuffer(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
@ -217,6 +218,9 @@ namespace detail
"must be http_max_tries >= 1 (current is {}) and "
"0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (but now {} > {})",
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
if (!delay_initialization)
initialize();
}
void initialize()
@ -268,51 +272,52 @@ namespace detail
size_t milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
/// Default http_max_tries = 1.
for (size_t i = 0; (i < settings.http_max_tries) && !successful_read; ++i)
for (size_t i = 0; i < settings.http_max_tries; ++i)
{
while (milliseconds_to_wait < settings.http_retry_max_backoff_ms)
try
{
try
{
if (!impl)
initialize();
if (!impl)
initialize();
result = impl->next();
successful_read = true;
break;
}
catch (const Poco::Exception & e)
{
/**
* Retry request unconditionally if nothing has beed read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
/**
* if total_size is not known, last read can fail if we retry with
* bytes_read == total_size and header `bytes=bytes_read-`
* (we will get an error code 416 - range not satisfiable).
* In this case rethrow previous exception.
*/
if (exception && !total_bytes_to_read.has_value() && e.code() == 416)
std::rethrow_exception(exception);
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
"Error: {}, code: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, bytes_read, e.what(), e.code(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
exception = std::current_exception();
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
result = impl->next();
successful_read = true;
break;
}
catch (const Poco::Exception & e)
{
/**
* Retry request unconditionally if nothing has beed read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
/**
* if total_size is not known, last read can fail if we retry with
* bytes_read == total_size and header `bytes=bytes_read-`
* (we will get an error code 416 - range not satisfiable).
* In this case rethrow previous exception.
*/
if (exception && !total_bytes_to_read.has_value() && e.code() == 416)
std::rethrow_exception(exception);
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
"Error: {}, code: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, bytes_read, e.what(), e.code(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
exception = std::current_exception();
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
if (successful_read || milliseconds_to_wait >= settings.http_retry_max_backoff_ms)
break;
milliseconds_to_wait = settings.http_retry_initial_backoff_ms;
}
@ -384,14 +389,15 @@ public:
const std::string & method_,
OutStreamCallback out_stream_callback_,
const ConnectionTimeouts & timeouts,
const Poco::Net::HTTPBasicCredentials & credentials_,
const UInt64 max_redirects = 0,
const Poco::Net::HTTPBasicCredentials & credentials_ = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE,
const ReadSettings & settings_ = {},
const HTTPHeaderEntries & http_header_entries_ = {},
const RemoteHostFilter & remote_host_filter_ = {})
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization_ = true)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, method_, out_stream_callback_, credentials_, buffer_size_, settings_, http_header_entries_, remote_host_filter_)
uri_, credentials_, method_, out_stream_callback_, buffer_size_, settings_, http_header_entries_, remote_host_filter_, delay_initialization_)
{
}
};
@ -435,9 +441,9 @@ public:
size_t max_connections_per_endpoint = DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT)
: Parent(std::make_shared<UpdatablePooledSession>(uri_, timeouts_, max_redirects, max_connections_per_endpoint),
uri_,
credentials_,
method_,
out_stream_callback_,
credentials_,
buffer_size_)
{
}

View File

@ -121,8 +121,8 @@ namespace
method,
callback,
timeouts,
context->getSettingsRef().max_http_get_redirects,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,

View File

@ -82,7 +82,8 @@ ColumnsDescription ITableFunctionXDBC::getActualTableStructure(ContextPtr contex
columns_info_uri.addQueryParameter("external_table_functions_use_nulls",
Poco::NumberFormatter::format(use_nulls));
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context));
Poco::Net::HTTPBasicCredentials credentials{};
ReadWriteBufferFromHTTP buf(columns_info_uri, Poco::Net::HTTPRequest::HTTP_POST, {}, ConnectionTimeouts::getHTTPTimeouts(context), credentials);
std::string columns_info;
readStringBinary(columns_info, buf);