mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 23:52:03 +00:00
fix limit with ties
This commit is contained in:
parent
2aa4243c25
commit
e069fc495d
@ -79,27 +79,8 @@ struct SharedBlockRowRef
|
||||
bool empty() const { return columns == nullptr; }
|
||||
size_t size() const { return empty() ? 0 : columns->size(); }
|
||||
|
||||
/// gets pointers to all columns of block, which were used for ORDER BY
|
||||
static ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description)
|
||||
{
|
||||
size_t size = description.size();
|
||||
ColumnRawPtrs res;
|
||||
res.reserve(size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
const IColumn * column = !description[i].column_name.empty()
|
||||
? block.getByName(description[i].column_name).column.get()
|
||||
: block.safeGetByPosition(description[i].column_number).column.get();
|
||||
res.emplace_back(column);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
static void setSharedBlockRowRef(SharedBlockRowRef & row_ref, SharedBlockPtr & shared_block, ColumnRawPtrs * columns,
|
||||
size_t row_num)
|
||||
static void setSharedBlockRowRef(SharedBlockRowRef & row_ref,
|
||||
SharedBlockPtr & shared_block,ColumnRawPtrs * columns, size_t row_num)
|
||||
{
|
||||
row_ref.row_num = row_num;
|
||||
row_ref.columns = columns;
|
||||
@ -107,6 +88,4 @@ struct SharedBlockRowRef
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
@ -6,6 +6,24 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// gets pointers to all columns of block, which were used for ORDER BY
|
||||
static ColumnRawPtrs getSortColumns(const Block & block, const SortDescription & description)
|
||||
{
|
||||
size_t size = description.size();
|
||||
ColumnRawPtrs res;
|
||||
res.reserve(size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
const IColumn * column = !description[i].column_name.empty()
|
||||
? block.getByName(description[i].column_name).column.get()
|
||||
: block.safeGetByPosition(description[i].column_number).column.get();
|
||||
res.emplace_back(column);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
LimitBlockInputStream::LimitBlockInputStream(
|
||||
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
|
||||
@ -21,29 +39,30 @@ LimitBlockInputStream::LimitBlockInputStream(
|
||||
children.push_back(input);
|
||||
}
|
||||
|
||||
|
||||
Block LimitBlockInputStream::readImpl()
|
||||
{
|
||||
Block res;
|
||||
UInt64 rows = 0;
|
||||
|
||||
/// pos >= offset + limit and all rows in previous block were equal to ties_row_ref
|
||||
/// so we check current block
|
||||
if (with_ties && ties_row_ref.shared_block)
|
||||
/// pos >= offset + limit and all rows in the end of previous block were equal
|
||||
/// to last row at 'limit' position. So we check current block.
|
||||
if (!ties_row_ref.empty() && pos >= offset + limit)
|
||||
{
|
||||
rows = res.rows();
|
||||
pos += rows;
|
||||
res = children.back()->read();
|
||||
rows = res.rows();
|
||||
|
||||
if (!res)
|
||||
return res;
|
||||
|
||||
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
|
||||
ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description);
|
||||
UInt64 len;
|
||||
ptr->sort_columns = getSortColumns(*ptr, description);
|
||||
|
||||
UInt64 len;
|
||||
for (len = 0; len < rows; ++len)
|
||||
{
|
||||
SharedBlockRowRef current_row;
|
||||
SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, len);
|
||||
SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, len);
|
||||
|
||||
if (current_row != ties_row_ref)
|
||||
{
|
||||
ties_row_ref.reset();
|
||||
@ -51,7 +70,7 @@ Block LimitBlockInputStream::readImpl()
|
||||
}
|
||||
}
|
||||
|
||||
if (len < rows - 1)
|
||||
if (len < rows)
|
||||
{
|
||||
for (size_t i = 0; i < ptr->columns(); ++i)
|
||||
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
|
||||
@ -60,8 +79,6 @@ Block LimitBlockInputStream::readImpl()
|
||||
return *ptr;
|
||||
}
|
||||
|
||||
/// pos - how many lines were read, including the last read block
|
||||
|
||||
if (pos >= offset + limit)
|
||||
{
|
||||
if (!always_read_till_end)
|
||||
@ -83,10 +100,18 @@ Block LimitBlockInputStream::readImpl()
|
||||
pos += rows;
|
||||
} while (pos <= offset);
|
||||
|
||||
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
|
||||
if (with_ties)
|
||||
ptr->sort_columns = getSortColumns(*ptr, description);
|
||||
|
||||
/// give away the whole block
|
||||
if (pos >= offset + rows && pos <= offset + limit)
|
||||
return res;
|
||||
{
|
||||
/// Save rowref for last row, because probalbly next block begins with the same row.
|
||||
if (with_ties && pos == offset + limit)
|
||||
SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, rows - 1);
|
||||
return *ptr;
|
||||
}
|
||||
|
||||
/// give away a piece of the block
|
||||
UInt64 start = std::max(
|
||||
@ -98,18 +123,16 @@ Block LimitBlockInputStream::readImpl()
|
||||
static_cast<Int64>(pos) - static_cast<Int64>(offset),
|
||||
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
|
||||
|
||||
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
|
||||
|
||||
/// check if other rows in current block equals to last one in limit
|
||||
if (with_ties)
|
||||
{
|
||||
ColumnRawPtrs columns = SharedBlockRowRef::getBlockColumns(*ptr, description);
|
||||
SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &columns, start + length - 1);
|
||||
SharedBlockRowRef::setSharedBlockRowRef(ties_row_ref, ptr, &ptr->sort_columns, start + length - 1);
|
||||
|
||||
for (size_t i = ties_row_ref.row_num + 1; i < rows; ++i)
|
||||
{
|
||||
SharedBlockRowRef current_row;
|
||||
SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &columns, i);
|
||||
SharedBlockRowRef::setSharedBlockRowRef(current_row, ptr, &ptr->sort_columns, i);
|
||||
if (current_row == ties_row_ref)
|
||||
++length;
|
||||
else
|
||||
@ -120,6 +143,9 @@ Block LimitBlockInputStream::readImpl()
|
||||
}
|
||||
}
|
||||
|
||||
if (start + length == rows)
|
||||
return *ptr;
|
||||
|
||||
for (size_t i = 0; i < ptr->columns(); ++i)
|
||||
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
|
||||
|
||||
|
@ -18,7 +18,7 @@ public:
|
||||
* If always_read_till_end = true - reads all the data to the end, but ignores them. This is necessary in rare cases:
|
||||
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
|
||||
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
|
||||
* with_ties = true, when query has WITH TIES modifier. If so,description should be provided
|
||||
* with_ties = true, when query has WITH TIES modifier. If so, description should be provided
|
||||
* description lets us know which row we should check for equality
|
||||
*/
|
||||
LimitBlockInputStream(
|
||||
|
@ -719,7 +719,7 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
|
||||
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
|
||||
{
|
||||
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
|
||||
if (!query.distinct && !query.limitBy())
|
||||
if (!query.distinct && !query.limitBy() && !query.limit_with_ties)
|
||||
{
|
||||
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
|
||||
return limit_length + limit_offset;
|
||||
@ -2238,7 +2238,13 @@ void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
|
||||
if (query.limitLength())
|
||||
{
|
||||
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
|
||||
SortDescription sort_descr = getSortDescription(query, context);
|
||||
SortDescription sort_descr;
|
||||
if (query.limit_with_ties)
|
||||
{
|
||||
if (!query.orderBy())
|
||||
throw Exception("LIMIT WITH TIES without ORDER BY", ErrorCodes::LOGICAL_ERROR);
|
||||
sort_descr = getSortDescription(query, context);
|
||||
}
|
||||
pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
|
||||
{
|
||||
stream = std::make_shared<LimitBlockInputStream>(stream, limit, 0, false, false, query.limit_with_ties, sort_descr);
|
||||
|
Loading…
Reference in New Issue
Block a user