better pipeline while reading in pk_order

CurtizJ committed 2019-07-27 01:18:27 +03:00
parent b1cc019bd4 / commit b1d981ec3a
6 changed files with 97 additions and 46 deletions
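Summary of the change (as read from the diff below): the separate before_order and before_select expression steps are merged into a single before_order_and_select step, so the expressions needed by SELECT and ORDER BY are computed in one place, before sorting, on both pipeline paths. SortingInfo no longer carries expression actions; instead, when several streams are read in primary-key order, MergeTreeDataSelectExecutor applies data.sorting_key_expr to each per-thread stream and merges them by the sorting key columns. The optimizeSortingWithPK check is hoisted so it runs once, before the pipeline branches, loses its expressions parameter, and stops extending the matched ORDER BY prefix at virtual columns such as _part. New tests cover DESC reading, monotonic wrappers and an expression sorting key.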


@@ -535,19 +535,13 @@ InterpreterSelectQuery::analyzeExpressions(QueryProcessingStage::Enum from_stage
         }
     }
 
+    /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
     query_analyzer->appendSelect(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
     res.selected_columns = chain.getLastStep().required_output;
-    res.before_select = chain.getLastActions();
+    res.has_order_by = query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage));
+    res.before_order_and_select = chain.getLastActions();
     chain.addStep();
 
-    /// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
-    if (query_analyzer->appendOrderBy(chain, dry_run || (res.need_aggregate ? !res.second_stage : !res.first_stage)))
-    {
-        res.has_order_by = true;
-        res.before_order = chain.getLastActions();
-        chain.addStep();
-    }
-
     if (query_analyzer->appendLimitBy(chain, dry_run || !res.second_stage))
     {
         res.has_limit_by = true;
@@ -654,8 +648,7 @@ static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & c
 }
 
-static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, const ASTSelectQuery & query,
-    const Context & context, const ExpressionActionsPtr & expressions)
+static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, const ASTSelectQuery & query, const Context & context)
 {
     if (!merge_tree.hasSortingKey())
         return {};
@@ -667,11 +660,19 @@ static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, co
     const auto & sorting_key_columns = merge_tree.getSortingKeyColumns();
     size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
 
+    auto is_virtual_column = [](const String & column_name)
+    {
+        return column_name == "_part" || column_name == "_part_index"
+            || column_name == "_partition_id" || column_name == "_sample_factor";
+    };
+
     auto order_by_expr = query.orderBy();
-    auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr,
-        expressions->getRequiredColumnsWithTypes(), expressions->getSampleBlock().getNames());
+    auto syntax_result = SyntaxAnalyzer(context).analyze(order_by_expr, merge_tree.getColumns().getAll());
 
     for (size_t i = 0; i < prefix_size; ++i)
     {
+        if (is_virtual_column(order_descr[i].column_name))
+            break;
+
         /// Read in pk order in case of exact match with order key element
         /// or in some simple cases when order key element is wrapped into monotonic function.
         int current_direction = order_descr[i].direction;
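The loop above either matches an ORDER BY element to the sorting key column at the same position or, in simple cases, accepts a monotonic function of it. A minimal standalone sketch of the core prefix check (plain C++, not ClickHouse code; exact column matches only, no monotonic-function or virtual-column handling, with simplified stand-ins for ClickHouse's SortDescription):

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for ClickHouse's types.
struct SortColumnDescription
{
    std::string column_name;
    int direction = 1; // 1 = ASC, -1 = DESC
};
using SortDescription = std::vector<SortColumnDescription>;

// Returns the longest ORDER BY prefix that coincides with the sorting key;
// all matched elements must share one read direction. An empty result means
// the optimization does not apply. Mirrors the shape of optimizeSortingWithPK,
// minus monotonic functions and virtual-column checks.
static SortDescription matchSortingKeyPrefix(
    const SortDescription & order_descr,
    const std::vector<std::string> & sorting_key_columns,
    int & read_direction)
{
    SortDescription prefix_order_descr;
    read_direction = 0; // unknown until the first matched element

    size_t prefix_size = std::min(order_descr.size(), sorting_key_columns.size());
    for (size_t i = 0; i < prefix_size; ++i)
    {
        if (order_descr[i].column_name != sorting_key_columns[i])
            break; // prefix match is over

        int current_direction = order_descr[i].direction;
        if (read_direction == 0)
            read_direction = current_direction;
        else if (current_direction != read_direction)
            break; // mixed ASC/DESC cannot be read in one pass

        prefix_order_descr.push_back(order_descr[i]);
    }
    return prefix_order_descr;
}

int main()
{
    SortDescription order_by = {{"d", -1}, {"a", -1}, {"x", -1}};
    std::vector<std::string> sorting_key = {"d", "a"};
    int direction = 0;
    auto prefix = matchSortingKeyPrefix(order_by, sorting_key, direction);
    std::cout << prefix.size() << ' ' << direction << '\n'; // prints: 2 -1
}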
@@ -730,7 +731,7 @@ static SortingInfoPtr optimizeSortingWithPK(const MergeTreeData & merge_tree, co
     if (prefix_order_descr.empty())
         return {};
 
-    return std::make_shared<SortingInfo>(std::move(prefix_order_descr), expressions, read_direction);
+    return std::make_shared<SortingInfo>(std::move(prefix_order_descr), read_direction);
 }
@@ -792,6 +793,11 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
     }
 
     SortingInfoPtr sorting_info;
+    if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
+    {
+        if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
+            sorting_info = optimizeSortingWithPK(*merge_tree_data, query, context);
+    }
 
     if (dry_run)
     {
@@ -805,12 +811,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
         if (storage && expressions.filter_info && expressions.prewhere_info)
             throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
 
-        if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
-        {
-            if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
-                sorting_info = optimizeSortingWithPK(*merge_tree_data, query,context, expressions.before_order);
-        }
-
         if (expressions.prewhere_info)
         {
             if constexpr (pipeline_with_processors)
@@ -847,12 +847,6 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
         if (storage && expressions.filter_info && expressions.prewhere_info)
             throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
 
-        if (settings.optimize_pk_order && storage && query.orderBy() && !query.groupBy() && !query.final())
-        {
-            if (const MergeTreeData * merge_tree_data = dynamic_cast<const MergeTreeData *>(storage.get()))
-                sorting_info = optimizeSortingWithPK(*merge_tree_data, query,context, expressions.before_order);
-        }
-
         /** Read the data from Storage. from_stage - to what stage the request was completed in Storage. */
         executeFetchColumns(from_stage, pipeline, sorting_info, expressions.prewhere_info, expressions.columns_to_remove_after_prewhere);
@@ -960,7 +954,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
                 executeAggregation(pipeline, expressions.before_aggregation, aggregate_overflow_row, aggregate_final);
             else
             {
-                executeExpression(pipeline, expressions.before_select);
+                executeExpression(pipeline, expressions.before_order_and_select);
                 executeDistinct(pipeline, true, expressions.selected_columns);
             }
@@ -972,11 +966,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
             if (!expressions.second_stage && !expressions.need_aggregate && !expressions.has_having)
             {
                 if (expressions.has_order_by)
-                {
-                    if (!query_info.sorting_info) // Otherwise we have executed expressions while reading
-                        executeExpression(pipeline, expressions.before_order);
                     executeOrder(pipeline, query_info.sorting_info);
-                }
 
                 if (expressions.has_order_by && query.limitLength())
                     executeDistinct(pipeline, false, expressions.selected_columns);
@@ -1030,7 +1020,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
             else if (expressions.has_having)
                 executeHaving(pipeline, expressions.before_having);
 
-            executeExpression(pipeline, expressions.before_select);
+            executeExpression(pipeline, expressions.before_order_and_select);
             executeDistinct(pipeline, true, expressions.selected_columns);
 
             need_second_distinct_pass = query.distinct && pipeline.hasMixedStreams();
@@ -1071,11 +1061,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
                 if (!expressions.first_stage && !expressions.need_aggregate && !(query.group_by_with_totals && !aggregate_final))
                     executeMergeSorted(pipeline);
                 else    /// Otherwise, just sort.
-                {
-                    if (!query_info.sorting_info) // Otherwise we have executed them while reading
-                        executeExpression(pipeline, expressions.before_order);
                     executeOrder(pipeline, query_info.sorting_info);
-                }
             }
 
             /** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
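Both executeOrder call sites now pass query_info.sorting_info unconditionally; when data is read in primary-key order, the sort only has to be finished, not redone. A self-contained illustration of that finish-sorting idea (plain C++, not the ClickHouse API): rows already ordered by a key prefix become fully ordered by sorting each run with an equal prefix.

#include <algorithm>
#include <iostream>
#include <vector>

struct Row { int a; int b; }; // data already sorted by the prefix column `a`

// Finish sorting rows that are presorted by `a` so they become sorted by
// (a, b): find each run of equal `a` and sort it by `b`. Only small runs are
// touched instead of re-sorting the whole dataset, which is the point of
// reading in primary-key order.
void finishSort(std::vector<Row> & rows)
{
    auto run_begin = rows.begin();
    while (run_begin != rows.end())
    {
        auto run_end = std::find_if(run_begin, rows.end(),
            [&](const Row & r) { return r.a != run_begin->a; });
        std::stable_sort(run_begin, run_end,
            [](const Row & x, const Row & y) { return x.b < y.b; });
        run_begin = run_end;
    }
}

int main()
{
    std::vector<Row> rows = {{1, 9}, {1, 2}, {2, 5}, {2, 1}, {3, 7}};
    finishSort(rows);
    for (const auto & r : rows)
        std::cout << r.a << ' ' << r.b << '\n'; // 1 2, 1 9, 2 1, 2 5, 3 7
}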


@@ -152,8 +152,7 @@ private:
         ExpressionActionsPtr before_where;
         ExpressionActionsPtr before_aggregation;
         ExpressionActionsPtr before_having;
-        ExpressionActionsPtr before_order;
-        ExpressionActionsPtr before_select;
+        ExpressionActionsPtr before_order_and_select;
         ExpressionActionsPtr before_limit_by;
         ExpressionActionsPtr final_projection;


@@ -973,13 +973,19 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsPKOrd
             streams_per_thread.push_back(source_stream);
         }
 
-        if (sorting_info->actions && !sorting_info->actions->getActions().empty())
-            for (auto & stream : streams_per_thread)
-                stream = std::make_shared<ExpressionBlockInputStream>(stream, sorting_info->actions);
-
         if (streams_per_thread.size() > 1)
+        {
+            SortDescription sort_description;
+            for (size_t j = 0; j < query_info.sorting_info->prefix_order_descr.size(); ++j)
+                sort_description.emplace_back(data.sorting_key_columns[j],
+                    sorting_info->direction, 1);
+
+            for (auto & stream : streams_per_thread)
+                stream = std::make_shared<ExpressionBlockInputStream>(stream, data.sorting_key_expr);
+
             streams.push_back(std::make_shared<MergingSortedBlockInputStream>(
-                streams_per_thread, sorting_info->prefix_order_descr, max_block_size));
+                streams_per_thread, sort_description, max_block_size));
+        }
         else
             streams.push_back(streams_per_thread.at(0));
     }
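Since SortingInfo no longer carries expression actions, the reader makes the per-thread streams comparable itself: it applies data.sorting_key_expr to each stream and merges on the sorting key columns, over which every stream is fully sorted, rather than on the bare ORDER BY prefix. A minimal sketch of such a k-way merge of presorted sources (plain C++ with a heap standing in for MergingSortedBlockInputStream; single integer keys, direction 1/-1 as in SortDescription):

#include <iostream>
#include <queue>
#include <utility>
#include <vector>

// Merge several individually sorted sources into one sorted output.
// `direction` follows SortDescription semantics: 1 ascending, -1 descending
// (the sources must already be sorted the same way).
std::vector<int> mergeSorted(const std::vector<std::vector<int>> & sources, int direction)
{
    using Cursor = std::pair<size_t, size_t>; // (source index, position in source)
    auto greater = [&](const Cursor & l, const Cursor & r)
    {
        int lv = sources[l.first][l.second];
        int rv = sources[r.first][r.second];
        return direction > 0 ? lv > rv : lv < rv;
    };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(greater);

    for (size_t i = 0; i < sources.size(); ++i)
        if (!sources[i].empty())
            heap.push({i, 0});

    std::vector<int> result;
    while (!heap.empty())
    {
        auto [src, pos] = heap.top();
        heap.pop();
        result.push_back(sources[src][pos]);
        if (pos + 1 < sources[src].size())
            heap.push({src, pos + 1}); // advance the cursor of the source we consumed
    }
    return result;
}

int main()
{
    std::vector<std::vector<int>> streams = {{1, 4, 7}, {2, 3, 8}, {5, 6}};
    for (int v : mergeSorted(streams, 1))
        std::cout << v << ' '; // 1 2 3 4 5 6 7 8
    std::cout << '\n';
}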


@@ -37,11 +37,10 @@ struct FilterInfo
 struct SortingInfo
 {
     SortDescription prefix_order_descr;
-    ExpressionActionsPtr actions;
     int direction;
 
-    SortingInfo(const SortDescription & prefix_order_descr_, const ExpressionActionsPtr & actions_, int direction_)
-        : prefix_order_descr(prefix_order_descr_), actions(actions_), direction(direction_) {}
+    SortingInfo(const SortDescription & prefix_order_descr_, int direction_)
+        : prefix_order_descr(prefix_order_descr_), direction(direction_) {}
 };
 
 using PrewhereInfoPtr = std::shared_ptr<PrewhereInfo>;


@@ -163,3 +163,38 @@
 1 3 103
 1 2 102
 1 1 101
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00 -1249512288
+2019-05-05 00:00:00 -916059969
+2019-05-05 00:00:00 -859523951
+2019-05-05 00:00:00 -45363190
+2019-05-05 00:00:00 345522721
+2019-05-14 00:00:00 99
+2019-05-14 00:00:00 89
+2019-05-14 00:00:00 79
+2019-05-14 00:00:00 69
+2019-05-14 00:00:00 59
+2019-05-14 00:00:00 99
+2019-05-14 00:00:00 89
+2019-05-14 00:00:00 79
+2019-05-14 00:00:00 69
+2019-05-14 00:00:00 59
+2019-05-14 00:00:00 99
+2019-05-14 00:00:00 89
+2019-05-14 00:00:00 79
+2019-05-14 00:00:00 69
+2019-05-14 00:00:00 59
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+2019-05-05 00:00:00
+1 5
+1 5
+1 5
+1 3
+1 3


@@ -28,3 +28,29 @@ SELECT a, b, c FROM test.pk_order ORDER BY a, b DESC, c DESC;
 SELECT a, b, c FROM test.pk_order ORDER BY a DESC, b DESC, c DESC;
 
 DROP TABLE IF EXISTS test.pk_order;
+
+CREATE TABLE pk_order (d DateTime, a Int32, b Int32) ENGINE = MergeTree ORDER BY (d, a)
+    PARTITION BY toDate(d) SETTINGS index_granularity=1;
+INSERT INTO pk_order
+    SELECT toDateTime('2019-05-05 00:00:00') + INTERVAL number % 10 DAY, number, intHash32(number) from numbers(100);
+
+set max_block_size = 1;
+-- Currently, checking the number of read rows while reading in pk order is not precise. TODO: fix it.
+-- SET max_rows_to_read = 10;
+
+SELECT d FROM pk_order ORDER BY d LIMIT 5;
+SELECT d, b FROM pk_order ORDER BY d, b LIMIT 5;
+SELECT d, a FROM pk_order ORDER BY d DESC, a DESC LIMIT 5;
+SELECT d, a FROM pk_order ORDER BY d DESC, -a LIMIT 5;
+SELECT d, a FROM pk_order ORDER BY d DESC, a DESC LIMIT 5;
+SELECT toStartOfHour(d) as d1 FROM pk_order ORDER BY d1 LIMIT 5;
+
+DROP TABLE pk_order;
+
+CREATE TABLE pk_order (a Int, b Int) ENGINE = MergeTree ORDER BY (a / b);
+INSERT INTO pk_order SELECT number % 10 + 1, number % 6 + 1 from numbers(100);
+SELECT * FROM pk_order ORDER BY (a / b), a LIMIT 5;
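Taken together, the new tests exercise each branch of the optimization: a plain prefix match on (d, a), DESC reading, ORDER BY elements wrapped in monotonic functions (toStartOfHour(d), -a), and a sorting key that is an expression rather than a column (ORDER BY (a / b)); index_granularity=1 together with max_block_size = 1 forces many small streams through the merging path.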