Merge pull request #5915 from yandex/merge-table-more-streams

Increase number of streams to SELECT from Merge table
This commit is contained in:
alexey-milovidov 2019-07-07 15:57:06 +03:00 committed by GitHub
commit 97b8b2c769
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 5 additions and 2 deletions

View File

@ -126,6 +126,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, mark_cache_min_lifetime, 10000, "If the maximum size of mark_cache is exceeded, delete only records older than mark_cache_min_lifetime seconds.") \
\
M(SettingFloat, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.") \
M(SettingFloat, max_streams_multiplier_for_merge_tables, 5, "Ask more streams when reading from Merge table. Streams will be spread across tables that Merge table will use. This allows more even distribution of work across threads and especially helpful when merged tables differ in size.") \
\
M(SettingString, network_compression_method, "LZ4", "Allows you to select the method of data compression when writing.") \
\

View File

@ -165,7 +165,7 @@ BlockInputStreams StorageMerge::read(
const Context & context,
QueryProcessingStage::Enum processed_stage,
const size_t max_block_size,
const unsigned num_streams)
unsigned num_streams)
{
BlockInputStreams res;
@ -201,8 +201,10 @@ BlockInputStreams StorageMerge::read(
return createSourceStreams(
query_info, processed_stage, max_block_size, header, {}, {}, real_column_names, modified_context, 0, has_table_virtual_column);
size_t remaining_streams = num_streams;
size_t tables_count = selected_tables.size();
Float64 num_streams_multiplier = std::min(unsigned(tables_count), std::max(1U, unsigned(context.getSettingsRef().max_streams_multiplier_for_merge_tables)));
num_streams *= num_streams_multiplier;
size_t remaining_streams = num_streams;
for (auto it = selected_tables.begin(); it != selected_tables.end(); ++it)
{