Merge pull request #8308 from ClickHouse/deprecate-merge_tree_uniform_read_distribution

Deprecate "merge_tree_uniform_read_distribution" setting
This commit is contained in:
alexey-milovidov 2019-12-20 15:28:34 +03:00 committed by GitHub
commit 7f10630873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 19 additions and 77 deletions

View File

@@ -122,8 +122,6 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(SettingUInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
\
M(SettingBool, merge_tree_uniform_read_distribution, true, "Distribute read from MergeTree over threads evenly, ensuring stable average execution time of each thread within one read operation.", 0) \
\
M(SettingUInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
\
M(SettingUInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ", 0) \
@@ -393,6 +391,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13", 0) \
M(SettingBool, compile, false, "Obsolete setting, does nothing. Will be removed after 2020-03-13", 0) \
M(SettingUInt64, min_count_to_compile, 0, "Obsolete setting, does nothing. Will be removed after 2020-03-13", 0) \
M(SettingBool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@@ -139,7 +139,7 @@ bool isStorageTouchedByMutations(
return true;
}
context_copy.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_copy.getSettingsRef().max_streams_to_max_threads_ratio = 1;
context_copy.getSettingsRef().max_threads = 1;
ASTPtr select_query = prepareQueryAffectedAST(commands);

View File

@@ -938,7 +938,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);
auto context_for_reading = context;
context_for_reading.getSettingsRef().merge_tree_uniform_read_distribution = 0;
context_for_reading.getSettingsRef().max_streams_to_max_threads_ratio = 1;
context_for_reading.getSettingsRef().max_threads = 1;
std::vector<MutationCommand> commands_for_part;

View File

@@ -747,9 +747,13 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
use_uncompressed_cache = false;
Pipes res;
if (0 == sum_marks)
return res;
if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
if (num_streams > 1)
{
/// Parallel query execution.
/// Reduce the number of num_streams if the data is small.
if (sum_marks < num_streams * min_marks_for_concurrent_read && parts.size() < num_streams)
num_streams = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());
@@ -777,82 +781,22 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
res.emplace_back(std::move(source));
}
}
else if (sum_marks > 0)
else
{
const size_t min_marks_per_stream = (sum_marks - 1) / num_streams + 1;
/// Sequential query execution.
for (size_t i = 0; i < num_streams && !parts.empty(); ++i)
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
size_t need_marks = min_marks_per_stream;
RangesInDataPart & part = parts[part_index];
/// Loop over parts.
/// We will iteratively take part or some subrange of a part from the back
/// and assign a stream to read from it.
while (need_marks > 0 && !parts.empty())
{
RangesInDataPart part = parts.back();
parts.pop_back();
auto source = std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges, use_uncompressed_cache,
query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true,
virt_columns, part.part_index_in_query);
size_t & marks_in_part = sum_marks_in_parts.back();
/// We will not take too few rows from a part.
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
/// Do not leave too few rows in the part.
if (marks_in_part > need_marks &&
marks_in_part - need_marks < min_marks_for_concurrent_read)
need_marks = marks_in_part;
MarkRanges ranges_to_get_from_part;
/// We take the whole part if it is small enough.
if (marks_in_part <= need_marks)
{
/// Restore the order of segments.
std::reverse(part.ranges.begin(), part.ranges.end());
ranges_to_get_from_part = part.ranges;
need_marks -= marks_in_part;
sum_marks_in_parts.pop_back();
}
else
{
/// Loop through ranges in part. Take enough ranges to cover "need_marks".
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among streams", ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.back();
const size_t marks_in_range = range.end - range.begin;
const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
range.begin += marks_to_get_from_range;
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
if (range.begin == range.end)
part.ranges.pop_back();
}
parts.emplace_back(part);
}
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, ranges_to_get_from_part,
use_uncompressed_cache, query_info.prewhere_info, true, settings.min_bytes_to_use_direct_io,
settings.max_read_buffer_size, true, virt_columns, part.part_index_in_query);
res.emplace_back(std::move(source_processor));
}
res.emplace_back(std::move(source));
}
if (!parts.empty())
throw Exception("Couldn't spread marks among streams", ErrorCodes::LOGICAL_ERROR);
}
return res;
@@ -1102,8 +1046,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
Pipes pipes;
/// NOTE `merge_tree_uniform_read_distribution` is not used for FINAL
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
RangesInDataPart & part = parts[part_index];