fix LIMIT WITH TIES

This commit is contained in:
Alexander Tokmakov 2020-02-28 23:20:39 +03:00
parent 1c2fe333d2
commit 286c65e89e
6 changed files with 55 additions and 90 deletions

View File

@ -23,6 +23,18 @@ LimitTransform::LimitTransform(
} }
} }
SharedChunkPtr LimitTransform::makeChunkWithPreviousRow(const Chunk & chunk, size_t row) const
{
assert(row < chunk.getNumRows());
auto last_row_columns = chunk.cloneEmptyColumns();
const auto & current_columns = chunk.getColumns();
for (size_t i = 0; i < current_columns.size(); ++i)
last_row_columns[i]->insertFrom(*current_columns[i], row);
SharedChunkPtr shared_chunk = std::make_shared<detail::SharedChunk>(Chunk(std::move(last_row_columns), 1));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
return shared_chunk;
}
LimitTransform::Status LimitTransform::prepare() LimitTransform::Status LimitTransform::prepare()
{ {
@ -53,7 +65,7 @@ LimitTransform::Status LimitTransform::prepare()
} }
/// Check if we are done with pushing. /// Check if we are done with pushing.
bool pushing_is_finished = (rows_read >= offset + limit) && ties_row_ref.empty(); bool pushing_is_finished = (rows_read >= offset + limit) && !previous_row_chunk;
if (pushing_is_finished) if (pushing_is_finished)
{ {
if (!always_read_till_end) if (!always_read_till_end)
@ -125,12 +137,9 @@ LimitTransform::Status LimitTransform::prepare()
if (output.hasData()) if (output.hasData())
return Status::PortFull; return Status::PortFull;
/// Save the last row of current block to check if next block begins with the same row (for WITH TIES).
if (with_ties && rows_read == offset + limit) if (with_ties && rows_read == offset + limit)
{ previous_row_chunk = makeChunkWithPreviousRow(current_chunk, current_chunk.getNumRows() - 1);
SharedChunkPtr shared_chunk = new detail::SharedChunk(current_chunk.clone());
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, shared_chunk->getNumRows() - 1);
}
output.push(std::move(current_chunk)); output.push(std::move(current_chunk));
has_block = false; has_block = false;
@ -138,8 +147,9 @@ LimitTransform::Status LimitTransform::prepare()
return Status::PortFull; return Status::PortFull;
} }
bool may_need_more_data_for_ties = previous_row_chunk || rows_read - rows <= offset + limit;
/// No more data is needed. /// No more data is needed.
if (!always_read_till_end && rows_read >= offset + limit) if (!always_read_till_end && (rows_read >= offset + limit) && !may_need_more_data_for_ties)
input.close(); input.close();
return Status::Ready; return Status::Ready;
@ -148,36 +158,32 @@ LimitTransform::Status LimitTransform::prepare()
void LimitTransform::work() void LimitTransform::work()
{ {
SharedChunkPtr shared_chunk = new detail::SharedChunk(std::move(current_chunk)); SharedChunkPtr shared_chunk = std::make_shared<detail::SharedChunk>(std::move(current_chunk));
shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns()); shared_chunk->sort_columns = extractSortColumns(shared_chunk->getColumns());
size_t num_rows = shared_chunk->getNumRows(); size_t num_rows = shared_chunk->getNumRows();
size_t num_columns = shared_chunk->getNumColumns(); size_t num_columns = shared_chunk->getNumColumns();
if (!ties_row_ref.empty() && rows_read >= offset + limit) if (previous_row_chunk && rows_read >= offset + limit)
{ {
UInt64 len; /// Scan until the first row, which is not equal to previous_row_chunk (for WITH TIES)
for (len = 0; len < num_rows; ++len) size_t current_row_num = 0;
for (; current_row_num < num_rows; ++current_row_num)
{ {
SharedChunkRowRef current_row; if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk))
current_row.set(shared_chunk, &shared_chunk->sort_columns, len);
if (current_row != ties_row_ref)
{
ties_row_ref.reset();
break; break;
}
} }
auto columns = shared_chunk->detachColumns(); auto columns = shared_chunk->detachColumns();
if (len < num_rows) if (current_row_num < num_rows)
{ {
previous_row_chunk = {};
for (size_t i = 0; i < num_columns; ++i) for (size_t i = 0; i < num_columns; ++i)
columns[i] = columns[i]->cut(0, len); columns[i] = columns[i]->cut(0, current_row_num);
} }
current_chunk.setColumns(std::move(columns), len); current_chunk.setColumns(std::move(columns), current_row_num);
block_processed = true; block_processed = true;
return; return;
} }
@ -193,22 +199,21 @@ void LimitTransform::work()
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows))); static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(rows_read) + static_cast<Int64>(num_rows)));
/// check if other rows in current block equals to last one in limit /// check if other rows in current block equals to last one in limit
if (with_ties) if (with_ties && length)
{ {
ties_row_ref.set(shared_chunk, &shared_chunk->sort_columns, start + length - 1); size_t current_row_num = start + length;
SharedChunkRowRef current_row; previous_row_chunk = makeChunkWithPreviousRow(*shared_chunk, current_row_num - 1);
for (size_t i = ties_row_ref.row_num + 1; i < num_rows; ++i) for (; current_row_num < num_rows; ++current_row_num)
{ {
current_row.set(shared_chunk, &shared_chunk->sort_columns, i); if (!shared_chunk->sortColumnsEqualAt(current_row_num, 0, *previous_row_chunk))
if (current_row == ties_row_ref)
++length;
else
{ {
ties_row_ref.reset(); previous_row_chunk = {};
break; break;
} }
} }
length = current_row_num - start;
} }
if (length == num_rows) if (length == num_rows)
@ -228,7 +233,7 @@ void LimitTransform::work()
block_processed = true; block_processed = true;
} }
ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) ColumnRawPtrs LimitTransform::extractSortColumns(const Columns & columns) const
{ {
ColumnRawPtrs res; ColumnRawPtrs res;
res.reserve(description.size()); res.reserve(description.size());

View File

@ -13,6 +13,7 @@ private:
InputPort & input; InputPort & input;
OutputPort & output; OutputPort & output;
size_t limit; size_t limit;
size_t offset; size_t offset;
size_t rows_read = 0; /// including the last read block size_t rows_read = 0; /// including the last read block
@ -26,10 +27,12 @@ private:
bool with_ties; bool with_ties;
const SortDescription description; const SortDescription description;
SharedChunkRowRef ties_row_ref;
SharedChunkPtr previous_row_chunk;
std::vector<size_t> sort_column_positions; std::vector<size_t> sort_column_positions;
ColumnRawPtrs extractSortColumns(const Columns & columns); SharedChunkPtr makeChunkWithPreviousRow(const Chunk & current_chunk, size_t row_num) const;
ColumnRawPtrs extractSortColumns(const Columns & columns) const;
public: public:
LimitTransform( LimitTransform(

View File

@ -25,67 +25,21 @@ struct SharedChunk : Chunk
ColumnRawPtrs sort_columns; ColumnRawPtrs sort_columns;
SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {} SharedChunk(Chunk && chunk) : Chunk(std::move(chunk)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedChunk * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedChunk * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedChunkPtr = boost::intrusive_ptr<detail::SharedChunk>;
struct SharedChunkRowRef bool sortColumnsEqualAt(size_t row_num, size_t other_row_num, const detail::SharedChunk & other) const
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedChunkPtr shared_block;
void swap(SharedChunkRowRef & other)
{ {
std::swap(columns, other.columns); size_t size = sort_columns.size();
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedChunkRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i) for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1)) if (0 != sort_columns[i]->compareAt(row_num, other_row_num, *other.sort_columns[i], 1))
return false; return false;
return true; return true;
} }
bool operator!=(const SharedChunkRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedChunkRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
void set(SharedChunkPtr & shared_block_, ColumnRawPtrs * columns_, size_t row_num_)
{
shared_block = shared_block_;
columns = columns_;
row_num = row_num_;
}
}; };
}
using SharedChunkPtr = std::shared_ptr<detail::SharedChunk>;
} }

