mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-04 13:32:13 +00:00
DISTINCT in order: perf improvement
+ reduce allocations in DistinctSortedChunkTransform + use it for final distinct as well
This commit is contained in:
parent
3fb3015375
commit
965f96bd84
@ -94,8 +94,10 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
|
||||
SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);
|
||||
if (!distinct_sort_desc.empty())
|
||||
{
|
||||
/// pre-distinct for sorted chunks
|
||||
if (pre_distinct)
|
||||
const bool sorted_stream = input_stream.sort_mode == DataStream::SortMode::Stream;
|
||||
/// pre-distinct for sorted chunks or
|
||||
/// final distinct for sorted stream (sorting inside and among chunks)
|
||||
if (pre_distinct || sorted_stream)
|
||||
{
|
||||
pipeline.addSimpleTransform(
|
||||
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
|
||||
@ -104,22 +106,7 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<DistinctSortedChunkTransform>(
|
||||
header, set_size_limits, limit_hint, distinct_sort_desc, columns);
|
||||
});
|
||||
return;
|
||||
}
|
||||
/// final distinct for sorted stream (sorting inside and among chunks)
|
||||
if (input_stream.sort_mode == DataStream::SortMode::Stream)
|
||||
{
|
||||
assert(input_stream.has_single_port);
|
||||
|
||||
pipeline.addSimpleTransform(
|
||||
[&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
|
||||
{
|
||||
if (stream_type != QueryPipelineBuilder::StreamType::Main)
|
||||
return nullptr;
|
||||
|
||||
return std::make_shared<DistinctSortedTransform>(header, distinct_sort_desc, set_size_limits, limit_hint, columns);
|
||||
header, set_size_limits, limit_hint, distinct_sort_desc, columns, sorted_stream);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
@ -13,11 +13,13 @@ DistinctSortedChunkTransform::DistinctSortedChunkTransform(
|
||||
const SizeLimits & output_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const SortDescription & sorted_columns_descr_,
|
||||
const Names & source_columns)
|
||||
const Names & source_columns,
|
||||
const bool sorted_stream_)
|
||||
: ISimpleTransform(header_, header_, true)
|
||||
, limit_hint(limit_hint_)
|
||||
, output_size_limits(output_size_limits_)
|
||||
, sorted_columns_descr(sorted_columns_descr_)
|
||||
, sorted_stream(sorted_stream_)
|
||||
{
|
||||
/// calculate sorted columns positions
|
||||
sorted_columns_pos.reserve(sorted_columns_descr.size());
|
||||
@ -43,7 +45,7 @@ DistinctSortedChunkTransform::DistinctSortedChunkTransform(
|
||||
/// reserve space in auxiliary column vectors for processing
|
||||
sorted_columns.reserve(sorted_columns_pos.size());
|
||||
other_columns.reserve(other_columns_pos.size());
|
||||
current_key.reserve(sorted_columns.size());
|
||||
prev_chunk_latest_key.reserve(sorted_columns.size());
|
||||
}
|
||||
|
||||
void DistinctSortedChunkTransform::initChunkProcessing(const Columns & input_columns)
|
||||
@ -101,28 +103,40 @@ size_t DistinctSortedChunkTransform::buildFilterForRange(
|
||||
return count;
|
||||
}
|
||||
|
||||
void DistinctSortedChunkTransform::setCurrentKey(const size_t row_pos)
|
||||
void DistinctSortedChunkTransform::saveLatestKey(const size_t row_pos)
|
||||
{
|
||||
current_key.clear();
|
||||
prev_chunk_latest_key.clear();
|
||||
for (auto const & col : sorted_columns)
|
||||
{
|
||||
current_key.emplace_back(col->cloneEmpty());
|
||||
current_key.back()->insertFrom(*col, row_pos);
|
||||
prev_chunk_latest_key.emplace_back(col->cloneEmpty());
|
||||
prev_chunk_latest_key.back()->insertFrom(*col, row_pos);
|
||||
}
|
||||
}
|
||||
|
||||
bool DistinctSortedChunkTransform::isCurrentKey(const size_t row_pos) const
|
||||
bool DistinctSortedChunkTransform::isKey(const size_t key_pos, const size_t row_pos) const
|
||||
{
|
||||
for (size_t i = 0; i < sorted_columns.size(); ++i)
|
||||
{
|
||||
int res = current_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
|
||||
const int res = sorted_columns[i]->compareAt(key_pos, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
|
||||
if (res != 0)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const
|
||||
bool DistinctSortedChunkTransform::isLatestKeyFromPrevChunk(const size_t row_pos) const
|
||||
{
|
||||
for (size_t i = 0; i < sorted_columns.size(); ++i)
|
||||
{
|
||||
const int res = prev_chunk_latest_key[i]->compareAt(0, row_pos, *sorted_columns[i], sorted_columns_descr[i].nulls_direction);
|
||||
if (res != 0)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
template<typename Predicate>
|
||||
size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end, Predicate pred) const
|
||||
{
|
||||
assert(begin < end);
|
||||
|
||||
@ -133,7 +147,7 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const
|
||||
|
||||
for (size_t pos = begin; pos < linear_probe_end; ++pos)
|
||||
{
|
||||
if (!isCurrentKey(pos))
|
||||
if (!pred(begin, pos))
|
||||
return pos;
|
||||
}
|
||||
|
||||
@ -142,7 +156,7 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const
|
||||
while (low <= high)
|
||||
{
|
||||
size_t mid = low + (high - low) / 2;
|
||||
if (isCurrentKey(mid))
|
||||
if (pred(begin, mid))
|
||||
low = mid + 1;
|
||||
else
|
||||
{
|
||||
@ -155,13 +169,13 @@ size_t DistinctSortedChunkTransform::getRangeEnd(size_t begin, size_t end) const
|
||||
|
||||
std::pair<size_t, size_t> DistinctSortedChunkTransform::continueWithPrevRange(const size_t chunk_rows, IColumn::Filter & filter)
|
||||
{
|
||||
/// current_key is empty on very first transform() call
|
||||
/// prev_chunk_latest_key is empty on very first transform() call
|
||||
/// or first row doesn't match a key from previous transform()
|
||||
if (current_key.empty() || !isCurrentKey(0))
|
||||
if (prev_chunk_latest_key.empty() || !isLatestKeyFromPrevChunk(0))
|
||||
return {0, 0};
|
||||
|
||||
size_t output_rows = 0;
|
||||
const size_t range_end = getRangeEnd(0, chunk_rows);
|
||||
const size_t range_end = getRangeEnd(0, chunk_rows, [&](size_t, size_t row_pos) { return isLatestKeyFromPrevChunk(row_pos); });
|
||||
if (other_columns.empty())
|
||||
std::fill(filter.begin(), filter.begin() + range_end, 0); /// skip rows already included in distinct on previous transform()
|
||||
else
|
||||
@ -191,11 +205,8 @@ void DistinctSortedChunkTransform::transform(Chunk & chunk)
|
||||
size_t range_end = range_begin;
|
||||
while (range_end != chunk_rows)
|
||||
{
|
||||
// set current key to find range
|
||||
setCurrentKey(range_begin);
|
||||
|
||||
// find new range [range_begin, range_end)
|
||||
range_end = getRangeEnd(range_begin, chunk_rows);
|
||||
range_end = getRangeEnd(range_begin, chunk_rows, [&](size_t key_pos, size_t row_pos) { return isKey(key_pos, row_pos); });
|
||||
|
||||
// update filter for range
|
||||
if (other_columns.empty())
|
||||
@ -214,6 +225,8 @@ void DistinctSortedChunkTransform::transform(Chunk & chunk)
|
||||
range_begin = range_end;
|
||||
}
|
||||
|
||||
saveLatestKey(chunk_rows - 1);
|
||||
|
||||
/// apply the built filter
|
||||
for (auto & input_column : input_columns)
|
||||
input_column = input_column->filter(filter, output_rows);
|
||||
|
@ -32,9 +32,10 @@ public:
|
||||
const SizeLimits & output_size_limits_,
|
||||
UInt64 limit_hint_,
|
||||
const SortDescription & sorted_columns_descr_,
|
||||
const Names & source_columns_);
|
||||
const Names & source_columns_,
|
||||
bool sorted_stream_);
|
||||
|
||||
String getName() const override { return "DistinctSortedChunkTransform"; }
|
||||
String getName() const override { return (!sorted_stream ? "DistinctSortedChunkTransform" : "DistinctSortedStreamTransform"); }
|
||||
|
||||
protected:
|
||||
void transform(Chunk & chunk) override;
|
||||
@ -43,9 +44,11 @@ private:
|
||||
void initChunkProcessing(const Columns & input_columns);
|
||||
std::pair<size_t, size_t> continueWithPrevRange(size_t chunk_rows, IColumn::Filter & filter);
|
||||
size_t ordinaryDistinctOnRange(IColumn::Filter & filter, size_t range_begin, size_t range_end, bool clear_data);
|
||||
inline void setCurrentKey(size_t row_pos);
|
||||
inline bool isCurrentKey(size_t row_pos) const;
|
||||
inline size_t getRangeEnd(size_t range_begin, size_t range_end) const;
|
||||
inline void saveLatestKey(size_t row_pos);
|
||||
inline bool isLatestKeyFromPrevChunk(size_t row_pos) const;
|
||||
inline bool isKey(size_t key_pos, size_t row_pos) const;
|
||||
template<typename Predicate>
|
||||
inline size_t getRangeEnd(size_t range_begin, size_t range_end, Predicate pred) const;
|
||||
|
||||
template <typename Method>
|
||||
size_t buildFilterForRange(Method & method, IColumn::Filter & filter, size_t range_begin, size_t range_end, bool clear_data);
|
||||
@ -66,7 +69,8 @@ private:
|
||||
Sizes other_columns_sizes;
|
||||
ColumnRawPtrs other_columns; // used during processing
|
||||
|
||||
MutableColumns current_key;
|
||||
MutableColumns prev_chunk_latest_key;
|
||||
const bool sorted_stream = false;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -7,13 +7,13 @@ DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix -> pre-distinct optimization only
|
||||
DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix and order by on column in distinct -> pre-distinct and final distinct optimization
|
||||
DistinctSortedTransform
|
||||
DistinctSortedStreamTransform
|
||||
DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix and order by on column _not_ in distinct -> pre-distinct optimization only
|
||||
DistinctSortedChunkTransform
|
||||
-- distinct with non-primary key prefix -> no optimizations
|
||||
No optimizations
|
||||
-- distinct with non-primary key prefix and order by on column in distinct -> final distinct optimization only
|
||||
DistinctSortedTransform
|
||||
DistinctSortedStreamTransform
|
||||
-- distinct with non-primary key prefix and order by on column _not_ in distinct -> no optimizations
|
||||
No optimizations
|
||||
|
@ -8,7 +8,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
|
||||
DISABLE_OPTIMIZATION="set optimize_distinct_in_order=0"
|
||||
ENABLE_OPTIMIZATION="set optimize_distinct_in_order=1"
|
||||
GREP_OPTIMIZATIONS="grep 'DistinctSortedChunkTransform\|DistinctSortedTransform'"
|
||||
GREP_OPTIMIZATIONS="grep 'DistinctSortedChunkTransform\|DistinctSortedStreamTransform'"
|
||||
TRIM_LEADING_SPACES="sed -e 's/^[ \t]*//'"
|
||||
FIND_OPTIMIZATIONS="$GREP_OPTIMIZATIONS | $TRIM_LEADING_SPACES"
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user