Merge pull request #34392 from kssenii/http-buffer-skip-not-found-url-for-globs

Allow to skip not found urls for globs
This commit is contained in:
Kseniia Sumarokova 2022-02-09 09:32:09 +01:00 committed by GitHub
commit 86956ca08b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 253 additions and 106 deletions

View File

@ -262,6 +262,7 @@ class IColumn;
M(UInt64, http_max_fields, 1000000, "Maximum number of fields in HTTP header", 0) \
M(UInt64, http_max_field_name_size, 1048576, "Maximum length of field name in HTTP header", 0) \
M(UInt64, http_max_field_value_size, 1048576, "Maximum length of field value in HTTP header", 0) \
M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \
M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \
M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \
M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \

View File

@ -82,6 +82,7 @@ struct ReadSettings
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 1600;
bool http_skip_not_found_url_for_globs = true;
/// Set to true for MergeTree tables to make sure
/// that last position (offset in compressed file) is always passed.

View File

@ -129,6 +129,8 @@ namespace detail
/// In case of redirects, save result uri to use it if we retry the request.
std::optional<Poco::URI> saved_uri_redirect;
bool http_skip_not_found_url;
ReadSettings settings;
Poco::Logger * log;
@ -146,7 +148,7 @@ namespace detail
return read_range.begin + offset_from_begin_pos;
}
std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
std::istream * callImpl(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_)
{
// With empty path poco will send "POST HTTP/1.1" its bug.
if (uri_.getPath().empty())
@ -211,7 +213,7 @@ namespace detail
{
try
{
call(uri, response, Poco::Net::HTTPRequest::HTTP_HEAD);
call(response, Poco::Net::HTTPRequest::HTTP_HEAD);
while (isRedirect(response.getStatus()))
{
@ -220,7 +222,7 @@ namespace detail
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
istr = callImpl(uri_redirect, response, method);
}
break;
@ -237,6 +239,17 @@ namespace detail
return read_range.end;
}
/// Initialization state recorded in `initialization_error` and checked by the read path.
enum class InitializeError
{
    /// The error must not be retried; the `exception` variable must be set together with this value.
    NON_RETRIABLE_ERROR,
    /// The url was not found and may be skipped (used for urls with globs).
    SKIP_NOT_FOUND_URL,
    /// No initialization error has been recorded.
    NONE,
};

/// Starts out as NONE, i.e. no error recorded.
InitializeError initialization_error{InitializeError::NONE};
public:
using NextCallback = std::function<void(size_t)>;
using OutStreamCallback = std::function<void(std::ostream &)>;
@ -253,7 +266,8 @@ namespace detail
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization = false,
bool use_external_buffer_ = false)
bool use_external_buffer_ = false,
bool http_skip_not_found_url_ = false)
: SeekableReadBufferWithSize(nullptr, 0)
, uri {uri_}
, method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET}
@ -265,6 +279,7 @@ namespace detail
, buffer_size {buffer_size_}
, use_external_buffer {use_external_buffer_}
, read_range(read_range_)
, http_skip_not_found_url(http_skip_not_found_url_)
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{
@ -277,17 +292,45 @@ namespace detail
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms);
if (!delay_initialization)
{
initialize();
if (exception)
std::rethrow_exception(exception);
}
}
/// Sends the request through callImpl() and stores the resulting stream in `istr`.
/// The target is `saved_uri_redirect` when it is set, otherwise `uri`.
/// If callImpl() throws while the response status is HTTP_NOT_FOUND and `http_skip_not_found_url`
/// is enabled, the exception is swallowed and `initialization_error` becomes SKIP_NOT_FOUND_URL.
/// Any other failure is rethrown unchanged.
void call(Poco::Net::HTTPResponse & response, const String & method_)
{
    try
    {
        const Poco::URI & target_uri = saved_uri_redirect ? *saved_uri_redirect : uri;
        istr = callImpl(target_uri, response, method_);
    }
    catch (...)
    {
        const bool url_not_found = response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND;
        if (!(url_not_found && http_skip_not_found_url))
            throw;

        /// Remember that this url is to be skipped instead of reporting an error.
        initialization_error = InitializeError::SKIP_NOT_FOUND_URL;
    }
}
/**
* Note: In case of error return false if error is not retriable, otherwise throw.
* Throws if error is retriable, otherwise sets initialization_error = NON_RETRIABLE_ERROR and
* saves exception into `exception` variable. In case url is not found and skip_not_found_url == true,
* sets initialization_error = SKIP_NOT_FOUND_URL, otherwise throws.
*/
bool initialize()
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(saved_uri_redirect ? *saved_uri_redirect : uri, response, method);
call(response, method);
if (initialization_error != InitializeError::NONE)
return;
while (isRedirect(response.getStatus()))
{
@ -296,7 +339,7 @@ namespace detail
session->updateSession(uri_redirect);
istr = call(uri_redirect, response, method);
istr = callImpl(uri_redirect, response, method);
saved_uri_redirect = uri_redirect;
}
@ -310,7 +353,8 @@ namespace detail
Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE,
"Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-'));
return false;
initialization_error = InitializeError::NON_RETRIABLE_ERROR;
return;
}
else if (read_range.end)
{
@ -345,12 +389,14 @@ namespace detail
sess->attachSessionData(e.message());
throw;
}
return true;
}
bool nextImpl() override
{
if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL)
return false;
assert(initialization_error == InitializeError::NONE);
if (next_callback)
next_callback(count());
@ -392,14 +438,16 @@ namespace detail
{
if (!impl)
{
/// If error is not retriable -- false is returned and exception is set.
/// Otherwise the error is thrown and retries continue.
bool initialized = initialize();
if (!initialized)
initialize();
if (initialization_error == InitializeError::NON_RETRIABLE_ERROR)
{
assert(exception);
break;
}
else if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL)
{
return false;
}
if (use_external_buffer)
{
@ -570,11 +618,12 @@ public:
Range read_range_ = {},
const RemoteHostFilter & remote_host_filter_ = {},
bool delay_initialization_ = true,
bool use_external_buffer_ = false)
bool use_external_buffer_ = false,
bool skip_not_found_url_ = false)
: Parent(std::make_shared<UpdatableSession>(uri_, timeouts, max_redirects),
uri_, credentials_, method_, out_stream_callback_, buffer_size_,
settings_, http_header_entries_, read_range_, remote_host_filter_,
delay_initialization_, use_external_buffer_)
delay_initialization_, use_external_buffer_, skip_not_found_url_)
{
}
};

