DISTINCT in order optimization

+ use SortDescription from input data stream in DistinctStep to decide if the optimization is applicable
This commit is contained in:
Igor Nikonov 2022-06-21 15:06:47 +00:00
parent 6ac68e8303
commit b0a98bd875
16 changed files with 113 additions and 104 deletions

View File

@ -603,7 +603,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization for primary key (prefix) columns in MergeTree family table engines.", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if columns in DISTINCT are sorted", 1) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

View File

@ -1859,8 +1859,6 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
&& !query.final()
&& join_allow_read_in_order;
optimize_distinct_in_order = settings.optimize_distinct_in_order && storage && query.distinct;
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));

View File

@ -232,7 +232,6 @@ struct ExpressionAnalysisResult
bool remove_where_filter = false;
bool optimize_read_in_order = false;
bool optimize_aggregation_in_order = false;
bool optimize_distinct_in_order = false;
bool join_has_delayed_stream = false;
bool use_grouping_set_key = false;

View File

@ -1243,7 +1243,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeOrder(query_plan, input_order_info_for_order);
if (expressions.has_order_by && query.limitLength())
executeDistinct(query_plan, false, expressions.selected_columns, true, query_info.distinct_order_info);
executeDistinct(query_plan, false, expressions.selected_columns, true);
if (expressions.hasLimitBy())
{
@ -1400,7 +1400,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
// now, on shards (first_stage).
assert(!expressions.before_window);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
}
@ -1462,7 +1462,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
"Before window functions");
executeWindow(query_plan);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
else
{
@ -1470,7 +1470,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
{
executeWindow(query_plan);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
else
{
@ -1535,7 +1535,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (!from_aggregation_stage && query.distinct)
executeDistinct(query_plan, false, expressions.selected_columns, false, query_info.distinct_order_info);
executeDistinct(query_plan, false, expressions.selected_columns, false);
if (!from_aggregation_stage && expressions.hasLimitBy())
{
@ -2112,11 +2112,6 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
else
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
}
if (!analysis_result.optimize_aggregation_in_order && analysis_result.optimize_distinct_in_order)
{
query_info.distinct_optimizer = std::make_shared<ReadInOrderOptimizerForDistinct>(analysis_result.selected_columns);
query_info.distinct_order_info = query_info.distinct_optimizer->getInputOrder(metadata_snapshot);
}
query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
@ -2559,7 +2554,7 @@ void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const Act
}
void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct, InputOrderInfoPtr distinct_info)
void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct)
{
auto & query = getSelectQuery();
if (query.distinct)
@ -2576,11 +2571,8 @@ void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
if (distinct_info && !settings.optimize_distinct_in_order)
distinct_info = nullptr;
auto distinct_step = std::make_unique<DistinctStep>(
query_plan.getCurrentDataStream(), limits, limit_for_distinct, columns, pre_distinct, distinct_info);
query_plan.getCurrentDataStream(), limits, limit_for_distinct, columns, pre_distinct, settings.optimize_distinct_in_order);
if (pre_distinct)
distinct_step->setStepDescription("Preliminary DISTINCT");

View File

@ -180,7 +180,7 @@ private:
void executeLimit(QueryPlan & query_plan);
void executeOffset(QueryPlan & query_plan);
static void executeProjection(QueryPlan & query_plan, const ActionsDAGPtr & expression);
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct, InputOrderInfoPtr distinct_info);
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
void executeExtremes(QueryPlan & query_plan);
void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
void

View File

@ -318,8 +318,8 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
/// Add distinct transform
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
auto distinct_step
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false, nullptr);
auto distinct_step = std::make_unique<DistinctStep>(
query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false, settings.optimize_distinct_in_order);
query_plan.addStep(std::move(distinct_step));
}

View File

@ -4,6 +4,7 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <Core/SortDescription.h>
namespace DB
{
@ -38,6 +39,18 @@ static ITransformingStep::Traits getTraits(bool pre_distinct, bool already_disti
};
}
static SortDescription getSortDescription(const SortDescription & input_sort_desc, const Names& columns)
{
SortDescription distinct_sort_desc;
for (const auto & sort_column_desc : input_sort_desc)
{
if (std::find(begin(columns), end(columns), sort_column_desc.column_name) == columns.end())
break;
distinct_sort_desc.emplace_back(sort_column_desc);
}
return distinct_sort_desc;
}
DistinctStep::DistinctStep(
const DataStream & input_stream_,
@ -45,7 +58,7 @@ DistinctStep::DistinctStep(
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_,
const InputOrderInfoPtr & distinct_info_)
bool optimize_distinct_in_order_)
: ITransformingStep(
input_stream_,
input_stream_.header,
@ -53,8 +66,8 @@ DistinctStep::DistinctStep(
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, distinct_info(distinct_info_)
, pre_distinct(pre_distinct_)
, optimize_distinct_in_order(optimize_distinct_in_order_)
{
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
@ -74,29 +87,32 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
if (!pre_distinct)
pipeline.resize(1);
if (pre_distinct && distinct_info)
if (pre_distinct && optimize_distinct_in_order)
{
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
if (SortDescription distinct_sort_desc = getSortDescription(input_streams.front().sort_description, columns);
!distinct_sort_desc.empty())
{
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<DistinctPrimaryKeyTransform>(
header, set_size_limits, limit_hint, distinct_info->order_key_prefix_descr, columns);
});
return std::make_shared<DistinctPrimaryKeyTransform>(
header, set_size_limits, limit_hint, distinct_sort_desc, columns);
});
return;
}
}
else
{
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
});
}
pipeline.addSimpleTransform(
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipelineBuilder::StreamType::Main)
return nullptr;
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
});
}
void DistinctStep::describeActions(FormatSettings & settings) const

