limit with ties

This commit is contained in:
dmitrii 2019-04-19 16:38:25 +03:00
parent 8d9105eb25
commit a130850e15
14 changed files with 214 additions and 108 deletions

View File

@ -134,7 +134,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctio
list (APPEND dbms_sources src/Dictionaries/DictionaryFactory.cpp src/Dictionaries/DictionarySourceFactory.cpp src/Dictionaries/DictionaryStructure.cpp)
list (APPEND dbms_headers src/Dictionaries/DictionaryFactory.h src/Dictionaries/DictionarySourceFactory.h src/Dictionaries/DictionaryStructure.h)
add_library(clickhouse_common_io ${LINK_MODE} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
add_library(clickhouse_common_io ${LINK_MODE} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources} src/Common/SharedBlockRowRef.h)
if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)

View File

@ -0,0 +1,90 @@
#pragma once
#include <algorithm>
#include "../Core/Block.h"
#include "../../../contrib/boost/boost/smart_ptr/intrusive_ptr.hpp"
#include "../Columns/IColumn.h"
namespace DB
{
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
struct SharedBlockRowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
void swap(SharedBlockRowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const SharedBlockRowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const SharedBlockRowRef & other) const
{
return !(*this == other);
}
void reset()
{
SharedBlockRowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};
void setRowRef(SharedBlockRowRef & row_ref, SharedBlockPtr shared_block, ColumnRawPtrs * columns, size_t row_num)
{
row_ref.row_num = row_num;
row_ref.columns = columns;
row_ref.shared_block = shared_block;
}
}

View File

@ -43,8 +43,8 @@ private:
ColumnNumbers column_numbers_not_to_aggregate;
std::vector<ColumnAggregateFunction *> columns_to_aggregate;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
/** We support two different cursors - with Collation and without.
* Templates are used instead of polymorphic SortCursor and calls to virtual functions.

View File

@ -46,12 +46,12 @@ private:
/// Read is finished.
bool finished = false;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
RowRef first_negative; /// The first negative row for the current primary key.
RowRef last_positive; /// The last positive row for the current primary key.
RowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
SharedBlockRowRef first_negative; /// The first negative row for the current primary key.
SharedBlockRowRef last_positive; /// The last positive row for the current primary key.
SharedBlockRowRef last_negative; /// Last negative row. It is only stored if there is not one row is written to output.
size_t count_positive = 0; /// The number of positive rows for the current primary key.
size_t count_negative = 0; /// The number of negative rows for the current primary key.

View File

@ -321,7 +321,7 @@ void GraphiteRollupSortedBlockInputStream::finishCurrentGroup(MutableColumns & m
}
void GraphiteRollupSortedBlockInputStream::accumulateRow(RowRef & row)
void GraphiteRollupSortedBlockInputStream::accumulateRow(SharedBlockRowRef & row)
{
const Graphite::AggregationPattern * aggregation_pattern = std::get<1>(current_rule);
if (aggregate_state_created)

View File

@ -204,7 +204,7 @@ private:
StringRef current_group_path;
/// Last row with maximum version for current primary key (time bucket).
RowRef current_subgroup_newest_row;
SharedBlockRowRef current_subgroup_newest_row;
/// Time of last read row
time_t current_time = 0;
@ -236,7 +236,7 @@ private:
void finishCurrentGroup(MutableColumns & merged_columns);
/// Update the state of the aggregate function with the new `value`.
void accumulateRow(RowRef & row);
void accumulateRow(SharedBlockRowRef & row);
};
}

View File

@ -6,8 +6,33 @@
namespace DB
{
LimitBlockInputStream::LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_, bool use_limit_as_total_rows_approx)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_)
namespace detail
{
ColumnRawPtrs getBlockColumns(const Block & block, const SortDescription description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
}
LimitBlockInputStream::LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_,
bool use_limit_as_total_rows_approx, bool with_ties_, const SortDescription & description_)
: limit(limit_), offset(offset_), always_read_till_end(always_read_till_end_), with_ties(with_ties_)
, description(description_)
{
if (use_limit_as_total_rows_approx)
{
@ -23,6 +48,36 @@ Block LimitBlockInputStream::readImpl()
Block res;
UInt64 rows = 0;
if (with_ties && tiesRowRef.shared_block)
{
res = children.back()->read();
rows = res.rows();
pos += rows;
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
ColumnRawPtrs columns = getBlockColumns(*ptr, description);
UInt64 len;
for (len = 0; len < rows; ++len)
{
SharedBlockRowRef currentRow;
setRowRef(currentRow, ptr, &columns, len);
if (currentRow != tiesRowRef)
{
tiesRowRef.reset();
break;
}
}
if (len < rows - 1)
{
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(0, len);
}
return *ptr;
}
/// pos - how many lines were read, including the last read block
if (pos >= offset + limit)
@ -46,6 +101,7 @@ Block LimitBlockInputStream::readImpl()
pos += rows;
} while (pos <= offset);
/// give away the whole block
if (pos >= offset + rows && pos <= offset + limit)
return res;
@ -60,13 +116,34 @@ Block LimitBlockInputStream::readImpl()
static_cast<Int64>(pos) - static_cast<Int64>(offset),
static_cast<Int64>(limit) + static_cast<Int64>(offset) - static_cast<Int64>(pos) + static_cast<Int64>(rows)));
for (size_t i = 0; i < res.columns(); ++i)
res.safeGetByPosition(i).column = res.safeGetByPosition(i).column->cut(start, length);
SharedBlockPtr ptr = new detail::SharedBlock(std::move(res));
if (with_ties)
{
ColumnRawPtrs columns = getBlockColumns(*ptr, description);
setRowRef(tiesRowRef, ptr, &columns, start + length - 1);
for (size_t i = tiesRowRef.row_num + 1; i < rows; ++i)
{
SharedBlockRowRef currentRow;
setRowRef(currentRow, ptr, &columns, i);
if (currentRow == tiesRowRef)
++length;
else
{
tiesRowRef.reset();
break;
}
}
}
for (size_t i = 0; i < ptr->columns(); ++i)
ptr->safeGetByPosition(i).column = ptr->safeGetByPosition(i).column->cut(start, length);
// TODO: we should provide feedback to child-block, so it will know how many rows are actually consumed.
// It's crucial for streaming engines like Kafka.
return res;
return *ptr;
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Common/SharedBlockRowRef.h>
namespace DB
@ -18,7 +19,10 @@ public:
* when otherwise, due to the cancellation of the request, we would not have received the data for GROUP BY WITH TOTALS from the remote server.
* If use_limit_as_total_rows_approx = true, then addTotalRowsApprox is called to use the limit in progress & stats
*/
LimitBlockInputStream(const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_, bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false);
LimitBlockInputStream(
const BlockInputStreamPtr & input, UInt64 limit_, UInt64 offset_,
bool always_read_till_end_ = false, bool use_limit_as_total_rows_approx = false,
bool with_ties_ = false, const SortDescription & description_ = {});
String getName() const override { return "Limit"; }
@ -32,6 +36,9 @@ private:
UInt64 offset;
UInt64 pos = 0;
bool always_read_till_end;
bool with_ties;
const SortDescription description;
SharedBlockRowRef tiesRowRef;
};
}

