diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h
index ef6d947bb67..fabd4693e33 100644
--- a/dbms/src/Core/Settings.h
+++ b/dbms/src/Core/Settings.h
@@ -126,6 +126,7 @@ struct Settings : public SettingsCollection
     M(SettingUInt64, mark_cache_min_lifetime, 10000, "If the maximum size of mark_cache is exceeded, delete only records older than mark_cache_min_lifetime seconds.") \
     \
     M(SettingFloat, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.") \
+    M(SettingFloat, max_streams_multiplier_for_merge_tables, 5, "Ask more streams when reading from Merge table. Streams will be spread across tables that Merge table will use. This allows more even distribution of work across threads and especially helpful when merged tables differ in size.") \
     \
     M(SettingString, network_compression_method, "LZ4", "Allows you to select the method of data compression when writing.") \
     \
diff --git a/dbms/src/Storages/StorageMerge.cpp b/dbms/src/Storages/StorageMerge.cpp
index 713ca9b7be9..49ac239d325 100644
--- a/dbms/src/Storages/StorageMerge.cpp
+++ b/dbms/src/Storages/StorageMerge.cpp
@@ -165,7 +165,7 @@ BlockInputStreams StorageMerge::read(
     const Context & context,
     QueryProcessingStage::Enum processed_stage,
     const size_t max_block_size,
-    const unsigned num_streams)
+    unsigned num_streams)
 {
     BlockInputStreams res;
 
@@ -201,8 +201,10 @@ BlockInputStreams StorageMerge::read(
         return createSourceStreams(
             query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
 
-    size_t remaining_streams = num_streams;
     size_t tables_count = selected_tables.size();
+    Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables)));
+    num_streams *= num_streams_multiplier;
+    size_t remaining_streams = num_streams;
 
     for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
     {