diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 9c8a3860807..8cbe9f8d6e1 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -527,7 +527,7 @@ StorageS3Source::StorageS3Source( const String & bucket_, const String & version_id_, std::shared_ptr file_iterator_, - const size_t download_thread_num_, + const size_t max_parsing_threads_, std::optional query_info_) : ISource(info.source_header, false) , WithContext(context_) @@ -546,7 +546,7 @@ StorageS3Source::StorageS3Source( , query_info(std::move(query_info_)) , requested_virtual_columns(info.requested_virtual_columns) , file_iterator(file_iterator_) - , download_thread_num(download_thread_num_) + , max_parsing_threads(max_parsing_threads_) , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1) , create_reader_scheduler(threadPoolCallbackRunner(create_reader_pool, "CreateS3Reader")) { @@ -573,9 +573,17 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader() auto read_buf = createS3ReadBuffer(key_with_info.key, object_size); auto input_format = FormatFactory::instance().getInput( - format, *read_buf, sample_block, getContext(), max_block_size, - format_settings, std::nullopt, std::nullopt, - /* is_remote_fs */ true, compression_method); + format, + *read_buf, + sample_block, + getContext(), + max_block_size, + format_settings, + max_parsing_threads, + /* max_download_threads= */ std::nullopt, + /* is_remote_fs */ true, + compression_method); + if (query_info.has_value()) input_format->setQueryInfo(query_info.value(), getContext()); @@ -1035,7 +1043,9 @@ Pipe StorageS3::read( auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); - const size_t max_download_threads = local_context->getSettingsRef().max_download_threads; + const size_t max_threads = local_context->getSettingsRef().max_threads; + const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams); + for (size_t i = 0; i < num_streams; ++i) { pipes.emplace_back(std::make_shared( @@ -1051,7 +1061,7 @@ Pipe StorageS3::read( query_configuration.url.bucket, query_configuration.url.version_id, iterator_wrapper, - max_download_threads, + max_parsing_threads, query_info)); } diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index f0486a8a0b0..aadceb1217b 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -130,7 +130,7 @@ public: const String & bucket, const String & version_id, std::shared_ptr file_iterator_, - size_t download_thread_num, + size_t max_parsing_threads, std::optional query_info); ~StorageS3Source() override; @@ -218,7 +218,7 @@ private: NamesAndTypesList requested_virtual_columns; std::shared_ptr file_iterator; - size_t download_thread_num = 1; + size_t max_parsing_threads = 1; Poco::Logger * log = &Poco::Logger::get("StorageS3Source"); diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 617b421fa24..5638934cb6b 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -221,7 +221,7 @@ StorageURLSource::StorageURLSource( UInt64 max_block_size, const ConnectionTimeouts & timeouts, CompressionMethod compression_method, - size_t download_threads, + size_t max_parsing_threads, const SelectQueryInfo & query_info, const HTTPHeaderEntries & headers_, const URIParams & params, @@ -275,7 +275,6 @@ StorageURLSource::StorageURLSource( file_progress_callback(FileProgress(0, file_size)); } - // TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams. input_format = FormatFactory::instance().getInput( format, *read_buf, @@ -283,9 +282,9 @@ StorageURLSource::StorageURLSource( context, max_block_size, format_settings, - download_threads, - /*max_download_threads*/ std::nullopt, - /* is_remote_fs */ true, + max_parsing_threads, + /* max_download_threads= */ std::nullopt, + /* is_remote_fs= */ true, compression_method); input_format->setQueryInfo(query_info, context); @@ -706,8 +705,6 @@ Pipe IStorageURLBase::read( { auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size); - size_t max_download_threads = local_context->getSettingsRef().max_download_threads; - std::shared_ptr iterator_wrapper{nullptr}; bool is_url_with_globs = urlWithGlobs(uri); size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; @@ -754,7 +751,9 @@ Pipe IStorageURLBase::read( Pipes pipes; pipes.reserve(num_streams); - size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams); + const size_t max_threads = local_context->getSettingsRef().max_threads; + const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams); + for (size_t i = 0; i < num_streams; ++i) { pipes.emplace_back(std::make_shared( @@ -775,7 +774,7 @@ Pipe IStorageURLBase::read( max_block_size, getHTTPTimeouts(local_context), compression_method, - download_threads, + max_parsing_threads, query_info, headers, params, @@ -793,7 +792,7 @@ Pipe StorageURLWithFailover::read( ContextPtr local_context, QueryProcessingStage::Enum processed_stage, size_t max_block_size, - size_t /*num_streams*/) + size_t num_streams) { auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size); @@ -807,6 +806,9 @@ Pipe StorageURLWithFailover::read( auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals()); + const size_t max_threads = local_context->getSettingsRef().max_threads; + const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams); + auto pipe = Pipe(std::make_shared( read_from_format_info, iterator_wrapper, @@ -819,7 +821,7 @@ Pipe StorageURLWithFailover::read( max_block_size, getHTTPTimeouts(local_context), compression_method, - local_context->getSettingsRef().max_download_threads, + max_parsing_threads, query_info, headers, params)); diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 140f3d42f7b..9b74e3236ca 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -170,7 +170,7 @@ public: UInt64 max_block_size, const ConnectionTimeouts & timeouts, CompressionMethod compression_method, - size_t download_threads, + size_t max_parsing_threads, const SelectQueryInfo & query_info, const HTTPHeaderEntries & headers_ = {}, const URIParams & params = {},