diff --git a/dbms/include/DB/Interpreters/Settings.h b/dbms/include/DB/Interpreters/Settings.h index 6e58df468cb..43f9f68fe69 100644 --- a/dbms/include/DB/Interpreters/Settings.h +++ b/dbms/include/DB/Interpreters/Settings.h @@ -119,7 +119,10 @@ struct Settings * (Чтобы большие запросы не вымывали кэш.) */ \ M(SettingUInt64, merge_tree_max_rows_to_use_cache, (1024 * 1024)) \ \ + /** Распределять чтение из MergeTree по потокам равномерно, обеспечивая стабильное среднее время исполнения каждого потока в пределах одного чтения. */ \ M(SettingBool, merge_tree_uniform_read_distribution, false) \ + /** Переиспользовать MergeTreeReader'ы после вычитывания подзадачи на чтение. */ \ + M(SettingBool, merge_tree_uniform_read_reuse_readers, false) \ \ /** Минимальная длина выражения expr = x1 OR ... expr = xN для оптимизации */ \ M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3) \ diff --git a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h index 299c8ac1b1e..13dc0e9c43b 100644 --- a/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h +++ b/dbms/include/DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h @@ -69,7 +69,14 @@ protected: injectVirtualColumns(res); if (task->mark_ranges.empty()) + { + if (0 == storage.context.getSettings().merge_tree_uniform_read_reuse_readers) + { + reader = {}; + pre_reader = {}; + } task = {}; + } } return res; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index c1df0c93826..1d77361674c 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -369,14 +369,30 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - MergeTreeReadPoolPtr pool = std::make_shared( - threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true, - column_names); - BlockInputStreams res; /// @todo remove old code - if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 0) + if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1) + { + MergeTreeReadPoolPtr pool = std::make_shared( + threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true, + column_names); + + for (std::size_t i = 0; i < threads; ++i) + res.emplace_back(new MergeTreeThreadBlockInputStream{ + i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache, prewhere_actions, + prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns + }); + + /// Оценим общее количество строк - для прогресс-бара. + const std::size_t total_rows = data.index_granularity * sum_marks; + + /// Выставим приблизительное количество строк только для первого источника + static_cast(*res.front()).setTotalRowsApprox(total_rows); + + LOG_TRACE(log, "Reading approx. " << total_rows); + } + else if (sum_marks > 0) { const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1; @@ -458,22 +474,6 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads( if (!parts.empty()) throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR); } - else if (sum_marks > 0) - { - for (std::size_t i = 0; i < threads; ++i) - res.emplace_back(new MergeTreeThreadBlockInputStream{ - i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache, prewhere_actions, - prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns - }); - - /// Оценим общее количество строк - для прогресс-бара. - const std::size_t total_rows = data.index_granularity * sum_marks; - - /// Выставим приблизительное количество строк только для первого источника - static_cast(*res.front()).setTotalRowsApprox(total_rows); - - LOG_TRACE(log, "Reading approx. " << total_rows); - } return res; } @@ -503,31 +503,58 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal if (sum_marks > max_marks_to_use_cache) use_uncompressed_cache = false; - MergeTreeReadPoolPtr pool = std::make_shared( - parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true, - column_names, true); - BlockInputStreams to_merge; - for (const auto i : ext::range(0, parts.size())) + if (settings.merge_tree_uniform_read_distribution == 1) { - BlockInputStreamPtr source_stream{ - new MergeTreeThreadBlockInputStream{ - i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions, - prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns - } - }; + MergeTreeReadPoolPtr pool = std::make_shared( + parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true, + column_names, true); - to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression())); + for (const auto i : ext::range(0, parts.size())) + { + BlockInputStreamPtr source_stream{ + new MergeTreeThreadBlockInputStream{ + i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions, + prewhere_column, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, virt_columns + } + }; + + to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression())); + } + + /// Оценим общее количество строк - для прогресс-бара. + const std::size_t total_rows = data.index_granularity * sum_marks; + + /// Выставим приблизительное количество строк только для первого источника + static_cast(*to_merge.front()).setTotalRowsApprox(total_rows); + + LOG_TRACE(log, "Reading approx. " << total_rows); } + else + { + for (size_t part_index = 0; part_index < parts.size(); ++part_index) + { + RangesInDataPart & part = parts[part_index]; - /// Оценим общее количество строк - для прогресс-бара. - const std::size_t total_rows = data.index_granularity * sum_marks; + BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream( + data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data, + part.data_part, part.ranges, use_uncompressed_cache, + prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size); - /// Выставим приблизительное количество строк только для первого источника - static_cast(*to_merge.front()).setTotalRowsApprox(total_rows); + for (const String & virt_column : virt_columns) + { + if (virt_column == "_part") + source_stream = new AddingConstColumnBlockInputStream( + source_stream, new DataTypeString, part.data_part->name, "_part"); + else if (virt_column == "_part_index") + source_stream = new AddingConstColumnBlockInputStream( + source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index"); + } - LOG_TRACE(log, "Reading approx. " << total_rows); + to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression())); + } + } BlockInputStreams res; if (to_merge.size() == 1)