Merge pull request #15938 from Avogar/select_final

Select final
This commit is contained in:
Anton Popov 2020-11-03 13:49:47 +03:00 committed by GitHub
commit 85ab1e9bc3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 224 additions and 124 deletions

View File

@ -126,6 +126,7 @@ class IColumn;
M(UInt64, merge_tree_coarse_index_granularity, 8, "If the index segment can contain the required keys, divide it into as many parts and recursively check them.", 0) \
M(UInt64, merge_tree_max_rows_to_use_cache, (128 * 8192), "The maximum number of rows per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(UInt64, merge_tree_max_bytes_to_use_cache, (192 * 10 * 1024 * 1024), "The maximum number of bytes per request, to use the cache of uncompressed data. If the request is large, the cache is not used. (For large queries not to flush out the cache.)", 0) \
M(Bool, do_not_merge_across_partitions_select_final, false, "Merge parts only in one partition in select final", 0) \
\
M(UInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
\

View File

@ -1237,144 +1237,200 @@ Pipe MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (sum_marks > max_marks_to_use_cache)
use_uncompressed_cache = false;
Pipe pipe;
{
Pipes pipes;
for (const auto & part : parts)
{
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data, metadata_snapshot, part.data_part, max_block_size, settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes, column_names, part.ranges,
use_uncompressed_cache,
query_info.prewhere_info, true, reader_settings,
virt_columns, part.part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
pipe = Pipe::unitePipes(std::move(pipes));
}
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.columns_to_sum, partition_key_columns, max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, pipe.numOutputPorts(),
sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty())
/// If setting do_not_merge_across_partitions_select_final is true than we won't merge parts from different partitions.
/// We have all parts in parts vector, where parts with same partition are nerby.
/// So we will store iterators pointed to the beginning of each partition range (and parts.end()),
/// then we will create a pipe for each partition that will run selecting processor and merging processor
/// for the parts with this partition. In the end we will unite all the pipes.
std::vector<RangesInDataParts::iterator> parts_to_merge_ranges;
auto it = parts.begin();
parts_to_merge_ranges.push_back(it);
if (settings.do_not_merge_across_partitions_select_final)
{
pipe.addTransform(get_merging_processor());
return pipe;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
while (it != parts.end())
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
it = std::find_if(
it, parts.end(), [&it](auto & part) { return it->data_part->info.partition_id != part.data_part->info.partition_id; });
parts_to_merge_ranges.push_back(it);
}
/// We divide threads for each partition equally. But we will create at least the number of partitions threads.
/// (So, the total number of threads could be more than initial num_streams.
num_streams /= (parts_to_merge_ranges.size() - 1);
}
else
{
/// If do_not_merge_across_partitions_select_final is false we just merge all the parts.
parts_to_merge_ranges.push_back(parts.end());
}
Pipes partition_pipes;
for (size_t range_index = 0; range_index < parts_to_merge_ranges.size() - 1; ++range_index)
{
Pipe pipe;
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
Pipes pipes;
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
for (auto part_it = parts_to_merge_ranges[range_index]; part_it != parts_to_merge_ranges[range_index + 1]; ++part_it)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
auto source_processor = std::make_shared<MergeTreeSelectProcessor>(
data,
metadata_snapshot,
part_it->data_part,
max_block_size,
settings.preferred_block_size_bytes,
settings.preferred_max_column_in_block_size_bytes,
column_names,
part_it->ranges,
use_uncompressed_cache,
query_info.prewhere_info,
true,
reader_settings,
virt_columns,
part_it->part_index_in_query);
pipes.emplace_back(std::move(source_processor));
}
processors.emplace_back(std::move(merge));
pipe = Pipe::unitePipes(std::move(pipes));
}
return processors;
});
/// Drop temporary columns, added by 'sorting_key_expr'
if (!out_projection)
out_projection = createProjection(pipe, data);
return pipe;
/// If do_not_merge_across_partitions_select_final is true and there is only one part in partition
/// with level > 0 then we won't postprocess this part
if (settings.do_not_merge_across_partitions_select_final &&
std::distance(parts_to_merge_ranges[range_index], parts_to_merge_ranges[range_index + 1]) == 1 &&
parts_to_merge_ranges[range_index]->data_part->info.level > 0)
{
partition_pipes.emplace_back(std::move(pipe));
continue;
}
pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
{
return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
});
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
Block header = pipe.getHeader();
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
auto get_merging_processor = [&]() -> MergingTransformPtr
{
switch (data.merging_params.mode)
{
case MergeTreeData::MergingParams::Ordinary:
{
return std::make_shared<MergingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
}
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, true, max_block_size);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(
header,
pipe.numOutputPorts(),
sort_description,
data.merging_params.columns_to_sum,
partition_key_columns,
max_block_size);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, pipe.numOutputPorts(), sort_description, max_block_size);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.version_column, max_block_size);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(
header, pipe.numOutputPorts(), sort_description, data.merging_params.sign_column, max_block_size);
case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
__builtin_unreachable();
};
if (num_streams <= 1 || sort_description.empty())
{
pipe.addTransform(get_merging_processor());
partition_pipes.emplace_back(std::move(pipe));
continue;
}
ColumnNumbers key_columns;
key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
pipe.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<AddingSelectorTransform>(stream_header, num_streams, key_columns);
});
pipe.transform([&](OutputPortRawPtrs ports)
{
Processors processors;
std::vector<OutputPorts::iterator> output_ports;
processors.reserve(ports.size() + num_streams);
output_ports.reserve(ports.size());
for (auto & port : ports)
{
auto copier = std::make_shared<CopyTransform>(header, num_streams);
connect(*port, copier->getInputPort());
output_ports.emplace_back(copier->getOutputs().begin());
processors.emplace_back(std::move(copier));
}
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
auto input = merge->getInputs().begin();
/// Connect i-th merge with i-th input port of every copier.
for (size_t j = 0; j < ports.size(); ++j)
{
connect(*output_ports[j], *input);
++output_ports[j];
++input;
}
processors.emplace_back(std::move(merge));
}
return processors;
});
partition_pipes.emplace_back(std::move(pipe));
}
return Pipe::unitePipes(std::move(partition_pipes));
}
/// Calculates a set of mark ranges, that could possibly contain keys, required by condition.

