Add setting do_not_merge_across_partitions

This commit is contained in:
Pavel Kruglov 2020-10-13 17:54:52 +03:00
parent f64344f7c5
commit 8200bab859
2 changed files with 167 additions and 123 deletions

View File

@ -1237,144 +1237,186 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
Pipe pipe;
{
Pipes pipes;
for (const auto & part : parts)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges,
use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
pipe = Pipe::unitePipes(std::move(pipes));
}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty())
std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
auto it = parts.begin();
parts_to_merge_ranges.push_back(it);
if (data_settings->do_not_merge_across_partitions_select_final)
{
pipe.addTransform(get_merging_processor());
return pipe;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
while (it != parts.end())
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
it = std::find_if(
it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
parts_to_merge_ranges.push_back(it);
}
num_streams /= (parts_to_merge_ranges.size() - 1);
}
else
{
parts_to_merge_ranges.push_back(parts.end());
}
Pipes select_and_merge_pipes;
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
Pipe pipe;
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
Pipes pipes;
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
LOG_DEBUG(log, "Partition id: {}", part_it->data_part->info.partition_id);
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
processors.emplace_back(std::move(merge));
pipe = Pipe::unitePipes(std::move(pipes));
}
return processors;
});
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
return pipe;
if (std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 && parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
select_and_merge_pipes.emplace_back(std::move(pipe));
continue;
}
pipe.addSimpleTransform([&metadata_snapshot](const Block & header) {
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr {
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary: {
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(
header,
pipe.numOutputPorts(),
sort_description,
data.merging_params.columns_to_sum,
partition_key_columns,
max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams <= 1 || sort_description.empty())
{
pipe.addTransform(get_merging_processor());
select_and_merge_pipes.emplace_back(std::move(pipe));
continue;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header) {
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports) {
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
LOG_DEBUG(log, "Output ports size: {}", ports.size());
for (auto & port : ports)
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
}
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
}
processors.emplace_back(std::move(merge));
}
return processors;
});
select_and_merge_pipes.emplace_back(std::move(pipe));
}
return Pipe::unitePipes(std::move(select_and_merge_pipes));
}
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.

View File

@ -112,6 +112,8 @@ struct Settings;
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \
/** Select settings */ \
M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \