Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-26 09:32:01 +00:00
Merge pull request #39538 from ClickHouse/fix_distinct_sorted
Fix: DistinctSortedTransform doesn't take advantage of sorting
Commit f414af2348
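The optimization this fix enables: when the input to DISTINCT is sorted by a prefix of the DISTINCT columns, the set of already-seen keys only has to cover rows that share the current value of that sort prefix. Once the prefix value changes, earlier keys can never reappear, so the set can be cleared and memory stays bounded by one prefix group. Below is a minimal standalone sketch of that idea; the Row type, column names, and distinctSorted function are illustrative assumptions, not ClickHouse's actual classes or API.

// Illustrative sketch only: DISTINCT over rows sorted by a prefix of the key.
// Names and types are hypothetical, not ClickHouse code.
#include <iostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

struct Row
{
    int counter_id;      // sort-prefix column (input arrives sorted by it)
    std::string event;   // remaining DISTINCT column
};

// Keep only the first occurrence of each (counter_id, event) pair.
// Because rows arrive sorted by counter_id, the "seen" set can be cleared
// whenever counter_id changes: earlier keys can never repeat afterwards.
std::vector<Row> distinctSorted(const std::vector<Row> & rows)
{
    std::vector<Row> result;
    std::set<std::pair<int, std::string>> seen;  /// bounded by one prefix group
    bool has_prev = false;
    int prev_prefix = 0;

    for (const auto & row : rows)
    {
        if (has_prev && row.counter_id != prev_prefix)
            seen.clear();                        /// sort prefix changed -> reset
        prev_prefix = row.counter_id;
        has_prev = true;

        if (seen.emplace(row.counter_id, row.event).second)
            result.push_back(row);               /// first time we see this key
    }
    return result;
}

int main()
{
    std::vector<Row> rows = {{1, "a"}, {1, "a"}, {1, "b"}, {2, "a"}, {2, "a"}};
    for (const auto & row : distinctSorted(rows))
        std::cout << row.counter_id << ' ' << row.event << '\n';  // 1 a, 1 b, 2 a
}

In the diff below, sort_prefix_columns (formerly clearing_hint_columns) plays that clearing-hint role for buildFilter.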
@@ -27,62 +27,96 @@ DistinctSortedTransform::DistinctSortedTransform(
         if (col && !isColumnConst(*col))
             column_positions.emplace_back(pos);
     }
     column_ptrs.reserve(column_positions.size());
+
+    /// pre-calculate DISTINCT column positions which form sort prefix of sort description
+    sort_prefix_positions.reserve(description.size());
+    for (const auto & column_sort_descr : description)
+    {
+        /// check if there is such column in header
+        if (!header.has(column_sort_descr.column_name))
+            break;
+
+        /// check if sorted column position matches any DISTINCT column
+        const auto pos = header.getPositionByName(column_sort_descr.column_name);
+        if (std::find(begin(column_positions), end(column_positions), pos) == column_positions.end())
+            break;
+
+        sort_prefix_positions.emplace_back(pos);
+    }
+    sort_prefix_columns.reserve(sort_prefix_positions.size());
 }
 
 void DistinctSortedTransform::transform(Chunk & chunk)
 {
-    const ColumnRawPtrs column_ptrs(getKeyColumns(chunk));
-    if (column_ptrs.empty())
-        return;
-
-    ColumnRawPtrs clearing_hint_columns(getClearingColumns(column_ptrs));
+    if (unlikely(!chunk.hasRows()))
+        return;
+
+    /// get DISTINCT columns from chunk
+    column_ptrs.clear();
+    for (const auto pos : column_positions)
+    {
+        const auto & column = chunk.getColumns()[pos];
+        column_ptrs.emplace_back(column.get());
+    }
+
+    /// get DISTINCT columns from chunk which form sort prefix of sort description
+    sort_prefix_columns.clear();
+    for (const auto pos : sort_prefix_positions)
+    {
+        const auto & column = chunk.getColumns()[pos];
+        sort_prefix_columns.emplace_back(column.get());
+    }
 
     if (data.type == ClearableSetVariants::Type::EMPTY)
         data.init(ClearableSetVariants::chooseMethod(column_ptrs, key_sizes));
 
     const size_t rows = chunk.getNumRows();
     IColumn::Filter filter(rows);
 
     bool has_new_data = false;
     switch (data.type)
     {
         case ClearableSetVariants::Type::EMPTY:
             break;
+    // clang-format off
 #define M(NAME) \
         case ClearableSetVariants::Type::NAME: \
-            has_new_data = buildFilter(*data.NAME, column_ptrs, clearing_hint_columns, filter, rows, data); \
+            has_new_data = buildFilter(*data.NAME, column_ptrs, sort_prefix_columns, filter, rows, data); \
             break;
     APPLY_FOR_SET_VARIANTS(M)
 #undef M
+    // clang-format on
     }
 
     /// Just go to the next block if there isn't any new record in the current one.
     if (!has_new_data)
     {
         chunk.clear();
         return;
     }
 
     if (!set_size_limits.check(data.getTotalRowCount(), data.getTotalByteCount(), "DISTINCT", ErrorCodes::SET_SIZE_LIMIT_EXCEEDED))
     {
         stopReading();
         chunk.clear();
         return;
     }
 
     /// Stop reading if we already reached the limit.
     if (limit_hint && data.getTotalRowCount() >= limit_hint)
         stopReading();
 
     prev_chunk.chunk = std::move(chunk);
-    prev_chunk.clearing_hint_columns = std::move(clearing_hint_columns);
+    prev_chunk.clearing_hint_columns = std::move(sort_prefix_columns);
 
     size_t all_columns = prev_chunk.chunk.getNumColumns();
     Chunk res_chunk;
     for (size_t i = 0; i < all_columns; ++i)
         res_chunk.addColumn(prev_chunk.chunk.getColumns().at(i)->filter(filter, -1));
 
     chunk = std::move(res_chunk);
 }
@@ -127,34 +161,6 @@ bool DistinctSortedTransform::buildFilter(
     return has_new_data;
 }
 
-ColumnRawPtrs DistinctSortedTransform::getKeyColumns(const Chunk & chunk) const
-{
-    ColumnRawPtrs column_ptrs;
-    column_ptrs.reserve(column_positions.size());
-    for (const auto pos : column_positions)
-    {
-        const auto & column = chunk.getColumns()[pos];
-        column_ptrs.emplace_back(column.get());
-    }
-    return column_ptrs;
-}
-
-ColumnRawPtrs DistinctSortedTransform::getClearingColumns(const ColumnRawPtrs & key_columns) const
-{
-    ColumnRawPtrs clearing_hint_columns;
-    clearing_hint_columns.reserve(description.size());
-    for (const auto & sort_column_description : description)
-    {
-        const auto * sort_column_ptr = header.getByName(sort_column_description.column_name).column.get();
-        const auto it = std::find(key_columns.cbegin(), key_columns.cend(), sort_column_ptr);
-        if (it != key_columns.cend()) /// if found in key_columns
-            clearing_hint_columns.emplace_back(sort_column_ptr);
-        else
-            return clearing_hint_columns; /// We will use common prefix of sort description and requested DISTINCT key.
-    }
-    return clearing_hint_columns;
-}
-
 bool DistinctSortedTransform::rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m)
 {
     for (size_t column_index = 0, num_columns = lhs.size(); column_index < num_columns; ++column_index)
@@ -32,10 +32,6 @@ protected:
     void transform(Chunk & chunk) override;
 
 private:
-    ColumnRawPtrs getKeyColumns(const Chunk & chunk) const;
-    /// When clearing_columns changed, we can clean HashSet to memory optimization
-    /// clearing_columns is a left-prefix of SortDescription exists in key_columns
-    ColumnRawPtrs getClearingColumns(const ColumnRawPtrs & key_columns) const;
     static bool rowsEqual(const ColumnRawPtrs & lhs, size_t n, const ColumnRawPtrs & rhs, size_t m);
 
     /// return true if has new data
@@ -59,7 +55,11 @@ private:
     PreviousChunk prev_chunk;
 
     Names column_names;
-    ColumnNumbers column_positions;
+    ColumnNumbers column_positions;      /// DISTINCT columns positions in header
+    ColumnNumbers sort_prefix_positions; /// DISTINCT columns positions which form sort prefix of sort description
+    ColumnRawPtrs column_ptrs;           /// DISTINCT columns from chunk
+    ColumnRawPtrs sort_prefix_columns;   /// DISTINCT columns from chunk which form sort prefix of sort description
 
     ClearableSetVariants data;
     Sizes key_sizes;
     UInt64 limit_hint;
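The comment removed above ("clearing_columns is a left-prefix of SortDescription exists in key_columns") states the rule the constructor now pre-computes once per query: take the longest left prefix of the sort description whose columns are all DISTINCT columns. A rough standalone sketch of that rule over plain name lists follows; sortPrefixOfDistinct is a hypothetical helper for illustration, not part of ClickHouse.

// Illustrative sketch: longest prefix of the sort description whose columns
// all appear among the DISTINCT columns. Names are hypothetical.
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

std::vector<std::string> sortPrefixOfDistinct(
    const std::vector<std::string> & sort_description,   // ORDER BY columns, in order
    const std::vector<std::string> & distinct_columns)    // DISTINCT columns
{
    std::vector<std::string> prefix;
    for (const auto & name : sort_description)
    {
        /// Stop at the first sort column that is not a DISTINCT column;
        /// only the common left prefix can serve as a clearing hint.
        if (std::find(distinct_columns.begin(), distinct_columns.end(), name) == distinct_columns.end())
            break;
        prefix.push_back(name);
    }
    return prefix;
}

int main()
{
    const auto prefix = sortPrefixOfDistinct({"CounterID", "EventDate", "URL"}, {"EventDate", "CounterID"});
    assert((prefix == std::vector<std::string>{"CounterID", "EventDate"}));
}

For instance, with ORDER BY (CounterID, EventDate, URL) and DISTINCT on CounterID and EventDate the usable prefix is (CounterID, EventDate); with DISTINCT on EventDate only, the prefix is empty and no clearing hint is available.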
@@ -1 +1,3 @@
 -- check that distinct with and w/o optimization produce the same result
+-- DISTINCT colums are the same as in ORDER BY
+-- DISTINCT colums has prefix in ORDER BY columns
@@ -1,10 +1,24 @@
 select '-- check that distinct with and w/o optimization produce the same result';
 
 drop table if exists distinct_in_order sync;
 drop table if exists ordinary_distinct sync;
 
+select '-- DISTINCT colums are the same as in ORDER BY';
 create table distinct_in_order (CounterID UInt32, EventDate Date) engine=MergeTree() order by (CounterID, EventDate);
 insert into distinct_in_order select distinct CounterID, EventDate from test.hits order by CounterID, EventDate settings optimize_distinct_in_order=1;
 create table ordinary_distinct (CounterID UInt32, EventDate Date) engine=MergeTree() order by (CounterID, EventDate);
-insert into ordinary_distinct select distinct CounterID, EventDate from test.hits settings optimize_distinct_in_order=0;
+insert into ordinary_distinct select distinct CounterID, EventDate from test.hits order by CounterID, EventDate settings optimize_distinct_in_order=0;
 select distinct * from distinct_in_order except select * from ordinary_distinct;
+
+drop table if exists distinct_in_order sync;
+drop table if exists ordinary_distinct sync;
+
+select '-- DISTINCT colums has prefix in ORDER BY columns';
+create table distinct_in_order (CounterID UInt32, EventDate Date) engine=MergeTree() order by (CounterID, EventDate);
+insert into distinct_in_order select distinct CounterID, EventDate from test.hits order by CounterID settings optimize_distinct_in_order=1;
+create table ordinary_distinct (CounterID UInt32, EventDate Date) engine=MergeTree() order by (CounterID, EventDate);
+insert into ordinary_distinct select distinct CounterID, EventDate from test.hits order by CounterID settings optimize_distinct_in_order=0;
+select distinct * from distinct_in_order except select * from ordinary_distinct;
+
+drop table if exists distinct_in_order sync;
+drop table if exists ordinary_distinct sync;