View File

@ -16,7 +16,7 @@ public:
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_, /// If is enabled, execute distinct for separate streams. Otherwise, merge streams.
const InputOrderInfoPtr & distinct_info_);
bool optimize_distinct_in_order_);
String getName() const override { return "Distinct"; }
@ -29,8 +29,8 @@ private:
SizeLimits set_size_limits;
UInt64 limit_hint;
Names columns;
InputOrderInfoPtr distinct_info;
bool pre_distinct;
bool optimize_distinct_in_order;
};
}

View File

@ -117,6 +117,15 @@ ReadFromMergeTree::ReadFromMergeTree(
/// Add explicit description.
setStepDescription(data.getStorageID().getFullNameNotQuoted());
Names sorting_key_columns = storage_snapshot->getMetadataForQuery()->getSortingKeyColumns();
SortDescription sort_description;
for (const auto & column : sorting_key_columns)
{
sort_description.emplace_back(column, 1);
}
output_stream->sort_description = std::move(sort_description);
}
Pipe ReadFromMergeTree::readFromPool(

View File

@ -111,7 +111,7 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeIndexes(JSONBuilder::JSONMap & map) const override;
const StorageID getStorageID() const { return data.getStorageID(); }
StorageID getStorageID() const { return data.getStorageID(); }
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
UInt64 getSelectedMarks() const { return selected_marks; }

View File

@ -114,8 +114,7 @@ bool DistinctPrimaryKeyTransform::isCurrentKey(const size_t row_pos)
{
for (size_t i = 0; i < sorted_columns.size(); ++i)
{
const auto & sort_col_desc = sorted_columns_descr[i];
int res = sort_col_desc.direction * current_key[i]->compareAt(0, row_pos, *sorted_columns[i], sort_col_desc.nulls_direction);
int res = current_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
if (res != 0)
return false;
}
@ -125,7 +124,7 @@ bool DistinctPrimaryKeyTransform::isCurrentKey(const size_t row_pos)
size_t DistinctPrimaryKeyTransform::getRangeEnd(size_t range_begin, size_t range_end)
{
size_t low = range_begin;
size_t high = range_end-1;
size_t high = range_end - 1;
while (low <= high)
{
size_t mid = low + (high - low) / 2;

View File

@ -262,29 +262,4 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(
return getInputOrderImpl(metadata_snapshot, required_sort_description, elements_actions, limit);
}
ReadInOrderOptimizerForDistinct::ReadInOrderOptimizerForDistinct(const Names & source_columns_)
: source_columns(source_columns_.begin(), source_columns_.end())
{
}
InputOrderInfoPtr ReadInOrderOptimizerForDistinct::getInputOrder(const StorageMetadataPtr & metadata_snapshot) const
{
Names sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
if (sorting_key_columns.empty())
return {};
SortDescription order_key_prefix_descr;
for (const auto & sorting_key_column : sorting_key_columns)
{
if (source_columns.find(sorting_key_column) == source_columns.end())
break;
order_key_prefix_descr.emplace_back(sorting_key_column, 1, 1);
}
if (order_key_prefix_descr.empty())
return {};
// todo: InputOrderInfo contains more info then needed for distinct, probably it makes sense to replace it
return std::make_shared<InputOrderInfo>(SortDescription{}, std::move(order_key_prefix_descr), 1, 0);
}
}

View File

@ -39,14 +39,4 @@ private:
SortDescription required_sort_description;
const ASTSelectQuery & query;
};
class ReadInOrderOptimizerForDistinct
{
public:
explicit ReadInOrderOptimizerForDistinct(const Names & source_columns_);
InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot) const;
private:
NameSet source_columns;
};
}

View File

