diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index f0bfff38004..e36ccfe6141 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -3,34 +3,34 @@ #include #include #include -#include #include +#include -#include -#include -#include -#include #include #include #include +#include +#include +#include +#include #include #include #include #include +#include +#include #include #include "IO/HTTPCommon.h" #include "IO/ReadWriteBufferFromHTTP.h" -#include -#include -#include +#include +#include #include #include -#include #include -#include +#include namespace DB @@ -47,8 +47,7 @@ namespace ErrorCodes static bool urlWithGlobs(const String & uri) { - return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) - || uri.find('|') != std::string::npos; + return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos; } @@ -92,8 +91,7 @@ IStorageURLBase::IStorageURLBase( namespace { - ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders( - const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_) + ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_) { ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end()); // Propagate OpenTelemetry trace context, if any, downstream. @@ -102,13 +100,11 @@ namespace const auto & thread_trace_context = CurrentThread::get().thread_trace_context; if (thread_trace_context.trace_id != UUID()) { - headers.emplace_back("traceparent", - thread_trace_context.composeTraceparentHeader()); + headers.emplace_back("traceparent", thread_trace_context.composeTraceparentHeader()); if (!thread_trace_context.tracestate.empty()) { - headers.emplace_back("tracestate", - thread_trace_context.tracestate); + headers.emplace_back("tracestate", thread_trace_context.tracestate); } } } @@ -118,8 +114,7 @@ namespace class StorageURLSource : public SourceWithProgress { - - using URIParams = std::vector>; + using URIParams = std::vector>; public: struct URIInfo @@ -164,11 +159,11 @@ namespace UInt64 max_block_size, const ConnectionTimeouts & timeouts, const String & compression_method, + size_t download_threads, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {}, const URIParams & params = {}, bool glob_url = false) - : SourceWithProgress(sample_block), name(std::move(name_)) - , uri_info(uri_info_) + : SourceWithProgress(sample_block), name(std::move(name_)), uri_info(uri_info_) { auto headers = getHeaders(headers_); @@ -180,33 +175,40 @@ namespace auto first_option = uri_options.begin(); read_buf = getFirstAvailableURLReadBuffer( - first_option, uri_options.end(), context, params, http_method, - callback, timeouts, compression_method, credentials, headers, glob_url, uri_options.size() == 1); + first_option, + uri_options.end(), + context, + params, + http_method, + callback, + timeouts, + compression_method, + credentials, + headers, + glob_url, + uri_options.size() == 1, + download_threads); - auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); + auto input_format + = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); QueryPipelineBuilder builder; builder.init(Pipe(input_format)); - builder.addSimpleTransform([&](const Block & cur_header) - { - return std::make_shared(cur_header, columns, *input_format, context); - }); + builder.addSimpleTransform( + [&](const Block & cur_header) + { return std::make_shared(cur_header, columns, *input_format, context); }); pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder))); reader = std::make_unique(*pipeline); }; } - String getName() const override - { - return name; - } + String getName() const override { return name; } Chunk generate() override { while (true) { - if (!reader) { auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1); @@ -243,7 +245,8 @@ namespace Poco::Net::HTTPBasicCredentials & credentials, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, bool glob_url, - bool delay_initialization) + bool delay_initialization, + size_t download_threads) { String first_exception_message; ReadSettings read_settings = context->getReadSettings(); @@ -262,7 +265,7 @@ namespace const auto settings = context->getSettings(); try { - if (settings.max_download_threads > 1) + if (download_threads > 1) { try { @@ -296,7 +299,7 @@ namespace LOG_TRACE( &Poco::Logger::get("StorageURLSource"), "Using ParallelReadBuffer with {} workers with chunks of {} bytes", - settings.max_download_threads, + download_threads, settings.max_download_buffer_size); auto read_buffer_factory = std::make_unique( @@ -317,7 +320,7 @@ namespace /* skip_url_not_found_error */ skip_url_not_found_error); return wrapReadBufferWithCompressionMethod( std::make_unique( - std::move(read_buffer_factory), &IOThreadPool::get(), settings.max_download_threads), + std::move(read_buffer_factory), &IOThreadPool::get(), download_threads), chooseCompressionMethod(request_uri.getPath(), compression_method)); } } @@ -329,11 +332,7 @@ namespace } } - LOG_TRACE( - &Poco::Logger::get("StorageURLSource"), - "Using single-threaded read buffer", - settings.max_download_threads, - settings.max_download_buffer_size); + LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer"); return wrapReadBufferWithCompressionMethod( std::make_unique( @@ -401,10 +400,10 @@ StorageURLSink::StorageURLSink( std::string content_encoding = toContentEncodingName(compression_method); write_buf = wrapWriteBufferWithCompressionMethod( - std::make_unique(Poco::URI(uri), http_method, content_type, content_encoding, timeouts), - compression_method, 3); - writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, - context, {} /* write callback */, format_settings); + std::make_unique(Poco::URI(uri), http_method, content_type, content_encoding, timeouts), + compression_method, + 3); + writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings); } @@ -433,15 +432,15 @@ public: const ConnectionTimeouts & timeouts_, const CompressionMethod compression_method_, const String & http_method_) - : PartitionedSink(partition_by, context_, sample_block_) - , uri(uri_) - , format(format_) - , format_settings(format_settings_) - , sample_block(sample_block_) - , context(context_) - , timeouts(timeouts_) - , compression_method(compression_method_) - , http_method(http_method_) + : PartitionedSink(partition_by, context_, sample_block_) + , uri(uri_) + , format(format_) + , format_settings(format_settings_) + , sample_block(sample_block_) + , context(context_) + , timeouts(timeouts_) + , compression_method(compression_method_) + , http_method(http_method_) { } @@ -449,8 +448,8 @@ public: { auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id); context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); - return std::make_shared(partition_path, format, - format_settings, sample_block, context, timeouts, compression_method, http_method); + return std::make_shared( + partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method); } private: @@ -540,7 +539,8 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( credentials, headers, false, - false); + false, + context->getSettingsRef().max_download_threads); }; try @@ -557,7 +557,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData( } while (++option < urls_to_check.end()); - throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages); + throw Exception( + ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, + "All attempts to extract table structure from urls failed. Errors:\n{}", + exception_messages); } bool IStorageURLBase::isColumnOriented() const @@ -590,6 +593,8 @@ Pipe IStorageURLBase::read( block_for_format = metadata_snapshot->getSampleBlock(); } + size_t max_download_threads = local_context->getSettingsRef().max_download_threads; + if (urlWithGlobs(uri)) { size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; @@ -606,14 +611,19 @@ Pipe IStorageURLBase::read( Pipes pipes; pipes.reserve(num_streams); + size_t remaining_download_threads = max_download_threads; + for (size_t i = 0; i < num_streams; ++i) { + size_t current_need_download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams); + size_t current_download_threads = std::min(current_need_download_threads, remaining_download_threads); + remaining_download_threads -= current_download_threads; + current_download_threads = std::max(static_cast(1), current_download_threads); + pipes.emplace_back(std::make_shared( uri_info, getReadMethod(), - getReadPOSTDataCallback( - column_names, columns_description, query_info, - local_context, processed_stage, max_block_size), + getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size), format_name, format_settings, getName(), @@ -622,7 +632,11 @@ Pipe IStorageURLBase::read( columns_description, max_block_size, ConnectionTimeouts::getHTTPTimeouts(local_context), - compression_method, headers, params, /* glob_url */true)); + compression_method, + current_download_threads, + headers, + params, + /* glob_url */ true)); } return Pipe::unitePipes(std::move(pipes)); } @@ -633,9 +647,7 @@ Pipe IStorageURLBase::read( return Pipe(std::make_shared( uri_info, getReadMethod(), - getReadPOSTDataCallback( - column_names, columns_description, query_info, - local_context, processed_stage, max_block_size), + getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size), format_name, format_settings, getName(), @@ -644,7 +656,10 @@ Pipe IStorageURLBase::read( columns_description, max_block_size, ConnectionTimeouts::getHTTPTimeouts(local_context), - compression_method, headers, params)); + compression_method, + max_download_threads, + headers, + params)); } } @@ -676,12 +691,10 @@ Pipe StorageURLWithFailover::read( auto uri_info = std::make_shared(); uri_info->uri_list_to_read.emplace_back(uri_options); - auto pipe = Pipe(std::make_shared( + auto pipe = Pipe(std::make_shared( uri_info, getReadMethod(), - getReadPOSTDataCallback( - column_names, columns_description, query_info, - local_context, processed_stage, max_block_size), + getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size), format_name, format_settings, getName(), @@ -690,7 +703,10 @@ Pipe StorageURLWithFailover::read( columns_description, max_block_size, ConnectionTimeouts::getHTTPTimeouts(local_context), - compression_method, headers, params)); + compression_method, + local_context->getSettingsRef().max_download_threads, + headers, + params)); std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng); return pipe; } @@ -710,17 +726,26 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad { return std::make_shared( partition_by_ast, - uri, format_name, - format_settings, metadata_snapshot->getSampleBlock(), context, + uri, + format_name, + format_settings, + metadata_snapshot->getSampleBlock(), + context, ConnectionTimeouts::getHTTPTimeouts(context), - chooseCompressionMethod(uri, compression_method), http_method); + chooseCompressionMethod(uri, compression_method), + http_method); } else { - return std::make_shared(uri, format_name, - format_settings, metadata_snapshot->getSampleBlock(), context, + return std::make_shared( + uri, + format_name, + format_settings, + metadata_snapshot->getSampleBlock(), + context, ConnectionTimeouts::getHTTPTimeouts(context), - chooseCompressionMethod(uri, compression_method), http_method); + chooseCompressionMethod(uri, compression_method), + http_method); } } @@ -737,8 +762,19 @@ StorageURL::StorageURL( const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_, const String & http_method_, ASTPtr partition_by_) - : IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, - columns_, constraints_, comment, compression_method_, headers_, http_method_, partition_by_) + : IStorageURLBase( + uri_, + context_, + table_id_, + format_name_, + format_settings_, + columns_, + constraints_, + comment, + compression_method_, + headers_, + http_method_, + partition_by_) { context_->getRemoteHostFilter().checkURL(Poco::URI(uri)); } @@ -789,8 +825,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum // Apply changes from SETTINGS clause, with validation. user_format_settings.applyChanges(args.storage_def->settings->changes); - format_settings = getFormatSettings(args.getContext(), - user_format_settings); + format_settings = getFormatSettings(args.getContext(), user_format_settings); } else { @@ -809,12 +844,12 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex auto [common_configuration, storage_specific_args] = named_collection.value(); configuration.set(common_configuration); - if (!configuration.http_method.empty() - && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST + if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST && configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT) - throw Exception(ErrorCodes::BAD_ARGUMENTS, - "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET", - configuration.http_method); + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET", + configuration.http_method); if (!storage_specific_args.empty()) { @@ -832,7 +867,8 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex { if (args.empty() || args.size() > 3) throw Exception( - "Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.", + "Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional " + "compression method.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (auto & arg : args) @@ -854,43 +890,45 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex void registerStorageURL(StorageFactory & factory) { - factory.registerStorage("URL", [](const StorageFactory::Arguments & args) - { - ASTs & engine_args = args.engine_args; - auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext()); - auto format_settings = StorageURL::getFormatSettingsFromArgs(args); - - ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; - for (const auto & [header, value] : configuration.headers) + factory.registerStorage( + "URL", + [](const StorageFactory::Arguments & args) { - auto value_literal = value.safeGet(); - if (header == "Range") - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed"); - headers.emplace_back(std::make_pair(header, value_literal)); - } + ASTs & engine_args = args.engine_args; + auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext()); + auto format_settings = StorageURL::getFormatSettingsFromArgs(args); - ASTPtr partition_by; - if (args.storage_def->partition_by) - partition_by = args.storage_def->partition_by->clone(); + ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; + for (const auto & [header, value] : configuration.headers) + { + auto value_literal = value.safeGet(); + if (header == "Range") + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed"); + headers.emplace_back(std::make_pair(header, value_literal)); + } - return StorageURL::create( - configuration.url, - args.table_id, - configuration.format, - format_settings, - args.columns, - args.constraints, - args.comment, - args.getContext(), - configuration.compression_method, - headers, - configuration.http_method, - partition_by); - }, - { - .supports_settings = true, - .supports_schema_inference = true, - .source_access_type = AccessType::URL, - }); + ASTPtr partition_by; + if (args.storage_def->partition_by) + partition_by = args.storage_def->partition_by->clone(); + + return StorageURL::create( + configuration.url, + args.table_id, + configuration.format, + format_settings, + args.columns, + args.constraints, + args.comment, + args.getContext(), + configuration.compression_method, + headers, + configuration.http_method, + partition_by); + }, + { + .supports_settings = true, + .supports_schema_inference = true, + .source_access_type = AccessType::URL, + }); } }