View File

@ -5,6 +5,7 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <common/logger_useful.h>
#include <Common/SharedBlockRowRef.h>
#include <Core/Row.h>
#include <Core/SortDescription.h>
@ -24,39 +25,6 @@ namespace ErrorCodes
}
/// Allows you refer to the row in the block and hold the block ownership,
/// and thus avoid creating a temporary row object.
/// Do not use std::shared_ptr, since there is no need for a place for `weak_count` and `deleter`;
/// does not use Poco::SharedPtr, since you need to allocate a block and `refcount` in one piece;
/// does not use Poco::AutoPtr, since it does not have a `move` constructor and there are extra checks for nullptr;
/// The reference counter is not atomic, since it is used from one thread.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SharedBlock(Block && block) : Block(std::move(block)) {}
};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
/** Merges several sorted streams into one sorted stream.
*/
class MergingSortedBlockInputStream : public IBlockInputStream
@ -78,45 +46,6 @@ public:
Block getHeader() const override { return header; }
protected:
struct RowRef
{
ColumnRawPtrs * columns = nullptr;
size_t row_num;
SharedBlockPtr shared_block;
void swap(RowRef & other)
{
std::swap(columns, other.columns);
std::swap(row_num, other.row_num);
std::swap(shared_block, other.shared_block);
}
/// The number and types of columns must match.
bool operator==(const RowRef & other) const
{
size_t size = columns->size();
for (size_t i = 0; i < size; ++i)
if (0 != (*columns)[i]->compareAt(row_num, other.row_num, *(*other.columns)[i], 1))
return false;
return true;
}
bool operator!=(const RowRef & other) const
{
return !(*this == other);
}
void reset()
{
RowRef empty;
swap(empty);
}
bool empty() const { return columns == nullptr; }
size_t size() const { return empty() ? 0 : columns->size(); }
};
Block readImpl() override;
void readSuffixImpl() override;
@ -197,7 +126,7 @@ protected:
}
template <typename TSortCursor>
void setRowRef(RowRef & row_ref, TSortCursor & cursor)
void setRowRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];
@ -205,7 +134,7 @@ protected:
}
template <typename TSortCursor>
void setPrimaryKeyRef(RowRef & row_ref, TSortCursor & cursor)
void setPrimaryKeyRef(SharedBlockRowRef & row_ref, TSortCursor & cursor)
{
row_ref.row_num = cursor.impl->pos;
row_ref.shared_block = source_blocks[cursor.impl->order];

View File

@ -40,11 +40,11 @@ private:
bool finished = false;
/// Primary key of current row.
RowRef current_key;
SharedBlockRowRef current_key;
/// Primary key of next row.
RowRef next_key;
SharedBlockRowRef next_key;
/// Last row with maximum version for current primary key.
RowRef selected_row;
SharedBlockRowRef selected_row;
/// The position (into current_row_sources) of the row with the highest version.
size_t max_pos = 0;

View File

@ -129,8 +129,8 @@ private:
std::vector<AggregateDescription> columns_to_aggregate;
std::vector<MapDescription> maps_to_sum;
RowRef current_key; /// The current primary key.
RowRef next_key; /// The primary key of the next row.
SharedBlockRowRef current_key; /// The current primary key.
SharedBlockRowRef next_key; /// The primary key of the next row.
Row current_row;
bool current_row_is_zero = true; /// Are all summed columns zero (or empty)? It is updated incrementally.

View File

@ -46,7 +46,7 @@ void VersionedCollapsingSortedBlockInputStream::insertGap(size_t gap_size)
}
}
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns)
void VersionedCollapsingSortedBlockInputStream::insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns)
{
const auto & columns = row.shared_block->all_columns;
for (size_t i = 0; i < num_columns; ++i)
@ -109,7 +109,7 @@ void VersionedCollapsingSortedBlockInputStream::merge(MutableColumns & merged_co
{
SortCursor current = queue.top();
RowRef next_key;
SharedBlockRowRef next_key;
Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];

View File

@ -197,7 +197,7 @@ private:
Int8 sign_in_queue = 0;
const size_t max_rows_in_queue;
/// Rows with the same primary key and sign.
FixedSizeDequeWithGaps<RowRef> current_keys;
FixedSizeDequeWithGaps<SharedBlockRowRef> current_keys;
size_t blocks_written = 0;
@ -207,7 +207,7 @@ private:
void merge(MutableColumns & merged_columns, std::priority_queue<SortCursor> & queue);
/// Output to result row for the current primary key.
void insertRow(size_t skip_rows, const RowRef & row, MutableColumns & merged_columns);
void insertRow(size_t skip_rows, const SharedBlockRowRef & row, MutableColumns & merged_columns);
void insertGap(size_t gap_size);
};

View File

@ -803,7 +803,7 @@ void InterpreterSelectQuery::executeImpl(Pipeline & pipeline, const BlockInputSt
/** Optimization - if there are several sources and there is LIMIT, then first apply the preliminary LIMIT,
* limiting the number of rows in each up to `offset + limit`.
*/
if (query.limitLength() && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
if (query.limitLength() && !query.limit_with_ties && pipeline.hasMoreThanOneStream() && !query.distinct && !expressions.has_limit_by && !settings.extremes)
{
executePreLimit(pipeline);
}
@ -879,8 +879,8 @@ static std::pair<UInt64, UInt64> getLimitLengthAndOffset(const ASTSelectQuery &
static UInt64 getLimitForSorting(const ASTSelectQuery & query, const Context & context)
{
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
if (!query.distinct && !query.limitBy())
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY or WITH TIES.
if (!query.distinct && !query.limitBy() && !query.limit_with_ties)
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
return limit_length + limit_offset;
@ -1083,11 +1083,12 @@ void InterpreterSelectQuery::executeFetchColumns(
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY but LIMIT is specified, and limit + offset < max_block_size,
/** Optimization - if not specified DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY, WITH TIES but LIMIT is specified, and limit + offset < max_block_size,
* then as the block size we will use limit + offset (not to read more from the table than requested),
* and also set the number of threads to 1.
*/
if (!query.distinct
&& !query.limit_with_ties
&& !query.prewhere()
&& !query.where()
&& !query.groupBy()
@ -1511,7 +1512,7 @@ void InterpreterSelectQuery::executeDistinct(Pipeline & pipeline, bool before_or
UInt64 limit_for_distinct = 0;
/// If after this stage of DISTINCT ORDER BY is not executed, then you can get no more than limit_length + limit_offset of different rows.
if (!query.orderBy() || !before_order)
if ((!query.orderBy() || !before_order) && !query.limit_with_ties)
limit_for_distinct = limit_length + limit_offset;
pipeline.transform([&](auto & stream)
@ -1546,8 +1547,8 @@ void InterpreterSelectQuery::executeUnion(Pipeline & pipeline)
void InterpreterSelectQuery::executePreLimit(Pipeline & pipeline)
{
auto & query = getSelectQuery();
/// If there is LIMIT
if (query.limitLength())
/// If there is LIMIT and no WITH TIES
if (query.limitLength() && !query.limit_with_ties)
{
auto [limit_length, limit_offset] = getLimitLengthAndOffset(query, context);
pipeline.transform([&, limit = limit_length + limit_offset](auto & stream)
@ -1624,13 +1625,15 @@ void InterpreterSelectQuery::executeLimit(Pipeline & pipeline)
if (!query.group_by_with_totals && hasWithTotalsInAnySubqueryInFromClause(query))
always_read_till_end = true;
SortDescription order_descr = getSortDescription(query);
UInt64 limit_length;
UInt64 limit_offset;
std::tie(limit_length, limit_offset) = getLimitLengthAndOffset(query, context);
pipeline.transform([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end);
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length, limit_offset, always_read_till_end, false, query.limit_with_ties, order_descr);
});
}
}