fix sample with final

This commit is contained in:
Anton Popov 2020-05-12 21:22:58 +03:00
parent b707156e09
commit 67213b8ad4
4 changed files with 51 additions and 20 deletions

View File

@ -601,6 +601,11 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
.save_marks_in_cache = true
};
/// Projection that is needed to drop columns which have appeared as a result of executing
/// some extra expressions, and to allow the same expressions to be executed later.
/// NOTE: It may lead to double computation of expressions.
ExpressionActionsPtr result_projection;
if (select.final())
{
/// Add columns needed to calculate the sorting expression and the sign.
@ -623,7 +628,8 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
query_info,
virt_column_names,
settings,
reader_settings);
reader_settings,
result_projection);
}
else if (settings.optimize_read_in_order && query_info.input_sorting_info)
{
@ -644,7 +650,8 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
sorting_key_prefix_expr,
virt_column_names,
settings,
reader_settings);
reader_settings,
result_projection);
}
else
{
@ -667,6 +674,13 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
pipe.getHeader(), filter_expression, filter_function->getColumnName(), false));
}
if (result_projection)
{
for (auto & pipe : res)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(
pipe.getHeader(), result_projection));
}
/// By the way, if a distributed query or query to a Merge table is made, then the `_sample_factor` column can have different values.
if (sample_factor_column_queried)
{
@ -831,7 +845,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const
{
size_t sum_marks = 0;
const InputSortingInfoPtr & input_sorting_info = query_info.input_sorting_info;
@ -1007,19 +1022,14 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
sort_description.emplace_back(data.sorting_key_columns[j],
input_sorting_info->direction, 1);
/// Project input columns to drop columns from sorting_key_prefix_expr
/// to allow execute the same expression later.
/// NOTE: It may lead to double computation of expression.
auto projection = createProjection(pipes.back(), data);
out_projection = createProjection(pipes.back(), data);
for (auto & pipe : pipes)
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), sorting_key_prefix_expr));
auto merging_sorted = std::make_shared<MergingSortedTransform>(
pipes.back().getHeader(), pipes.size(), sort_description, max_block_size);
Pipe merged(std::move(pipes), std::move(merging_sorted));
merged.addSimpleTransform(std::make_shared<ExpressionTransform>(merged.getHeader(), projection));
res.emplace_back(std::move(merged));
res.emplace_back(std::move(pipes), std::move(merging_sorted));
}
else
res.emplace_back(std::move(pipes.front()));
@ -1037,7 +1047,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const
{
const auto data_settings = data.getSettings();
size_t sum_marks = 0;
@ -1065,10 +1076,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
use_uncompressed_cache = false;
Pipes pipes;
/// Project input columns to drop columns from sorting_key_expr
/// to allow execute the same expression later.
/// NOTE: It may lead to double computation of expression.
ExpressionActionsPtr projection;
for (const auto & part : parts)
{
@ -1079,8 +1086,8 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
virt_columns, part.part_index_in_query);
Pipe pipe(std::move(source_processor));
if (!projection)
projection = createProjection(pipe, data);
if (!out_projection)
out_projection = createProjection(pipe, data);
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), data.sorting_key_expr));
pipes.emplace_back(std::move(pipe));
@ -1154,7 +1161,6 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
if (merged_processor)
{
Pipe pipe(std::move(pipes), std::move(merged_processor));
pipe.addSimpleTransform(std::make_shared<ExpressionTransform>(pipe.getHeader(), projection));
pipes = Pipes();
pipes.emplace_back(std::move(pipe));
}

View File

@ -67,7 +67,8 @@ private:
const ExpressionActionsPtr & sorting_key_prefix_expr,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const;
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const;
Pipes spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts,
@ -77,7 +78,8 @@ private:
const SelectQueryInfo & query_info,
const Names & virt_columns,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const;
const MergeTreeReaderSettings & reader_settings,
ExpressionActionsPtr & out_projection) const;
/// Get the approximate value (bottom estimate - only by full marks) of the number of rows falling under the index.
size_t getApproximateTotalRowsToRead(

View File

@ -0,0 +1,10 @@
8 8
9 9
10 10
14 14
15 15
9140302661501632497
9199082625845137542
8769270213041934496
4926958392161000708
89922286675368115

View File

@ -0,0 +1,13 @@
-- Checks SAMPLE on a table whose sampling expression is also part of the sorting key.

-- Case 1: SAMPLE combined with FINAL; all columns are selected and ordered by x.
DROP TABLE IF EXISTS tab;

CREATE TABLE tab (x UInt64, v UInt64)
ENGINE = ReplacingMergeTree(v)
ORDER BY (x, sipHash64(x))
SAMPLE BY sipHash64(x);

INSERT INTO tab
SELECT number, number
FROM numbers(1000);

SELECT *
FROM tab FINAL
SAMPLE 1/2
ORDER BY x
LIMIT 5;

DROP TABLE tab;

-- Case 2: SAMPLE without FINAL; the query selects the sampling expression itself and
-- orders by the full sorting key (presumably to exercise reading in sorting-key order).
CREATE TABLE tab (x UInt64, v UInt64)
ENGINE = ReplacingMergeTree(v)
ORDER BY (x, sipHash64(x))
SAMPLE BY sipHash64(x);

INSERT INTO tab
SELECT number, number
FROM numbers(1000);

SELECT sipHash64(x)
FROM tab
SAMPLE 1/2
ORDER BY x, sipHash64(x)
LIMIT 5;

DROP TABLE tab;