From b1794a47c39dc466efe5ad1f8691ae4646531677 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 17 Dec 2021 14:03:37 +0300 Subject: [PATCH] StorageURL improve --- src/Storages/StorageURL.cpp | 87 ++++++++++++++++++++++++------------- 1 file changed, 56 insertions(+), 31 deletions(-) diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index fe05d168c31..5fa16a25900 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -95,11 +95,20 @@ namespace class StorageURLSource : public SourceWithProgress { + using URIParams = std::vector>; public: + struct URIInfo + { + using FailoverOptions = std::vector; + std::vector uri_list_to_read; + std::atomic next_uri_to_read = 0; + }; + using URIInfoPtr = std::shared_ptr; + StorageURLSource( - const std::vector & uri_options, + URIInfoPtr uri_info_, const std::string & http_method, std::function callback, const String & format, @@ -114,10 +123,12 @@ namespace const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {}, const URIParams & params = {}) : SourceWithProgress(sample_block), name(std::move(name_)) + , uri_info(uri_info_) { auto headers = getHeaders(headers_); + /// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline. - initialize = [=, this] + initialize = [=, this](const URIInfo::FailoverOptions & uri_options) { WriteBufferFromOwnString error_message; for (auto option = uri_options.begin(); option < uri_options.end(); ++option) @@ -135,10 +146,11 @@ namespace if (n != std::string::npos) { credentials.setUsername(user_info.substr(0, n)); - credentials.setPassword(user_info.substr(n+1)); + credentials.setPassword(user_info.substr(n + 1)); } } + /// Get first alive uri. read_buf = wrapReadBufferWithCompressionMethod( std::make_unique( request_uri, @@ -188,29 +200,34 @@ namespace Chunk generate() override { - if (initialize) + while (true) { - initialize(); - initialize = {}; + if (!reader) + { + auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1); + if (current_uri_pos >= uri_info->uri_list_to_read.size()) + return {}; + + auto current_uri = uri_info->uri_list_to_read[current_uri_pos]; + initialize(current_uri); + } + + Chunk chunk; + if (reader->pull(chunk)) + return chunk; + + pipeline->reset(); + reader.reset(); } - - if (!reader) - return {}; - - Chunk chunk; - if (reader->pull(chunk)) - return chunk; - - pipeline->reset(); - reader.reset(); - - return {}; } private: - std::function initialize; + using InitializeFunc = std::function; + InitializeFunc initialize; String name; + URIInfoPtr uri_info; + std::unique_ptr read_buf; std::unique_ptr pipeline; std::unique_ptr reader; @@ -332,7 +349,7 @@ Pipe IStorageURLBase::read( ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, - unsigned /*num_streams*/) + unsigned num_streams) { auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size); bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) @@ -341,18 +358,23 @@ Pipe IStorageURLBase::read( if (with_globs) { size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; - std::vector url_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); - std::vector uri_options; + auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses); + + if (num_streams > uri_descriptions.size()) + num_streams = uri_descriptions.size(); + + /// For each uri (which acts like shard) check if it has failover options + auto uri_info = std::make_shared(); + for (const auto & description : uri_descriptions) + uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses)); Pipes pipes; - for (const auto & url_description : url_descriptions) - { - /// For each uri (which acts like shard) check if it has failover options - uri_options = parseRemoteDescription(url_description, 0, url_description.size(), '|', max_addresses); - StoragePtr shard; + pipes.reserve(num_streams); + for (size_t i = 0; i < num_streams; ++i) + { pipes.emplace_back(std::make_shared( - uri_options, + uri_info, getReadMethod(), getReadPOSTDataCallback( column_names, metadata_snapshot, query_info, @@ -371,9 +393,10 @@ Pipe IStorageURLBase::read( } else { - std::vector uri_options{uri}; + auto uri_info = std::make_shared(); + uri_info->uri_list_to_read.emplace_back(std::vector{uri}); return Pipe(std::make_shared( - uri_options, + uri_info, getReadMethod(), getReadPOSTDataCallback( column_names, metadata_snapshot, query_info, @@ -402,8 +425,10 @@ Pipe StorageURLWithFailover::read( { auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size); + auto uri_info = std::make_shared(); + uri_info->uri_list_to_read.emplace_back(uri_options); auto pipe = Pipe(std::make_shared( - uri_options, + uri_info, getReadMethod(), getReadPOSTDataCallback( column_names, metadata_snapshot, query_info,