View File

@ -3148,6 +3148,7 @@ ReadSettings Context::getReadSettings() const
res.http_max_tries = settings.http_max_tries;
res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms;
res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms;
res.http_skip_not_found_url_for_globs = settings.http_skip_not_found_url_for_globs;
res.mmap_cache = getMMappedFileCache().get();

View File

@ -36,6 +36,14 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int NETWORK_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
/// Tells whether `uri` looks like it contains globs:
/// either a '|' anywhere, or both a '{' and a '}' anywhere (their relative order is not checked).
static bool urlWithGlobs(const String & uri)
{
    if (uri.find('|') != std::string::npos)
        return true;

    const bool has_opening_brace = uri.find('{') != std::string::npos;
    const bool has_closing_brace = uri.find('}') != std::string::npos;
    return has_opening_brace && has_closing_brace;
}
@ -62,6 +70,7 @@ IStorageURLBase::IStorageURLBase(
, partition_by(partition_by_)
{
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_);
@ -69,52 +78,12 @@ IStorageURLBase::IStorageURLBase(
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
}
/// Reads the table structure for `format` from the data served at `uri` (HTTP GET).
/// Basic-auth credentials are taken from the user info part of the uri when it has the "user:password" form.
ColumnsDescription IStorageURLBase::getTableStructureFromData(
    const String & format,
    const String & uri,
    const String & compression_method,
    const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr context)
{
    Poco::URI request_uri(uri);

    Poco::Net::HTTPBasicCredentials credentials;
    const std::string user_info = request_uri.getUserInfo();
    const std::size_t separator_pos = user_info.find(':');
    /// Without a ':' separator (or with empty user info) the credentials stay untouched.
    if (!user_info.empty() && separator_pos != std::string::npos)
    {
        credentials.setUsername(user_info.substr(0, separator_pos));
        credentials.setPassword(user_info.substr(separator_pos + 1));
    }

    /// Creates the HTTP read buffer on demand, wrapped according to the compression method
    /// chosen from the uri path and `compression_method`.
    auto make_read_buffer = [&]()
    {
        return wrapReadBufferWithCompressionMethod(
            std::make_unique<ReadWriteBufferFromHTTP>(
                request_uri,
                Poco::Net::HTTPRequest::HTTP_GET,
                nullptr,
                ConnectionTimeouts::getHTTPTimeouts(context),
                credentials,
                context->getSettingsRef().max_http_get_redirects,
                DBMS_DEFAULT_BUFFER_SIZE,
                context->getReadSettings(),
                headers,
                ReadWriteBufferFromHTTP::Range{},
                context->getRemoteHostFilter()),
            chooseCompressionMethod(request_uri.getPath(), compression_method));
    };

    return readSchemaFromFormat(format, format_settings, make_read_buffer, context);
}
namespace
{
@ -163,6 +132,20 @@ namespace
reader->cancel();
}
/// Fills `credentials` from the user info part of `request_uri`.
/// Nothing is changed unless the user info is non-empty and contains a ':';
/// the text before the first ':' becomes the username, the text after it the password.
static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri)
{
    const auto & user_info = request_uri.getUserInfo();
    if (user_info.empty())
        return;

    const std::size_t colon_pos = user_info.find(':');
    if (colon_pos == std::string::npos)
        return;

    credentials.setUsername(user_info.substr(0, colon_pos));
    credentials.setPassword(user_info.substr(colon_pos + 1));
}
StorageURLSource(
URIInfoPtr uri_info_,
const std::string & http_method,
@ -177,7 +160,8 @@ namespace
const ConnectionTimeouts & timeouts,
const String & compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {})
const URIParams & params = {},
bool glob_url = false)
: SourceWithProgress(sample_block), name(std::move(name_))
, uri_info(uri_info_)
{
@ -186,53 +170,43 @@ namespace
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
{
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
if (uri_options.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");
if (uri_options.size() > 1)
{
auto request_uri = Poco::URI(*option);
read_buf = getFirstAvailableURLReadBuffer(
uri_options, context, params, http_method,
callback, timeouts, compression_method, credentials, headers);
}
else
{
ReadSettings read_settings = context->getReadSettings();
bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs;
auto request_uri = Poco::URI(uri_options[0]);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
try
{
std::string user_info = request_uri.getUserInfo();
if (!user_info.empty())
{
std::size_t n = user_info.find(':');
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n + 1));
}
}
setCredentials(credentials, request_uri);
/// Get first alive uri.
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter()),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
{
if (uri_options.size() == 1)
throw;
if (option == uri_options.end() - 1)
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
error_message << *option << " error: " << getCurrentExceptionMessage(false) << "\n";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */true,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
@ -283,6 +257,65 @@ namespace
}
}
static std::unique_ptr<ReadBuffer> getFirstAvailableURLReadBuffer(
const std::vector<String> & urls,
ContextPtr context,
const URIParams & params,
const String & http_method,
std::function<void(std::ostream &)> callback,
const ConnectionTimeouts & timeouts,
const String & compression_method,
Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers)
{
String first_exception_message;
ReadSettings read_settings = context->getReadSettings();
for (auto option = urls.begin(); option != urls.end(); ++option)
{
bool skip_url_not_found_error = read_settings.http_skip_not_found_url_for_globs && option == std::prev(urls.end());
auto request_uri = Poco::URI(*option);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
setCredentials(credentials, request_uri);
try
{
return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
http_method,
callback,
timeouts,
credentials,
context->getSettingsRef().max_http_get_redirects,
DBMS_DEFAULT_BUFFER_SIZE,
read_settings,
headers,
ReadWriteBufferFromHTTP::Range{},
context->getRemoteHostFilter(),
/* delay_initiliazation */false,
/* use_external_buffer */false,
/* skip_url_not_found_error */skip_url_not_found_error),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
{
if (first_exception_message.empty())
first_exception_message = getCurrentExceptionMessage(false);
if (urls.size() == 1)
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", urls.size(), first_exception_message);
}
private:
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
@ -297,7 +330,7 @@ namespace
/// have R/W access to reader pointer.
std::mutex reader_mutex;
Poco::Net::HTTPBasicCredentials credentials{};
Poco::Net::HTTPBasicCredentials credentials;
};
}
@ -407,6 +440,70 @@ std::function<void(std::ostream &)> IStorageURLBase::getReadPOSTDataCallback(
}
/** Infers the table structure by reading data in `format` from `uri` over HTTP GET.
  *
  * If `uri` contains globs (see urlWithGlobs), it is expanded with parseRemoteDescription:
  * first by ',', then every resulting description by '|'. The number of expanded elements is
  * limited by the `glob_expansion_max_elements` setting. The read buffer is then obtained from
  * StorageURLSource::getFirstAvailableURLReadBuffer over all expanded urls.
  * Otherwise a single ReadWriteBufferFromHTTP with delayed initialization is created for `uri`,
  * with credentials taken from the uri's user info.
  *
  * @param format              Name of the input format passed to readSchemaFromFormat.
  * @param uri                 Url, possibly with globs.
  * @param compression_method  Passed together with the url path to chooseCompressionMethod.
  * @param headers             HTTP header entries passed to the HTTP read buffer.
  * @param format_settings     Optional format settings passed to readSchemaFromFormat.
  * @param context             Source of settings, timeouts, read settings and the remote host filter.
  * @return Whatever readSchemaFromFormat returns for the created read buffer.
  */
