mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-01 12:01:58 +00:00
DISTINCT in order optimization
+ optimization for DISTINCT containing primary key columns
This commit is contained in:
parent
e1547058cf
commit
6ac68e8303
@ -603,6 +603,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
|
||||
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
|
||||
M(Bool, compatibility_ignore_auto_increment_in_create_table, false, "Ignore AUTO_INCREMENT keyword in column declaration if true, otherwise return error. It simplifies migration from MySQL", 0) \
|
||||
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
|
||||
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization for primary key (prefix) columns in MergeTree family table engines.", 0) \
|
||||
// End of COMMON_SETTINGS
|
||||
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.
|
||||
|
||||
|
@ -1859,6 +1859,8 @@ ExpressionAnalysisResult::ExpressionAnalysisResult(
|
||||
&& !query.final()
|
||||
&& join_allow_read_in_order;
|
||||
|
||||
optimize_distinct_in_order = settings.optimize_distinct_in_order && storage && query.distinct;
|
||||
|
||||
/// If there is aggregation, we execute expressions in SELECT and ORDER BY on the initiating server, otherwise on the source servers.
|
||||
query_analyzer.appendSelect(chain, only_types || (need_aggregate ? !second_stage : !first_stage));
|
||||
|
||||
|
@ -232,6 +232,7 @@ struct ExpressionAnalysisResult
|
||||
bool remove_where_filter = false;
|
||||
bool optimize_read_in_order = false;
|
||||
bool optimize_aggregation_in_order = false;
|
||||
bool optimize_distinct_in_order = false;
|
||||
bool join_has_delayed_stream = false;
|
||||
|
||||
bool use_grouping_set_key = false;
|
||||
|
@ -1243,7 +1243,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
executeOrder(query_plan, input_order_info_for_order);
|
||||
|
||||
if (expressions.has_order_by && query.limitLength())
|
||||
executeDistinct(query_plan, false, expressions.selected_columns, true);
|
||||
executeDistinct(query_plan, false, expressions.selected_columns, true, query_info.distinct_order_info);
|
||||
|
||||
if (expressions.hasLimitBy())
|
||||
{
|
||||
@ -1400,7 +1400,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
// now, on shards (first_stage).
|
||||
assert(!expressions.before_window);
|
||||
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1462,7 +1462,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
"Before window functions");
|
||||
executeWindow(query_plan);
|
||||
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1470,7 +1470,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
{
|
||||
executeWindow(query_plan);
|
||||
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true);
|
||||
executeDistinct(query_plan, true, expressions.selected_columns, true, query_info.distinct_order_info);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1535,7 +1535,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
|
||||
* then DISTINCT needs to be performed once again after merging all streams.
|
||||
*/
|
||||
if (!from_aggregation_stage && query.distinct)
|
||||
executeDistinct(query_plan, false, expressions.selected_columns, false);
|
||||
executeDistinct(query_plan, false, expressions.selected_columns, false, query_info.distinct_order_info);
|
||||
|
||||
if (!from_aggregation_stage && expressions.hasLimitBy())
|
||||
{
|
||||
@ -2112,6 +2112,11 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
|
||||
else
|
||||
query_info.input_order_info = query_info.order_optimizer->getInputOrder(metadata_snapshot, context, limit);
|
||||
}
|
||||
if (!analysis_result.optimize_aggregation_in_order && analysis_result.optimize_distinct_in_order)
|
||||
{
|
||||
query_info.distinct_optimizer = std::make_shared<ReadInOrderOptimizerForDistinct>(analysis_result.selected_columns);
|
||||
query_info.distinct_order_info = query_info.distinct_optimizer->getInputOrder(metadata_snapshot);
|
||||
}
|
||||
|
||||
query_info.storage_limits = std::make_shared<StorageLimitsList>(storage_limits);
|
||||
|
||||
@ -2554,7 +2559,7 @@ void InterpreterSelectQuery::executeProjection(QueryPlan & query_plan, const Act
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct)
|
||||
void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct, InputOrderInfoPtr distinct_info)
|
||||
{
|
||||
auto & query = getSelectQuery();
|
||||
if (query.distinct)
|
||||
@ -2571,8 +2576,11 @@ void InterpreterSelectQuery::executeDistinct(QueryPlan & query_plan, bool before
|
||||
|
||||
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
|
||||
|
||||
auto distinct_step
|
||||
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, limit_for_distinct, columns, pre_distinct);
|
||||
if (distinct_info && !settings.optimize_distinct_in_order)
|
||||
distinct_info = nullptr;
|
||||
|
||||
auto distinct_step = std::make_unique<DistinctStep>(
|
||||
query_plan.getCurrentDataStream(), limits, limit_for_distinct, columns, pre_distinct, distinct_info);
|
||||
|
||||
if (pre_distinct)
|
||||
distinct_step->setStepDescription("Preliminary DISTINCT");
|
||||
|
@ -180,7 +180,7 @@ private:
|
||||
void executeLimit(QueryPlan & query_plan);
|
||||
void executeOffset(QueryPlan & query_plan);
|
||||
static void executeProjection(QueryPlan & query_plan, const ActionsDAGPtr & expression);
|
||||
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct);
|
||||
void executeDistinct(QueryPlan & query_plan, bool before_order, Names columns, bool pre_distinct, InputOrderInfoPtr distinct_info);
|
||||
void executeExtremes(QueryPlan & query_plan);
|
||||
void executeSubqueriesInSetsAndJoins(QueryPlan & query_plan, std::unordered_map<String, SubqueryForSet> & subqueries_for_sets);
|
||||
void
|
||||
|
@ -319,7 +319,7 @@ void InterpreterSelectWithUnionQuery::buildQueryPlan(QueryPlan & query_plan)
|
||||
SizeLimits limits(settings.max_rows_in_distinct, settings.max_bytes_in_distinct, settings.distinct_overflow_mode);
|
||||
|
||||
auto distinct_step
|
||||
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false);
|
||||
= std::make_unique<DistinctStep>(query_plan.getCurrentDataStream(), limits, 0, result_header.getNames(), false, nullptr);
|
||||
|
||||
query_plan.addStep(std::move(distinct_step));
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
#include <Processors/QueryPlan/DistinctStep.h>
|
||||
#include <Processors/Transforms/DistinctPrimaryKeyTransform.h>
|
||||
#include <Processors/Transforms/DistinctTransform.h>
|
||||
#include <QueryPipeline/QueryPipelineBuilder.h>
|
||||
#include <IO/Operators.h>
|
||||
@ -43,7 +44,8 @@ DistinctStep::DistinctStep(
|
||||
const SizeLimits & set_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const Names & columns_,
|
||||
bool pre_distinct_)
|
||||
bool pre_distinct_,
|
||||
const InputOrderInfoPtr & distinct_info_)
|
||||
: ITransformingStep(
|
||||
input_stream_,
|
||||
input_stream_.header,
|
||||
@ -51,6 +53,7 @@ DistinctStep::DistinctStep(
|
||||
, set_size_limits(set_size_limits_)
|
||||
, limit_hint(limit_hint_)
|
||||
, columns(columns_)
|
||||
, distinct_info(distinct_info_)
|
||||
, pre_distinct(pre_distinct_)
|
||||
{
|
||||
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
|
||||
@ -71,13 +74,29 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
|
||||
if (!pre_distinct)
|
||||
pipeline.resize(1);
|
||||
|
||||
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
|
||||
if (pre_distinct && distinct_info)
|
||||
{
|
||||
if (stream_type != QueryPipelineBuilder::StreamType::Main)
|
||||
return nullptr;
|
||||
pipeline.addSimpleTransform(
|
||||
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
|
||||
{
|
||||
if (stream_type != QueryPipelineBuilder::StreamType::Main)
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
|
||||
});
|
||||
return std::make_shared<DistinctPrimaryKeyTransform>(
|
||||
header, set_size_limits, limit_hint, distinct_info->order_key_prefix_descr, columns);
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
pipeline.addSimpleTransform(
|
||||
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
|
||||
{
|
||||
if (stream_type != QueryPipelineBuilder::StreamType::Main)
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<DistinctTransform>(header, set_size_limits, limit_hint, columns);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
void DistinctStep::describeActions(FormatSettings & settings) const
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
#include <Processors/QueryPlan/ITransformingStep.h>
|
||||
#include <QueryPipeline/SizeLimits.h>
|
||||
#include <Storages/SelectQueryInfo.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -10,11 +11,12 @@ class DistinctStep : public ITransformingStep
|
||||
{
|
||||
public:
|
||||
DistinctStep(
|
||||
const DataStream & input_stream_,
|
||||
const SizeLimits & set_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const Names & columns_,
|
||||
bool pre_distinct_); /// If is enabled, execute distinct for separate streams. Otherwise, merge streams.
|
||||
const DataStream & input_stream_,
|
||||
const SizeLimits & set_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const Names & columns_,
|
||||
bool pre_distinct_, /// If is enabled, execute distinct for separate streams. Otherwise, merge streams.
|
||||
const InputOrderInfoPtr & distinct_info_);
|
||||
|
||||
String getName() const override { return "Distinct"; }
|
||||
|
||||
@ -27,6 +29,7 @@ private:
|
||||
SizeLimits set_size_limits;
|
||||
UInt64 limit_hint;
|
||||
Names columns;
|
||||
InputOrderInfoPtr distinct_info;
|
||||
bool pre_distinct;
|
||||
};
|
||||
|
||||
|
214
src/Processors/Transforms/DistinctPrimaryKeyTransform.cpp
Normal file
214
src/Processors/Transforms/DistinctPrimaryKeyTransform.cpp
Normal file
@ -0,0 +1,214 @@
|
||||
#include <Processors/Transforms/DistinctPrimaryKeyTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int SET_SIZE_LIMIT_EXCEEDED;
|
||||
}
|
||||
|
||||
DistinctPrimaryKeyTransform::DistinctPrimaryKeyTransform(
|
||||
const Block & header_,
|
||||
const SizeLimits & output_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const SortDescription & sorted_columns_descr_,
|
||||
const Names & source_columns)
|
||||
: ISimpleTransform(header_, header_, true)
|
||||
, limit_hint(limit_hint_)
|
||||
, output_size_limits(output_size_limits_)
|
||||
, sorted_columns_descr(sorted_columns_descr_)
|
||||
{
|
||||
/// calculate sorted columns positions
|
||||
sorted_columns_pos.reserve(sorted_columns_descr.size());
|
||||
for (auto const & descr : sorted_columns_descr)
|
||||
{
|
||||
size_t pos = header_.getPositionByName(descr.column_name);
|
||||
sorted_columns_pos.emplace_back(pos);
|
||||
}
|
||||
|
||||
/// calculate not-sorted columns positions
|
||||
other_columns_pos.reserve(source_columns.size());
|
||||
for (const auto & source_column : source_columns)
|
||||
{
|
||||
size_t pos = header_.getPositionByName(source_column);
|
||||
if (std::find(sorted_columns_pos.begin(), sorted_columns_pos.end(), pos) != sorted_columns_pos.end())
|
||||
continue;
|
||||
|
||||
const auto & col = header_.getByPosition(pos).column;
|
||||
if (col && !isColumnConst(*col))
|
||||
other_columns_pos.emplace_back(pos);
|
||||
}
|
||||
|
||||
/// reserve space in auxiliary column vectors for processing
|
||||
sorted_columns.reserve(sorted_columns_pos.size());
|
||||
other_columns.reserve(other_columns_pos.size());
|
||||
current_key.reserve(sorted_columns.size());
|
||||
}
|
||||
|
||||
void DistinctPrimaryKeyTransform::initChunkProcessing(const Columns & input_columns)
|
||||
{
|
||||
sorted_columns.clear();
|
||||
for (size_t pos : sorted_columns_pos)
|
||||
sorted_columns.emplace_back(input_columns[pos].get());
|
||||
|
||||
other_columns.clear();
|
||||
for (size_t pos : other_columns_pos)
|
||||
other_columns.emplace_back(input_columns[pos].get());
|
||||
|
||||
if (!other_columns.empty() && data.type == ClearableSetVariants::Type::EMPTY)
|
||||
data.init(ClearableSetVariants::chooseMethod(other_columns, other_columns_sizes));
|
||||
}
|
||||
|
||||
size_t DistinctPrimaryKeyTransform::ordinaryDistinctOnRange(IColumn::Filter & filter, size_t range_begin, size_t range_end)
|
||||
{
|
||||
size_t count = 0;
|
||||
switch (data.type)
|
||||
{
|
||||
case ClearableSetVariants::Type::EMPTY:
|
||||
break;
|
||||
// clang-format off
|
||||
#define M(NAME) \
|
||||
case ClearableSetVariants::Type::NAME: \
|
||||
count = buildFilterForRange(*data.NAME, filter, range_begin, range_end, data); \
|
||||
break;
|
||||
|
||||
APPLY_FOR_SET_VARIANTS(M)
|
||||
#undef M
|
||||
// clang-format on
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
template <typename Method>
|
||||
size_t DistinctPrimaryKeyTransform::buildFilterForRange(
|
||||
Method & method, IColumn::Filter & filter, size_t range_begin, size_t range_end, ClearableSetVariants & variants)
|
||||
{
|
||||
typename Method::State state(other_columns, other_columns_sizes, nullptr);
|
||||
method.data.clear();
|
||||
|
||||
size_t count = 0;
|
||||
for (size_t i = range_begin; i < range_end; ++i)
|
||||
{
|
||||
auto emplace_result = state.emplaceKey(method.data, i, variants.string_pool);
|
||||
|
||||
/// emit the record if there is no such key in the current set, skip otherwise
|
||||
filter[i] = emplace_result.isInserted();
|
||||
if (filter[i])
|
||||
++count;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
void DistinctPrimaryKeyTransform::setCurrentKey(const size_t row_pos)
|
||||
{
|
||||
current_key.clear();
|
||||
for (auto const & col : sorted_columns)
|
||||
{
|
||||
current_key.emplace_back(col->cloneEmpty());
|
||||
current_key.back()->insertFrom(*col, row_pos);
|
||||
}
|
||||
}
|
||||
|
||||
bool DistinctPrimaryKeyTransform::isCurrentKey(const size_t row_pos)
|
||||
{
|
||||
for (size_t i = 0; i < sorted_columns.size(); ++i)
|
||||
{
|
||||
const auto & sort_col_desc = sorted_columns_descr[i];
|
||||
int res = sort_col_desc.direction * current_key[i]->compareAt(0, row_pos, *sorted_columns[i], sort_col_desc.nulls_direction);
|
||||
if (res != 0)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t DistinctPrimaryKeyTransform::getRangeEnd(size_t range_begin, size_t range_end)
|
||||
{
|
||||
size_t low = range_begin;
|
||||
size_t high = range_end-1;
|
||||
while (low <= high)
|
||||
{
|
||||
size_t mid = low + (high - low) / 2;
|
||||
if (isCurrentKey(mid))
|
||||
low = mid + 1;
|
||||
else
|
||||
{
|
||||
high = mid - 1;
|
||||
range_end = mid;
|
||||
}
|
||||
}
|
||||
return range_end;
|
||||
}
|
||||
|
||||
size_t DistinctPrimaryKeyTransform::getStartPosition(const size_t chunk_rows)
|
||||
{
|
||||
if (!current_key.empty()) // current_key is empty on very first transform() call
|
||||
{
|
||||
if (other_columns.empty() && isCurrentKey(0))
|
||||
return getRangeEnd(0, chunk_rows);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void DistinctPrimaryKeyTransform::transform(Chunk & chunk)
|
||||
{
|
||||
const size_t chunk_rows = chunk.getNumRows();
|
||||
size_t output_rows = 0;
|
||||
Columns input_columns = chunk.detachColumns();
|
||||
|
||||
/// split input columns into sorted and other("non-sorted") columns
|
||||
initChunkProcessing(input_columns);
|
||||
|
||||
/// build filter:
|
||||
/// (1) find range with the same values in sorted columns -> [range_begin, range_end)
|
||||
/// (2) for found range
|
||||
/// if there is no "non-sorted" columns: filter out all rows in range except first one
|
||||
/// otherwise: apply ordinary distinct
|
||||
/// (3) repeat until chunk is processed
|
||||
IColumn::Filter filter(chunk_rows);
|
||||
size_t range_begin = getStartPosition(chunk_rows);
|
||||
if (range_begin > 0)
|
||||
std::fill(filter.begin(), filter.begin() + range_begin, 0); /// skip rows already included in distinct on previous transform()
|
||||
|
||||
size_t range_end = range_begin;
|
||||
while (range_end != chunk_rows)
|
||||
{
|
||||
// set current key to find range
|
||||
setCurrentKey(range_begin);
|
||||
|
||||
// find new range [range_begin, range_end)
|
||||
range_end = getRangeEnd(range_begin, chunk_rows);
|
||||
|
||||
// update filter for range
|
||||
if (other_columns.empty())
|
||||
{
|
||||
filter[range_begin] = 1;
|
||||
std::fill(filter.begin() + range_begin + 1, filter.begin() + range_end, 0);
|
||||
++output_rows;
|
||||
}
|
||||
else
|
||||
{
|
||||
// ordinary distinct in range if there are "non-sorted" columns
|
||||
output_rows += ordinaryDistinctOnRange(filter, range_begin, range_end);
|
||||
}
|
||||
|
||||
// set where next range start
|
||||
range_begin = range_end;
|
||||
}
|
||||
|
||||
/// apply the built filter
|
||||
for (auto & input_column : input_columns)
|
||||
input_column = input_column->filter(filter, output_rows);
|
||||
|
||||
chunk.setColumns(std::move(input_columns), output_rows);
|
||||
|
||||
/// Update total output rows and check limits
|
||||
total_output_rows += output_rows;
|
||||
if ((limit_hint && total_output_rows >= limit_hint)
|
||||
|| !output_size_limits.check(total_output_rows, data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
|
||||
{
|
||||
stopReading();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
63
src/Processors/Transforms/DistinctPrimaryKeyTransform.h
Normal file
63
src/Processors/Transforms/DistinctPrimaryKeyTransform.h
Normal file
@ -0,0 +1,63 @@
|
||||
#pragma once
|
||||
|
||||
#include <Columns/IColumn.h>
|
||||
#include <Core/ColumnNumbers.h>
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Interpreters/SetVariants.h>
|
||||
#include <Processors/ISimpleTransform.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
///
|
||||
/// DISTINCT optimization for MergeTree family engines
|
||||
/// Applied in case of DISTINCT is done over primary key(prefix) columns
|
||||
/// It leverages their sorting property
|
||||
///
|
||||
class DistinctPrimaryKeyTransform : public ISimpleTransform
|
||||
{
|
||||
public:
|
||||
DistinctPrimaryKeyTransform(
|
||||
const Block & header_,
|
||||
const SizeLimits & output_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const SortDescription & sorted_columns_descr_,
|
||||
const Names & source_columns_);
|
||||
|
||||
String getName() const override { return "DistinctPrimaryKeyTransform"; }
|
||||
|
||||
protected:
|
||||
void transform(Chunk & chunk) override;
|
||||
|
||||
private:
|
||||
void initChunkProcessing(const Columns & input_columns);
|
||||
size_t getStartPosition(size_t chunk_rows);
|
||||
size_t ordinaryDistinctOnRange(IColumn::Filter & filter, size_t range_begin, size_t range_end);
|
||||
inline void setCurrentKey(size_t row_pos);
|
||||
inline bool isCurrentKey(size_t row_pos);
|
||||
inline size_t getRangeEnd(size_t range_begin, size_t range_end);
|
||||
|
||||
template <typename Method>
|
||||
size_t
|
||||
buildFilterForRange(Method & method, IColumn::Filter & filter, size_t range_begin, size_t range_end, ClearableSetVariants & variants);
|
||||
|
||||
|
||||
ClearableSetVariants data;
|
||||
const size_t limit_hint;
|
||||
size_t total_output_rows = 0;
|
||||
|
||||
/// Restrictions on the maximum size of the output data.
|
||||
const SizeLimits output_size_limits;
|
||||
|
||||
const SortDescription sorted_columns_descr;
|
||||
ColumnNumbers sorted_columns_pos;
|
||||
ColumnRawPtrs sorted_columns; // used during processing
|
||||
|
||||
ColumnNumbers other_columns_pos;
|
||||
Sizes other_columns_sizes;
|
||||
ColumnRawPtrs other_columns; // used during processing
|
||||
|
||||
MutableColumns current_key;
|
||||
};
|
||||
|
||||
}
|
@ -1,13 +1,15 @@
|
||||
#include <Storages/ReadInOrderOptimizer.h>
|
||||
|
||||
#include <Core/SortDescription.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/replaceAliasColumnsInQuery.h>
|
||||
#include <Functions/IFunction.h>
|
||||
#include <Interpreters/TableJoin.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -260,4 +262,29 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrder(
|
||||
return getInputOrderImpl(metadata_snapshot, required_sort_description, elements_actions, limit);
|
||||
}
|
||||
|
||||
|
||||
ReadInOrderOptimizerForDistinct::ReadInOrderOptimizerForDistinct(const Names & source_columns_)
|
||||
: source_columns(source_columns_.begin(), source_columns_.end())
|
||||
{
|
||||
}
|
||||
|
||||
InputOrderInfoPtr ReadInOrderOptimizerForDistinct::getInputOrder(const StorageMetadataPtr & metadata_snapshot) const
|
||||
{
|
||||
Names sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
|
||||
if (sorting_key_columns.empty())
|
||||
return {};
|
||||
|
||||
SortDescription order_key_prefix_descr;
|
||||
for (const auto & sorting_key_column : sorting_key_columns)
|
||||
{
|
||||
if (source_columns.find(sorting_key_column) == source_columns.end())
|
||||
break;
|
||||
order_key_prefix_descr.emplace_back(sorting_key_column, 1, 1);
|
||||
}
|
||||
if (order_key_prefix_descr.empty())
|
||||
return {};
|
||||
|
||||
// todo: InputOrderInfo contains more info then needed for distinct, probably it makes sense to replace it
|
||||
return std::make_shared<InputOrderInfo>(SortDescription{}, std::move(order_key_prefix_descr), 1, 0);
|
||||
}
|
||||
}
|
||||
|
@ -39,4 +39,14 @@ private:
|
||||
SortDescription required_sort_description;
|
||||
const ASTSelectQuery & query;
|
||||
};
|
||||
|
||||
class ReadInOrderOptimizerForDistinct
|
||||
{
|
||||
public:
|
||||
explicit ReadInOrderOptimizerForDistinct(const Names & source_columns_);
|
||||
InputOrderInfoPtr getInputOrder(const StorageMetadataPtr & metadata_snapshot) const;
|
||||
|
||||
private:
|
||||
NameSet source_columns;
|
||||
};
|
||||
}
|
||||
|
@ -38,6 +38,9 @@ using TreeRewriterResultPtr = std::shared_ptr<const TreeRewriterResult>;
|
||||
class ReadInOrderOptimizer;
|
||||
using ReadInOrderOptimizerPtr = std::shared_ptr<const ReadInOrderOptimizer>;
|
||||
|
||||
class ReadInOrderOptimizerForDistinct;
|
||||
using ReadInOrderOptimizerForDistinctPtr = std::shared_ptr<const ReadInOrderOptimizerForDistinct>;
|
||||
|
||||
class Cluster;
|
||||
using ClusterPtr = std::shared_ptr<Cluster>;
|
||||
|
||||
@ -95,10 +98,12 @@ struct InputOrderInfo
|
||||
InputOrderInfo(
|
||||
const SortDescription & order_key_fixed_prefix_descr_,
|
||||
const SortDescription & order_key_prefix_descr_,
|
||||
int direction_, UInt64 limit_)
|
||||
int direction_,
|
||||
UInt64 limit_)
|
||||
: order_key_fixed_prefix_descr(order_key_fixed_prefix_descr_)
|
||||
, order_key_prefix_descr(order_key_prefix_descr_)
|
||||
, direction(direction_), limit(limit_)
|
||||
, direction(direction_)
|
||||
, limit(limit_)
|
||||
{
|
||||
}
|
||||
|
||||
@ -160,6 +165,9 @@ struct SelectQueryInfoBase
|
||||
/// Can be modified while reading from storage
|
||||
InputOrderInfoPtr input_order_info;
|
||||
|
||||
ReadInOrderOptimizerForDistinctPtr distinct_optimizer;
|
||||
InputOrderInfoPtr distinct_order_info;
|
||||
|
||||
/// Prepared sets are used for indices by storage engine.
|
||||
/// Example: x IN (1, 2, 3)
|
||||
PreparedSets sets;
|
||||
|
@ -0,0 +1,83 @@
|
||||
disable optimize_distinct_in_order
|
||||
pipeline does _not_ contain the optimization
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(Distinct)
|
||||
DistinctTransform
|
||||
(Distinct)
|
||||
DistinctTransform
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(ReadFromPreparedSource)
|
||||
NullSource 0 → 1
|
||||
enable optimize_distinct_in_order
|
||||
distinct with primary key prefix -> pipeline contains the optimization
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(Distinct)
|
||||
DistinctTransform
|
||||
(Distinct)
|
||||
DistinctPrimaryKeyTransform
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(ReadFromPreparedSource)
|
||||
NullSource 0 → 1
|
||||
distinct with non-primary key prefix -> pipeline does _not_ contain the optimization
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(Distinct)
|
||||
DistinctTransform
|
||||
(Distinct)
|
||||
DistinctTransform
|
||||
(Expression)
|
||||
ExpressionTransform
|
||||
(ReadFromPreparedSource)
|
||||
NullSource 0 → 1
|
||||
the same values in every chunk, distinct in order should skip entire chunks with the same key as previous one
|
||||
single-threaded distinct
|
||||
0
|
||||
multi-threaded distinct
|
||||
0
|
||||
skip part of chunk since it contians values from previous one
|
||||
single-threaded distinct
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
multi-threaded distinct
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
table with not only primary key columns
|
||||
distinct with key-prefix only
|
||||
0
|
||||
distinct with full key, 2 columns
|
||||
0 0
|
||||
0 1
|
||||
0 2
|
||||
0 3
|
||||
0 4
|
||||
0 5
|
||||
0 6
|
||||
0 7
|
||||
0 8
|
||||
0 9
|
||||
distinct with key prefix and non-sorted column
|
||||
0 0
|
||||
0 1
|
||||
0 2
|
||||
0 3
|
||||
0 4
|
@ -0,0 +1,47 @@
|
||||
|
||||
drop table if exists distinct_in_order sync;
|
||||
create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by (a, b, c);
|
||||
|
||||
select 'disable optimize_distinct_in_order';
|
||||
set optimize_distinct_in_order=0;
|
||||
select 'pipeline does _not_ contain the optimization';
|
||||
explain pipeline select distinct * from distinct_in_order settings max_threads=1;
|
||||
|
||||
select 'enable optimize_distinct_in_order';
|
||||
set optimize_distinct_in_order=1;
|
||||
select 'distinct with primary key prefix -> pipeline contains the optimization';
|
||||
explain pipeline select distinct a, c from distinct_in_order settings max_threads=1;
|
||||
select 'distinct with non-primary key prefix -> pipeline does _not_ contain the optimization';
|
||||
explain pipeline select distinct b, c from distinct_in_order settings max_threads=1;
|
||||
|
||||
select 'the same values in every chunk, distinct in order should skip entire chunks with the same key as previous one';
|
||||
drop table if exists distinct_in_order sync;
|
||||
create table distinct_in_order (a int) engine=MergeTree() order by a settings index_granularity=10;
|
||||
insert into distinct_in_order (a) select * from zeros(30);
|
||||
select 'single-threaded distinct';
|
||||
select distinct * from distinct_in_order settings max_block_size=10, max_threads=1;
|
||||
select 'multi-threaded distinct';
|
||||
select distinct * from distinct_in_order settings max_block_size=10;
|
||||
|
||||
select 'skip part of chunk since it contians values from previous one';
|
||||
drop table if exists distinct_in_order sync;
|
||||
create table distinct_in_order (a int) engine=MergeTree() order by a settings index_granularity=10;
|
||||
insert into distinct_in_order (a) select * from zeros(10);
|
||||
insert into distinct_in_order select * from numbers(10);
|
||||
select 'single-threaded distinct';
|
||||
select distinct a from distinct_in_order settings max_block_size=10, max_threads=1;
|
||||
select 'multi-threaded distinct';
|
||||
select distinct a from distinct_in_order settings max_block_size=10;
|
||||
|
||||
select 'table with not only primary key columns';
|
||||
drop table if exists distinct_in_order sync;
|
||||
create table distinct_in_order (a int, b int, c int) engine=MergeTree() order by (a, b);
|
||||
insert into distinct_in_order select number % number, number % 10, number % 5 from numbers(1,1000000);
|
||||
select 'distinct with key-prefix only';
|
||||
select distinct a from distinct_in_order;
|
||||
select 'distinct with full key, 2 columns';
|
||||
select distinct a,b from distinct_in_order order by b;
|
||||
select 'distinct with key prefix and non-sorted column';
|
||||
select distinct a,c from distinct_in_order order by c;
|
||||
|
||||
-- drop table if exists distinct_in_order sync;
|
Loading…
Reference in New Issue
Block a user