View File

@ -146,7 +146,7 @@ private:
if (!shared_chunk_ptr) if (!shared_chunk_ptr)
{ {
shared_chunk_ptr = new detail::SharedChunk(std::move(chunk)); shared_chunk_ptr = std::make_shared<detail::SharedChunk>(std::move(chunk));
cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num); cursors[source_num] = SortCursorImpl(shared_chunk_ptr->getColumns(), description, source_num);
has_collation |= cursors[source_num].has_collation; has_collation |= cursors[source_num].has_collation;
} }

View File

@ -50,3 +50,5 @@
2 2
2 2
* *
100
100

View File

@ -1,8 +1,6 @@
DROP TABLE IF EXISTS ties; DROP TABLE IF EXISTS ties;
CREATE TABLE ties (a Int) ENGINE = Memory; CREATE TABLE ties (a Int) ENGINE = Memory;
-- SET experimental_use_processors=1;
INSERT INTO ties VALUES (1), (1), (2), (2), (2), (2) (3), (3); INSERT INTO ties VALUES (1), (1), (2), (2), (2), (2) (3), (3);
SELECT a FROM ties order by a limit 1 with ties; SELECT a FROM ties order by a limit 1 with ties;
@ -32,4 +30,7 @@ SELECT '*';
SELECT a FROM ties order by a limit 3, 2 with ties; SELECT a FROM ties order by a limit 3, 2 with ties;
SELECT '*'; SELECT '*';
select count() from (select number > 100 from numbers(2000) order by number > 100 limit 1, 7 with ties); --TODO replace "number > 100" with "number > 100 as n"
select count() from (select number, number < 100 from numbers(2000) order by number < 100 desc limit 10 with ties);
DROP TABLE ties; DROP TABLE ties;