@ -38,9 +38,6 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
class ReadInOrderOptimizer;
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
class ReadInOrderOptimizerForDistinct;
using ReadInOrderOptimizerForDistinctPtr = std::shared_ptr<const ReadInOrderOptimizerForDistinct>;
class Cluster;
using ClusterPtr = std::shared_ptr<Cluster>;
@ -165,9 +162,6 @@ struct SelectQueryInfoBase
/// Can be modified while reading from storage
InputOrderInfoPtr input_order_info;
ReadInOrderOptimizerForDistinctPtr distinct_optimizer;
InputOrderInfoPtr distinct_order_info;
/// Prepared sets are used for indices by storage engine.
/// Example: x IN (1, 2, 3)
PreparedSets sets;

View File

@ -1,3 +1,16 @@
enable optimize_distinct_in_order
distinct pipeline on empty table -> no optimization, source is ReadFromPreparedSource instead of ReadFromMergeTree
(Expression)
ExpressionTransform
(Distinct)
DistinctTransform
(Distinct)
DistinctTransform
(Expression)
ExpressionTransform
(ReadFromPreparedSource)
NullSource 0 → 1
insert into table to use ReadFromMergeTree source
disable optimize_distinct_in_order
pipeline does _not_ contain the optimization
(Expression)
@ -8,10 +21,10 @@ ExpressionTransform
DistinctTransform
(Expression)
ExpressionTransform
(ReadFromPreparedSource)
NullSource 0 → 1
(ReadFromMergeTree)
MergeTreeInOrder 0 → 1
enable optimize_distinct_in_order
distinct with primary key prefix -> pipeline contains the optimization
distinct with all primary key columns -> optimization applied
(Expression)
ExpressionTransform
(Distinct)
@ -20,9 +33,22 @@ ExpressionTransform
DistinctPrimaryKeyTransform
(Expression)
ExpressionTransform
(ReadFromPreparedSource)
NullSource 0 → 1
distinct with non-primary key prefix -> pipeline does _not_ contain the optimization
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
distinct with primary key prefix -> optimization applied
(Expression)
ExpressionTransform
(Distinct)
DistinctTransform
(Distinct)
DistinctPrimaryKeyTransform
(Expression)
ExpressionTransform
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
distinct with non-primary key prefix -> no optimization
(Expression)
ExpressionTransform
(Distinct)
@ -31,8 +57,9 @@ ExpressionTransform
DistinctTransform
(Expression)
ExpressionTransform
(ReadFromPreparedSource)
NullSource 0 → 1
(ReadFromMergeTree)
Concat 2 → 1
MergeTreeInOrder × 2 0 → 1
the same values in every chunk, distinct in order should skip entire chunks with the same key as previous one
single-threaded distinct
0
@ -64,7 +91,7 @@ multi-threaded distinct
table with not only primary key columns
distinct with key-prefix only
0
distinct with full key, 2 columns
distinct with full key
0 0
0 1
0 2

View File

@ -1,7 +1,14 @@
drop table if exists distinct_in_order sync;
create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by (a, b, c);
select 'enable optimize_distinct_in_order';
set optimize_distinct_in_order=1;
select 'distinct pipeline on empty table -> no optimization, source is ReadFromPreparedSource instead of ReadFromMergeTree';
explain pipeline select distinct * from distinct_in_order settings max_threads=1;
select 'insert into table to use ReadFromMergeTree source';
insert into distinct_in_order select number % number, number % 10, number % 5 from numbers(1,10);
select 'disable optimize_distinct_in_order';
set optimize_distinct_in_order=0;
select 'pipeline does _not_ contain the optimization';
@ -9,9 +16,12 @@ explain pipeline select distinct * from distinct_in_order settings max_threads=1
select 'enable optimize_distinct_in_order';
set optimize_distinct_in_order=1;
select 'distinct with primary key prefix -> pipeline contains the optimization';
select 'distinct with all primary key columns -> optimization applied';
insert into distinct_in_order select number % number, number % 10, number % 5 from numbers(1,10);
explain pipeline select distinct * from distinct_in_order settings max_threads=1;
select 'distinct with primary key prefix -> optimization applied';
explain pipeline select distinct a, c from distinct_in_order settings max_threads=1;
select 'distinct with non-primary key prefix -> pipeline does _not_ contain the optimization';
select 'distinct with non-primary key prefix -> no optimization';
explain pipeline select distinct b, c from distinct_in_order settings max_threads=1;
select 'the same values in every chunk, distinct in order should skip entire chunks with the same key as previous one';
@ -39,9 +49,9 @@ create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by
insert into distinct_in_order select number % number, number % 10, number % 5 from numbers(1,1000000);
select 'distinct with key-prefix only';
select distinct a from distinct_in_order;
select 'distinct with full key, 2 columns';
select 'distinct with full key';
select distinct a,b from distinct_in_order order by b;
select 'distinct with key prefix and non-sorted column';
select distinct a,c from distinct_in_order order by c;
-- drop table if exists distinct_in_order sync;
drop table if exists distinct_in_order sync;