From eba3011adacd91955cc84d24b893d5224448aefa Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 7 Feb 2022 20:40:47 +0100 Subject: [PATCH 1/5] Fix --- src/Core/Settings.h | 1 + src/IO/ReadSettings.h | 1 + src/IO/ReadWriteBufferFromHTTP.h | 68 +++++++-- src/Interpreters/Context.cpp | 1 + src/Storages/StorageURL.cpp | 229 +++++++++++++++++++------------ 5 files changed, 202 insertions(+), 98 deletions(-) diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 48dd637a943..c9019acf9f1 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -262,6 +262,7 @@ class IColumn; M(UInt64, http_max_fields, 1000000, "Maximum number of fields in HTTP header", 0) \ M(UInt64, http_max_field_name_size, 1048576, "Maximum length of field name in HTTP header", 0) \ M(UInt64, http_max_field_value_size, 1048576, "Maximum length of field value in HTTP header", 0) \ + M(Bool, http_skip_not_found_url_for_globs, true, "Skip url's for globs with HTTP_NOT_FOUND error", 0) \ M(Bool, optimize_throw_if_noop, false, "If setting is enabled and OPTIMIZE query didn't actually assign a merge then an explanatory exception is thrown", 0) \ M(Bool, use_index_for_in_with_subqueries, true, "Try using an index if there is a subquery or a table expression on the right side of the IN operator.", 0) \ M(Bool, joined_subquery_requires_alias, true, "Force joined subqueries and table functions to have aliases for correct name qualification.", 0) \ diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index f6c1158a896..e290cbab36b 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -82,6 +82,7 @@ struct ReadSettings size_t http_max_tries = 1; size_t http_retry_initial_backoff_ms = 100; size_t http_retry_max_backoff_ms = 1600; + bool http_skip_not_found_url_for_globs = true; /// Set to true for MergeTree tables to make sure /// that last position (offset in compressed file) is always passed. diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 0314fa33f11..a9ab7a8348e 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace ProfileEvents @@ -129,6 +130,8 @@ namespace detail /// In case of redirects, save result uri to use it if we retry the request. std::optional saved_uri_redirect; + bool http_skip_not_found_url; + ReadSettings settings; Poco::Logger * log; @@ -146,7 +149,7 @@ namespace detail return read_range.begin + offset_from_begin_pos; } - std::istream * call(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_) + std::istream * callImpl(Poco::URI uri_, Poco::Net::HTTPResponse & response, const std::string & method_) { // With empty path poco will send "POST HTTP/1.1" its bug. if (uri_.getPath().empty()) @@ -211,7 +214,7 @@ namespace detail { try { - call(uri, response, Poco::Net::HTTPRequest::HTTP_HEAD); + call(response, Poco::Net::HTTPRequest::HTTP_HEAD); while (isRedirect(response.getStatus())) { @@ -220,7 +223,7 @@ namespace detail session->updateSession(uri_redirect); - istr = call(uri_redirect, response, method); + istr = callImpl(uri_redirect, response, method); } break; @@ -253,7 +256,8 @@ namespace detail Range read_range_ = {}, const RemoteHostFilter & remote_host_filter_ = {}, bool delay_initialization = false, - bool use_external_buffer_ = false) + bool use_external_buffer_ = false, + bool glob_url = false) : SeekableReadBufferWithSize(nullptr, 0) , uri {uri_} , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} @@ -265,6 +269,7 @@ namespace detail , buffer_size {buffer_size_} , use_external_buffer {use_external_buffer_} , read_range(read_range_) + , http_skip_not_found_url(settings_.http_skip_not_found_url_for_globs && glob_url) , settings {settings_} , log(&Poco::Logger::get("ReadWriteBufferFromHTTP")) { @@ -280,14 +285,46 @@ namespace detail initialize(); } + enum class InitializeError + { + NON_RETRIABLE_ERROR, + SKIP_NOT_FOUND_URL, + NONE, + }; + + InitializeError call(Poco::Net::HTTPResponse & response, const String & method_) + { + try + { + istr = callImpl(saved_uri_redirect ? *saved_uri_redirect : uri, response, method_); + } + catch (...) + { + if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND + && http_skip_not_found_url) + { + return InitializeError::SKIP_NOT_FOUND_URL; + } + else + { + throw; + } + } + + return InitializeError::NONE; + } + /** * Note: In case of error return false if error is not retriable, otherwise throw. */ - bool initialize() + InitializeError initialize() { Poco::Net::HTTPResponse response; - istr = call(saved_uri_redirect ? *saved_uri_redirect : uri, response, method); + auto error = call(response, method); + if (error == InitializeError::SKIP_NOT_FOUND_URL) + return error; + assert(error == InitializeError::NONE); while (isRedirect(response.getStatus())) { @@ -296,7 +333,7 @@ namespace detail session->updateSession(uri_redirect); - istr = call(uri_redirect, response, method); + istr = callImpl(uri_redirect, response, method); saved_uri_redirect = uri_redirect; } @@ -310,7 +347,7 @@ namespace detail Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-')); - return false; + return InitializeError::NON_RETRIABLE_ERROR; } else if (read_range.end) { @@ -346,7 +383,7 @@ namespace detail throw; } - return true; + return InitializeError::NONE; } bool nextImpl() override @@ -394,12 +431,16 @@ namespace detail { /// If error is not retriable -- false is returned and exception is set. /// Otherwise the error is thrown and retries continue. - bool initialized = initialize(); - if (!initialized) + auto error = initialize(); + if (error == InitializeError::NON_RETRIABLE_ERROR) { assert(exception); break; } + else if (error == InitializeError::SKIP_NOT_FOUND_URL) + { + return false; + } if (use_external_buffer) { @@ -570,11 +611,12 @@ public: Range read_range_ = {}, const RemoteHostFilter & remote_host_filter_ = {}, bool delay_initialization_ = true, - bool use_external_buffer_ = false) + bool use_external_buffer_ = false, + bool glob_url_ = false) : Parent(std::make_shared(uri_, timeouts, max_redirects), uri_, credentials_, method_, out_stream_callback_, buffer_size_, settings_, http_header_entries_, read_range_, remote_host_filter_, - delay_initialization_, use_external_buffer_) + delay_initialization_, use_external_buffer_, glob_url_) { } }; diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 0cb046b9de4..822f1dcb534 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3148,6 +3148,7 @@ ReadSettings Context::getReadSettings() const res.http_max_tries = settings.http_max_tries; res.http_retry_initial_backoff_ms = settings.http_retry_initial_backoff_ms; res.http_retry_max_backoff_ms = settings.http_retry_max_backoff_ms; + res.http_skip_not_found_url_for_globs = settings.http_skip_not_found_url_for_globs; res.mmap_cache = getMMappedFileCache().get(); diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 32ab126faa9..ee401ad4153 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -39,6 +39,13 @@ namespace ErrorCodes } +static bool urlWithGlobs(const String & uri) +{ + return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) + || uri.find('|') != std::string::npos; +} + + IStorageURLBase::IStorageURLBase( const String & uri_, ContextPtr context_, @@ -69,41 +76,12 @@ IStorageURLBase::IStorageURLBase( } else storage_metadata.setColumns(columns_); + storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); setInMemoryMetadata(storage_metadata); } -ColumnsDescription IStorageURLBase::getTableStructureFromData( - const String & format, - const String & uri, - const String & compression_method, - const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, - const std::optional & format_settings, - ContextPtr context) -{ - auto read_buffer_creator = [&]() - { - auto parsed_uri = Poco::URI(uri); - return wrapReadBufferWithCompressionMethod( - std::make_unique( - parsed_uri, - Poco::Net::HTTPRequest::HTTP_GET, - nullptr, - ConnectionTimeouts::getHTTPTimeouts(context), - Poco::Net::HTTPBasicCredentials{}, - context->getSettingsRef().max_http_get_redirects, - DBMS_DEFAULT_BUFFER_SIZE, - context->getReadSettings(), - headers, - ReadWriteBufferFromHTTP::Range{}, - context->getRemoteHostFilter()), - chooseCompressionMethod(parsed_uri.getPath(), compression_method)); - }; - - return readSchemaFromFormat(format, format_settings, read_buffer_creator, context); -} - namespace { ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders( @@ -165,7 +143,8 @@ namespace const ConnectionTimeouts & timeouts, const String & compression_method, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {}, - const URIParams & params = {}) + const URIParams & params = {}, + bool glob_url = false) : SourceWithProgress(sample_block), name(std::move(name_)) , uri_info(uri_info_) { @@ -174,54 +153,10 @@ namespace /// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline. initialize = [=, this](const URIInfo::FailoverOptions & uri_options) { - WriteBufferFromOwnString error_message; - for (auto option = uri_options.begin(); option < uri_options.end(); ++option) - { - auto request_uri = Poco::URI(*option); - for (const auto & [param, value] : params) - request_uri.addQueryParameter(param, value); - - try - { - std::string user_info = request_uri.getUserInfo(); - if (!user_info.empty()) - { - std::size_t n = user_info.find(':'); - if (n != std::string::npos) - { - credentials.setUsername(user_info.substr(0, n)); - credentials.setPassword(user_info.substr(n + 1)); - } - } - - /// Get first alive uri. - read_buf = wrapReadBufferWithCompressionMethod( - std::make_unique( - request_uri, - http_method, - callback, - timeouts, - credentials, - context->getSettingsRef().max_http_get_redirects, - DBMS_DEFAULT_BUFFER_SIZE, - context->getReadSettings(), - headers, - ReadWriteBufferFromHTTP::Range{}, - context->getRemoteHostFilter()), - chooseCompressionMethod(request_uri.getPath(), compression_method)); - } - catch (...) - { - if (uri_options.size() == 1) - throw; - - if (option == uri_options.end() - 1) - throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str()); - - error_message << *option << " error: " << getCurrentExceptionMessage(false) << "\n"; - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } + read_buf = getFirstAvailableURLReadBuffer( + uri_options, context, params, http_method, + callback, timeouts, compression_method, headers, + /* skip_url_not_found_error */glob_url); auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); QueryPipelineBuilder builder; @@ -271,6 +206,72 @@ namespace } } + static std::unique_ptr getFirstAvailableURLReadBuffer( + const std::vector & urls, + ContextPtr context, + const URIParams & params, + const String & http_method, + std::function callback, + const ConnectionTimeouts & timeouts, + const String & compression_method, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + bool skip_url_not_found_error) + { + Poco::Net::HTTPBasicCredentials credentials{}; + String first_exception_message; + + for (auto option = urls.begin(); option < urls.end(); ++option) + { + auto request_uri = Poco::URI(*option); + for (const auto & [param, value] : params) + request_uri.addQueryParameter(param, value); + + try + { + std::string user_info = request_uri.getUserInfo(); + if (!user_info.empty()) + { + std::size_t n = user_info.find(':'); + if (n != std::string::npos) + { + credentials.setUsername(user_info.substr(0, n)); + credentials.setPassword(user_info.substr(n + 1)); + } + } + + return wrapReadBufferWithCompressionMethod( + std::make_unique( + request_uri, + http_method, + callback, + timeouts, + credentials, + context->getSettingsRef().max_http_get_redirects, + DBMS_DEFAULT_BUFFER_SIZE, + context->getReadSettings(), + headers, + ReadWriteBufferFromHTTP::Range{}, + context->getRemoteHostFilter(), + /* delay_initiliazation */urls.size() == 1, + /* use_external_buffer */false, + skip_url_not_found_error), + chooseCompressionMethod(request_uri.getPath(), compression_method)); + } + catch (...) + { + if (first_exception_message.empty()) + first_exception_message = getCurrentExceptionMessage(false); + + if (urls.size() == 1) + throw; + + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + + throw Exception(ErrorCodes::NETWORK_ERROR, "All uri ({}) options are unreachable: {}", urls.size(), first_exception_message); + } + private: using InitializeFunc = std::function; InitializeFunc initialize; @@ -284,8 +285,6 @@ namespace /// onCancell and generate can be called concurrently and both of them /// have R/W access to reader pointer. std::mutex reader_mutex; - - Poco::Net::HTTPBasicCredentials credentials{}; }; } @@ -395,6 +394,68 @@ std::function IStorageURLBase::getReadPOSTDataCallback( } +ColumnsDescription IStorageURLBase::getTableStructureFromData( + const String & format, + const String & uri, + const String & compression_method, + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, + const std::optional & format_settings, + ContextPtr context) +{ + ReadBufferCreator read_buffer_creator; + + if (urlWithGlobs(uri)) + { + std::vector urls_to_check; + + size_t max_addresses = context->getSettingsRef().glob_expansion_max_elements; + auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); + for (const auto & description : uri_descriptions) + { + auto options = parseRemoteDescription(description, 0, description.size(), '|', max_addresses); + urls_to_check.insert(urls_to_check.end(), options.begin(), options.end()); + } + + read_buffer_creator = [&, urls_to_check]() + { + return StorageURLSource::getFirstAvailableURLReadBuffer( + urls_to_check, + context, + {}, + Poco::Net::HTTPRequest::HTTP_GET, + {}, + ConnectionTimeouts::getHTTPTimeouts(context), + compression_method, + headers, + /* skip_url_not_found_error */true); + }; + } + else + { + read_buffer_creator = [&]() + { + auto parsed_uri = Poco::URI(uri); + return wrapReadBufferWithCompressionMethod( + std::make_unique( + parsed_uri, + Poco::Net::HTTPRequest::HTTP_GET, + nullptr, + ConnectionTimeouts::getHTTPTimeouts(context), + Poco::Net::HTTPBasicCredentials{}, + context->getSettingsRef().max_http_get_redirects, + DBMS_DEFAULT_BUFFER_SIZE, + context->getReadSettings(), + headers, + ReadWriteBufferFromHTTP::Range{}, + context->getRemoteHostFilter(), + /* delay_initiliazation */true), + chooseCompressionMethod(parsed_uri.getPath(), compression_method)); + }; + } + + return readSchemaFromFormat(format, format_settings, read_buffer_creator, context); +} + Pipe IStorageURLBase::read( const Names & column_names, const StorageMetadataPtr & metadata_snapshot, @@ -405,10 +466,8 @@ Pipe IStorageURLBase::read( unsigned num_streams) { auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size); - bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) - || uri.find('|') != std::string::npos; - if (with_globs) + if (urlWithGlobs(uri)) { size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); @@ -440,7 +499,7 @@ Pipe IStorageURLBase::read( metadata_snapshot->getColumns(), max_block_size, ConnectionTimeouts::getHTTPTimeouts(local_context), - compression_method, headers, params)); + compression_method, headers, params, /* glob_url */true)); } return Pipe::unitePipes(std::move(pipes)); } From 3834bdbae094f113058e06d5eced0b969317b44e Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 8 Feb 2022 10:59:20 +0100 Subject: [PATCH 2/5] Fixes --- src/IO/ReadWriteBufferFromHTTP.h | 57 ++++++++++++++++++-------------- src/Storages/StorageURL.cpp | 21 +++++++----- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index a9ab7a8348e..d7b75c405ba 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -240,6 +240,17 @@ namespace detail return read_range.end; } + enum class InitializeError + { + /// If error is not retriable, `exception` variable must be set. + NON_RETRIABLE_ERROR, + /// Allows to skip not found urls for globs + SKIP_NOT_FOUND_URL, + NONE, + }; + + InitializeError initialization_error = InitializeError::NONE; + public: using NextCallback = std::function; using OutStreamCallback = std::function; @@ -282,17 +293,14 @@ namespace detail settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms); if (!delay_initialization) + { initialize(); + if (exception) + std::rethrow_exception(exception); + } } - enum class InitializeError - { - NON_RETRIABLE_ERROR, - SKIP_NOT_FOUND_URL, - NONE, - }; - - InitializeError call(Poco::Net::HTTPResponse & response, const String & method_) + void call(Poco::Net::HTTPResponse & response, const String & method_) { try { @@ -303,28 +311,27 @@ namespace detail if (response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND && http_skip_not_found_url) { - return InitializeError::SKIP_NOT_FOUND_URL; + initialization_error = InitializeError::SKIP_NOT_FOUND_URL; } else { throw; } } - - return InitializeError::NONE; } /** - * Note: In case of error return false if error is not retriable, otherwise throw. + * Throws if error is not retriable, otherwise sets initialization_error = NON_RETRIABLE_ERROR and + * saves exception into `exception` variable. In case url is not found and skip_not_found_url == true, + * sets initialization_error = DKIP_NOT_FOUND_URL, otherwise throws. */ - InitializeError initialize() + void initialize() { Poco::Net::HTTPResponse response; - auto error = call(response, method); - if (error == InitializeError::SKIP_NOT_FOUND_URL) - return error; - assert(error == InitializeError::NONE); + call(response, method); + if (initialization_error != InitializeError::NONE) + return; while (isRedirect(response.getStatus())) { @@ -347,7 +354,7 @@ namespace detail Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-')); - return InitializeError::NON_RETRIABLE_ERROR; + initialization_error = InitializeError::NON_RETRIABLE_ERROR; } else if (read_range.end) { @@ -382,12 +389,14 @@ namespace detail sess->attachSessionData(e.message()); throw; } - - return InitializeError::NONE; } bool nextImpl() override { + if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL) + return false; + assert(initialization_error == InitializeError::NONE); + if (next_callback) next_callback(count()); @@ -429,15 +438,13 @@ namespace detail { if (!impl) { - /// If error is not retriable -- false is returned and exception is set. - /// Otherwise the error is thrown and retries continue. - auto error = initialize(); - if (error == InitializeError::NON_RETRIABLE_ERROR) + initialize(); + if (initialization_error == InitializeError::NON_RETRIABLE_ERROR) { assert(exception); break; } - else if (error == InitializeError::SKIP_NOT_FOUND_URL) + else if (initialization_error == InitializeError::SKIP_NOT_FOUND_URL) { return false; } diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index ee401ad4153..574cc691e29 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -69,6 +69,7 @@ IStorageURLBase::IStorageURLBase( , partition_by(partition_by_) { StorageInMemoryMetadata storage_metadata; + if (columns_.empty()) { auto columns = getTableStructureFromData(format_name, uri, compression_method, headers, format_settings, context_); @@ -155,8 +156,8 @@ namespace { read_buf = getFirstAvailableURLReadBuffer( uri_options, context, params, http_method, - callback, timeouts, compression_method, headers, - /* skip_url_not_found_error */glob_url); + callback, timeouts, compression_method, credentials, headers, + /* glob_url */glob_url); auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); QueryPipelineBuilder builder; @@ -214,15 +215,16 @@ namespace std::function callback, const ConnectionTimeouts & timeouts, const String & compression_method, + Poco::Net::HTTPBasicCredentials & credentials, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, - bool skip_url_not_found_error) + bool glob_url) { - Poco::Net::HTTPBasicCredentials credentials{}; String first_exception_message; + bool skip_url_not_found_error = urls.size() == 1 && glob_url; - for (auto option = urls.begin(); option < urls.end(); ++option) + for (const auto & option : urls) { - auto request_uri = Poco::URI(*option); + auto request_uri = Poco::URI(option); for (const auto & [param, value] : params) request_uri.addQueryParameter(param, value); @@ -285,6 +287,8 @@ namespace /// onCancell and generate can be called concurrently and both of them /// have R/W access to reader pointer. std::mutex reader_mutex; + + Poco::Net::HTTPBasicCredentials credentials; }; } @@ -403,6 +407,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( ContextPtr context) { ReadBufferCreator read_buffer_creator; + Poco::Net::HTTPBasicCredentials credentials; if (urlWithGlobs(uri)) { @@ -426,8 +431,8 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( {}, ConnectionTimeouts::getHTTPTimeouts(context), compression_method, - headers, - /* skip_url_not_found_error */true); + credentials, + headers, /* glob_url */true); }; } else From dc5f035265a5130c07627dea0a36e5bd2463b5cc Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 8 Feb 2022 12:57:35 +0100 Subject: [PATCH 3/5] Fix --- src/IO/ReadWriteBufferFromHTTP.h | 13 ++--- src/Storages/StorageURL.cpp | 88 +++++++++++++++++++++++--------- 2 files changed, 71 insertions(+), 30 deletions(-) diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index d7b75c405ba..c8e360ef867 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -268,7 +268,7 @@ namespace detail const RemoteHostFilter & remote_host_filter_ = {}, bool delay_initialization = false, bool use_external_buffer_ = false, - bool glob_url = false) + bool http_skip_not_found_url_ = false) : SeekableReadBufferWithSize(nullptr, 0) , uri {uri_} , method {!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET} @@ -280,7 +280,7 @@ namespace detail , buffer_size {buffer_size_} , use_external_buffer {use_external_buffer_} , read_range(read_range_) - , http_skip_not_found_url(settings_.http_skip_not_found_url_for_globs && glob_url) + , http_skip_not_found_url(http_skip_not_found_url_) , settings {settings_} , log(&Poco::Logger::get("ReadWriteBufferFromHTTP")) { @@ -321,9 +321,9 @@ namespace detail } /** - * Throws if error is not retriable, otherwise sets initialization_error = NON_RETRIABLE_ERROR and + * Throws if error is retriable, otherwise sets initialization_error = NON_RETRIABLE_ERROR and * saves exception into `exception` variable. In case url is not found and skip_not_found_url == true, - * sets initialization_error = DKIP_NOT_FOUND_URL, otherwise throws. + * sets initialization_error = SKIP_NOT_FOUND_URL, otherwise throws. */ void initialize() { @@ -355,6 +355,7 @@ namespace detail "Cannot read with range: [{}, {}]", read_range.begin, read_range.end ? *read_range.end : '-')); initialization_error = InitializeError::NON_RETRIABLE_ERROR; + return; } else if (read_range.end) { @@ -619,11 +620,11 @@ public: const RemoteHostFilter & remote_host_filter_ = {}, bool delay_initialization_ = true, bool use_external_buffer_ = false, - bool glob_url_ = false) + bool skip_not_found_url_ = false) : Parent(std::make_shared(uri_, timeouts, max_redirects), uri_, credentials_, method_, out_stream_callback_, buffer_size_, settings_, http_header_entries_, read_range_, remote_host_filter_, - delay_initialization_, use_external_buffer_, glob_url_) + delay_initialization_, use_external_buffer_, skip_not_found_url_) { } }; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 574cc691e29..562a429f190 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -130,6 +130,20 @@ namespace reader->cancel(); } + static void setCredentials(Poco::Net::HTTPBasicCredentials & credentials, const Poco::URI & request_uri) + { + const auto & user_info = request_uri.getUserInfo(); + if (!user_info.empty()) + { + std::size_t n = user_info.find(':'); + if (n != std::string::npos) + { + credentials.setUsername(user_info.substr(0, n)); + credentials.setPassword(user_info.substr(n + 1)); + } + } + } + StorageURLSource( URIInfoPtr uri_info_, const std::string & http_method, @@ -154,10 +168,44 @@ namespace /// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline. initialize = [=, this](const URIInfo::FailoverOptions & uri_options) { - read_buf = getFirstAvailableURLReadBuffer( - uri_options, context, params, http_method, - callback, timeouts, compression_method, credentials, headers, - /* glob_url */glob_url); + if (uri_options.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list"); + + if (uri_options.size() > 1) + { + read_buf = getFirstAvailableURLReadBuffer( + uri_options, context, params, http_method, + callback, timeouts, compression_method, credentials, headers); + } + else + { + ReadSettings read_settings = context->getReadSettings(); + bool skip_url_not_found_error = glob_url && read_settings.http_skip_not_found_url_for_globs; + auto request_uri = Poco::URI(uri_options[0]); + + for (const auto & [param, value] : params) + request_uri.addQueryParameter(param, value); + + setCredentials(credentials, request_uri); + + read_buf = wrapReadBufferWithCompressionMethod( + std::make_unique( + request_uri, + http_method, + callback, + timeouts, + credentials, + context->getSettingsRef().max_http_get_redirects, + DBMS_DEFAULT_BUFFER_SIZE, + read_settings, + headers, + ReadWriteBufferFromHTTP::Range{}, + context->getRemoteHostFilter(), + /* delay_initiliazation */true, + /* use_external_buffer */false, + /* skip_url_not_found_error */skip_url_not_found_error), + chooseCompressionMethod(request_uri.getPath(), compression_method)); + } auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); QueryPipelineBuilder builder; @@ -216,31 +264,23 @@ namespace const ConnectionTimeouts & timeouts, const String & compression_method, Poco::Net::HTTPBasicCredentials & credentials, - const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, - bool glob_url) + const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers) { String first_exception_message; - bool skip_url_not_found_error = urls.size() == 1 && glob_url; + ReadSettings read_settings = context->getReadSettings(); - for (const auto & option : urls) + for (auto option = urls.begin(); option != urls.end(); ++option) { - auto request_uri = Poco::URI(option); + bool skip_url_not_found_error = read_settings.http_skip_not_found_url_for_globs && option == std::prev(urls.end()); + auto request_uri = Poco::URI(*option); + for (const auto & [param, value] : params) request_uri.addQueryParameter(param, value); + setCredentials(credentials, request_uri); + try { - std::string user_info = request_uri.getUserInfo(); - if (!user_info.empty()) - { - std::size_t n = user_info.find(':'); - if (n != std::string::npos) - { - credentials.setUsername(user_info.substr(0, n)); - credentials.setPassword(user_info.substr(n + 1)); - } - } - return wrapReadBufferWithCompressionMethod( std::make_unique( request_uri, @@ -250,13 +290,13 @@ namespace credentials, context->getSettingsRef().max_http_get_redirects, DBMS_DEFAULT_BUFFER_SIZE, - context->getReadSettings(), + read_settings, headers, ReadWriteBufferFromHTTP::Range{}, context->getRemoteHostFilter(), - /* delay_initiliazation */urls.size() == 1, + /* delay_initiliazation */false, /* use_external_buffer */false, - skip_url_not_found_error), + /* skip_url_not_found_error */skip_url_not_found_error), chooseCompressionMethod(request_uri.getPath(), compression_method)); } catch (...) @@ -432,7 +472,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( ConnectionTimeouts::getHTTPTimeouts(context), compression_method, credentials, - headers, /* glob_url */true); + headers); }; } else From cae1517693added79a7fd956763fabb18e68357a Mon Sep 17 00:00:00 2001 From: Kruglov Pavel <48961922+Avogar@users.noreply.github.com> Date: Tue, 8 Feb 2022 17:31:02 +0300 Subject: [PATCH 4/5] Fix build --- src/Storages/StorageURL.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index f90fc164704..81554eef771 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -479,7 +479,7 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( else { auto parsed_uri = Poco::URI(uri); - setCredentials(credentials, parsed_uri); + StorageURLSource::setCredentials(credentials, parsed_uri); read_buffer_creator = [&]() { return wrapReadBufferWithCompressionMethod( From e8a8f46864553243ab2c06f90bf9edb1757ed69f Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 8 Feb 2022 21:58:03 +0100 Subject: [PATCH 5/5] Fix style check --- src/IO/ReadWriteBufferFromHTTP.h | 1 - src/Storages/StorageURL.cpp | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index f0604176ce1..4522f863db6 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -21,7 +21,6 @@ #include #include #include -#include namespace ProfileEvents diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 81554eef771..fc58f8226aa 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -36,6 +36,7 @@ namespace ErrorCodes extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int NETWORK_ERROR; extern const int BAD_ARGUMENTS; + extern const int LOGICAL_ERROR; }