ColumnsDescription IStorageURLBase::getTableStructureFromData(
    const String & format,
    const String & uri,
    const String & compression_method,
    const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
    const std::optional<FormatSettings> & format_settings,
    ContextPtr context)
{
    ReadBufferCreator read_buffer_creator;
    Poco::Net::HTTPBasicCredentials credentials;

    if (urlWithGlobs(uri))
    {
        std::vector<String> urls_to_check;

        size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements;
        auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
        for (const auto & description : uri_descriptions)
        {
            auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses);
            urls_to_check.insert(urls_to_check.end(), options.begin(), options.end());
        }

        /// `urls_to_check` is local to this scope, so it is captured by value:
        /// the creator is only handed to readSchemaFromFormat after the scope has ended.
        read_buffer_creator = [&, urls_to_check]()
        {
            return StorageURLSource::getFirstAvailableURLReadBuffer(
                urls_to_check,
                context,
                {},
                Poco::Net::HTTPRequest::HTTP_GET,
                {},
                ConnectionTimeouts::getHTTPTimeouts(context),
                compression_method,
                credentials,
                headers);
        };
    }
    else
    {
        auto parsed_uri = Poco::URI(uri);
        StorageURLSource::setCredentials(credentials, parsed_uri);

        /// `parsed_uri` must be captured by value as well: it is destroyed at the end of this
        /// `else` scope, while the creator is passed to readSchemaFromFormat only afterwards.
        /// Capturing it by reference would leave the creator with a dangling reference.
        read_buffer_creator = [&, parsed_uri]()
        {
            return wrapReadBufferWithCompressionMethod(
                std::make_unique<ReadWriteBufferFromHTTP>(
                    parsed_uri,
                    Poco::Net::HTTPRequest::HTTP_GET,
                    nullptr,
                    ConnectionTimeouts::getHTTPTimeouts(context),
                    credentials,
                    context->getSettingsRef().max_http_get_redirects,
                    DBMS_DEFAULT_BUFFER_SIZE,
                    context->getReadSettings(),
                    headers,
                    ReadWriteBufferFromHTTP::Range{},
                    context->getRemoteHostFilter(),
                    /* delay_initialization */true),
                chooseCompressionMethod(parsed_uri.getPath(), compression_method));
        };
    }

    return readSchemaFromFormat(format, format_settings, read_buffer_creator, context);
}
Pipe IStorageURLBase::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -417,10 +514,8 @@ Pipe IStorageURLBase::read(
unsigned num_streams)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
|| uri.find('|') != std::string::npos;
if (with_globs)
if (urlWithGlobs(uri))
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
@ -452,7 +547,7 @@ Pipe IStorageURLBase::read(
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
compression_method, headers, params, /* glob_url */true));
}
return Pipe::unitePipes(std::move(pipes));
}