From 965f96bd8476d0edbc50521029345db03d9f249f Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Wed, 20 Jul 2022 20:44:47 +0000 Subject: [PATCH] DISTINCT in order: perf improvement + reduce allocations in DistinctSortedChunkTransform + use it for final distinct as well --- src/Processors/QueryPlan/DistinctStep.cpp | 23 ++------- .../DistinctSortedChunkTransform.cpp | 49 ++++++++++++------- .../Transforms/DistinctSortedChunkTransform.h | 16 +++--- ...ct_in_order_optimization_explain.reference | 4 +- ..._distinct_in_order_optimization_explain.sh | 2 +- 5 files changed, 49 insertions(+), 45 deletions(-) diff --git a/src/Processors/QueryPlan/DistinctStep.cpp b/src/Processors/QueryPlan/DistinctStep.cpp index 553732fbcc5..d1ca985bb2a 100644 --- a/src/Processors/QueryPlan/DistinctStep.cpp +++ b/src/Processors/QueryPlan/DistinctStep.cpp @@ -94,8 +94,10 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns); if (!distinct_sort_desc.empty()) { - /// pre-distinct for sorted chunks - if (pre_distinct) + const bool sorted_stream = input_stream.sort_mode == DataStream::SortMode::Stream; + /// pre-distinct for sorted chunks or + /// final distinct for sorted stream (sorting inside and among chunks) + if (pre_distinct || sorted_stream) { pipeline.addSimpleTransform( [&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr @@ -104,22 +106,7 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil return nullptr; return std::make_shared( - header, set_size_limits, limit_hint, distinct_sort_desc, columns); - }); - return; - } - /// final distinct for sorted stream (sorting inside and among chunks) - if (input_stream.sort_mode == DataStream::SortMode::Stream) - { - assert(input_stream.has_single_port); - - pipeline.addSimpleTransform( - [&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr - { - if (stream_type != QueryPipelineBuilder::StreamType::Main) - return nullptr; - - return std::make_shared(header, distinct_sort_desc, set_size_limits, limit_hint, columns); + header, set_size_limits, limit_hint, distinct_sort_desc, columns, sorted_stream); }); return; } diff --git a/src/Processors/Transforms/DistinctSortedChunkTransform.cpp b/src/Processors/Transforms/DistinctSortedChunkTransform.cpp index 064c827a8cc..8604cca5a5c 100644 --- a/src/Processors/Transforms/DistinctSortedChunkTransform.cpp +++ b/src/Processors/Transforms/DistinctSortedChunkTransform.cpp @@ -13,11 +13,13 @@ DistinctSortedChunkTransform::DistinctSortedChunkTransform( const SizeLimits & output_size_limits_, UInt64 limit_hint_, const SortDescription & sorted_columns_descr_, - const Names & source_columns) + const Names & source_columns, + const bool sorted_stream_) : ISimpleTransform(header_, header_, true) , limit_hint(limit_hint_) , output_size_limits(output_size_limits_) , sorted_columns_descr(sorted_columns_descr_) + , sorted_stream(sorted_stream_) { /// calculate sorted columns positions sorted_columns_pos.reserve(sorted_columns_descr.size()); @@ -43,7 +45,7 @@ DistinctSortedChunkTransform::DistinctSortedChunkTransform( /// reserve space in auxiliary column vectors for processing sorted_columns.reserve(sorted_columns_pos.size()); other_columns.reserve(other_columns_pos.size()); - current_key.reserve(sorted_columns.size()); + prev_chunk_latest_key.reserve(sorted_columns.size()); } void DistinctSortedChunkTransform::initChunkProcessing(const Columns & input_columns) @@ -101,28 +103,40 @@ size_t DistinctSortedChunkTransform::buildFilterForRange( return count; } -void DistinctSortedChunkTransform::setCurrentKey(const size_t row_pos) +void DistinctSortedChunkTransform::saveLatestKey(const size_t row_pos) { - current_key.clear(); + prev_chunk_latest_key.clear(); for (auto const & col : sorted_columns) { - current_key.emplace_back(col->cloneEmpty()); - current_key.back()->insertFrom(*col, row_pos); + prev_chunk_latest_key.emplace_back(col->cloneEmpty()); + prev_chunk_latest_key.back()->insertFrom(*col, row_pos); } } -bool DistinctSortedChunkTransform::isCurrentKey(const size_t row_pos) const +bool DistinctSortedChunkTransform::isKey(const size_t key_pos, const size_t row_pos) const { for (size_t i = 0; i < sorted_columns.size(); ++i) { - int res = current_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction); + const int res = sorted_columns[i]->compareAt(key_pos, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction); if (res != 0) return false; } return true; } -size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const +bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const +{ + for (size_t i = 0; i < sorted_columns.size(); ++i) + { + const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction); + if (res != 0) + return false; + } + return true; +} + +template +size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end, Predicate pred) const { assert(begin < end); @@ -133,7 +147,7 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const for (size_t pos = begin; pos < linear_probe_end; ++pos) { - if (!isCurrentKey(pos)) + if (!pred(begin, pos)) return pos; } @@ -142,7 +156,7 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const while (low <= high) { size_t mid = low + (high - low) / 2; - if (isCurrentKey(mid)) + if (pred(begin, mid)) low = mid + 1; else { @@ -155,13 +169,13 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const std::pair DistinctSortedChunkTransform::continueWithPrevRange(const size_t chunk_rows, IColumn::Filter & filter) { - /// current_key is empty on very first transform() call + /// prev_chunk_latest_key is empty on very first transform() call /// or first row doesn't match a key from previous transform() - if (current_key.empty() || !isCurrentKey(0)) + if (prev_chunk_latest_key.empty() || !isLatestKeyFromPrevChunk(0)) return {0, 0}; size_t output_rows = 0; - const size_t range_end = getRangeEnd(0, chunk_rows); + const size_t range_end = getRangeEnd(0, chunk_rows, [&](size_t, size_t row_pos) { return isLatestKeyFromPrevChunk(row_pos); }); if (other_columns.empty()) std::fill(filter.begin(), filter.begin() + range_end, 0); /// skip rows already included in distinct on previous transform() else @@ -191,11 +205,8 @@ void DistinctSortedChunkTransform::transform(Chunk & chunk) size_t range_end = range_begin; while (range_end != chunk_rows) { - // set current key to find range - setCurrentKey(range_begin); - // find new range [range_begin, range_end) - range_end = getRangeEnd(range_begin, chunk_rows); + range_end = getRangeEnd(range_begin, chunk_rows, [&](size_t key_pos, size_t row_pos) { return isKey(key_pos, row_pos); }); // update filter for range if (other_columns.empty()) @@ -214,6 +225,8 @@ void DistinctSortedChunkTransform::transform(Chunk & chunk) range_begin = range_end; } + saveLatestKey(chunk_rows - 1); + /// apply the built filter for (auto & input_column : input_columns) input_column = input_column->filter(filter, output_rows); diff --git a/src/Processors/Transforms/DistinctSortedChunkTransform.h b/src/Processors/Transforms/DistinctSortedChunkTransform.h index 2e21c36f7dc..0ce8addbf7e 100644 --- a/src/Processors/Transforms/DistinctSortedChunkTransform.h +++ b/src/Processors/Transforms/DistinctSortedChunkTransform.h @@ -32,9 +32,10 @@ public: const SizeLimits & output_size_limits_, UInt64 limit_hint_, const SortDescription & sorted_columns_descr_, - const Names & source_columns_); + const Names & source_columns_, + bool sorted_stream_); - String getName() const override { return "DistinctSortedChunkTransform"; } + String getName() const override { return (!sorted_stream ? "DistinctSortedChunkTransform" : "DistinctSortedStreamTransform"); } protected: void transform(Chunk & chunk) override; @@ -43,9 +44,11 @@ private: void initChunkProcessing(const Columns & input_columns); std::pair continueWithPrevRange(size_t chunk_rows, IColumn::Filter & filter); size_t ordinaryDistinctOnRange(IColumn::Filter & filter, size_t range_begin, size_t range_end, bool clear_data); - inline void setCurrentKey(size_t row_pos); - inline bool isCurrentKey(size_t row_pos) const; - inline size_t getRangeEnd(size_t range_begin, size_t range_end) const; + inline void saveLatestKey(size_t row_pos); + inline bool isLatestKeyFromPrevChunk(size_t row_pos) const; + inline bool isKey(size_t key_pos, size_t row_pos) const; + template + inline size_t getRangeEnd(size_t range_begin, size_t range_end, Predicate pred) const; template size_t buildFilterForRange(Method & method, IColumn::Filter & filter, size_t range_begin, size_t range_end, bool clear_data); @@ -66,7 +69,8 @@ private: Sizes other_columns_sizes; ColumnRawPtrs other_columns; // used during processing - MutableColumns current_key; + MutableColumns prev_chunk_latest_key; + const bool sorted_stream = false; }; } diff --git a/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.reference b/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.reference index 2dac69edc41..f30d3fa30ea 100644 --- a/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.reference +++ b/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.reference @@ -7,13 +7,13 @@ DistinctSortedChunkTransform -- distinct with primary key prefix -> pre-distinct optimization only DistinctSortedChunkTransform -- distinct with primary key prefix and order by on column in distinct -> pre-distinct and final distinct optimization -DistinctSortedTransform +DistinctSortedStreamTransform DistinctSortedChunkTransform -- distinct with primary key prefix and order by on column _not_ in distinct -> pre-distinct optimization only DistinctSortedChunkTransform -- distinct with non-primary key prefix -> no optimizations No optimizations -- distinct with non-primary key prefix and order by on column in distinct -> final distinct optimization only -DistinctSortedTransform +DistinctSortedStreamTransform -- distinct with non-primary key prefix and order by on column _not_ in distinct -> no optimizations No optimizations diff --git a/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.sh b/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.sh index 21f50a147ac..9af0e98ecf4 100755 --- a/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.sh +++ b/tests/queries/0_stateless/02317_distinct_in_order_optimization_explain.sh @@ -8,7 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) DISABLE_OPTIMIZATION="set optimize_distinct_in_order=0" ENABLE_OPTIMIZATION="set optimize_distinct_in_order=1" -GREP_OPTIMIZATIONS="grep 'DistinctSortedChunkTransform\|DistinctSortedTransform'" +GREP_OPTIMIZATIONS="grep 'DistinctSortedChunkTransform\|DistinctSortedStreamTransform'" TRIM_LEADING_SPACES="sed -e 's/^[ \t]*//'" FIND_OPTIMIZATIONS="$GREP_OPTIMIZATIONS | $TRIM_LEADING_SPACES"