View File

@ -112,7 +112,6 @@ struct Settings;
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \
M(UInt64, check_delay_period, 60, "Obsolete setting, does nothing.", 0) \
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \
M(index_granularity)

View File

@ -0,0 +1,23 @@
<test>
<settings>
<do_not_merge_across_partitions_select_final>1</do_not_merge_across_partitions_select_final>
</settings>
<create_query>
CREATE TABLE optimized_select_final (t DateTime, x Int32)
ENGINE = ReplacingMergeTree()
PARTITION BY toYYYYMM(t) ORDER BY x
</create_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2000-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2020-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2021-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>INSERT INTO optimized_select_final SELECT toDate('2022-01-01'), number FROM numbers(5000000)</fill_query>
<fill_query>OPTIMIZE TABLE optimized_select_final</fill_query>
<query>SELECT * FROM optimized_select_final FINAL FORMAT Null</query>
<drop_query>DROP TABLE IF EXISTS optimized_select_final</drop_query>
</test>

View File

@ -0,0 +1,6 @@
2000-01-01 00:00:00 0
2020-01-01 00:00:00 0
2000-01-01 00:00:00 1
2020-01-01 00:00:00 1
2000-01-01 00:00:00 2
2020-01-01 00:00:00 2

View File

@ -0,0 +1,15 @@
DROP TABLE IF EXISTS select_final;
CREATE TABLE select_final (t DateTime, x Int32) ENGINE = ReplacingMergeTree() PARTITION BY toYYYYMM(t) ORDER BY x;
INSERT INTO select_final SELECT toDate('2000-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2000-01-01'), number + 1 FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number FROM numbers(2);
INSERT INTO select_final SELECT toDate('2020-01-01'), number + 1 FROM numbers(2);
SELECT * FROM select_final FINAL ORDER BY x SETTINGS do_not_merge_across_partitions_select_final = 1;
DROP TABLE select_final;