Merge pull request #293 from yandex/issue-111

LIMIT BY clause was implemented
This commit is contained in:
alexey-milovidov 2016-12-31 05:54:03 +04:00 committed by GitHub
commit 49cd3db532
11 changed files with 315 additions and 22 deletions

View File

@ -0,0 +1,41 @@
#pragma once
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Common/HashTable/HashMap.h>
#include <DB/Common/SipHash.h>
#include <DB/Common/UInt128.h>
namespace DB
{
/** Implements the LIMIT BY clause, which can be used to obtain a "top N by subgroup".
*
* For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7),
* the query SELECT Num FROM T LIMIT 2 BY Num
* will give you the following result: (Num: 1 1 3 3 4 4 5 7 7).
*
* Row counters are kept for the whole lifetime of the stream, so the limit applies
* across all blocks read from the child stream, not per block.
* Keys are identified only by the 128-bit SipHash of the key column values.
*/
class LimitByBlockInputStream : public IProfilingBlockInputStream
{
public:
/// input_ - the source stream; group_size_ - maximum number of rows to pass through per key;
/// columns_ - names of the columns that form the key.
LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_);
String getName() const override { return "LimitBy"; }
String getID() const override;
protected:
/// Returns the next block that still contains rows after filtering, or an empty block at end of stream.
Block readImpl() override;
private:
/// Returns pointers to the non-constant key columns of the block, in the order of columns_names.
ConstColumnPlainPtrs getKeyColumns(Block & block) const;
private:
/// Hash of the key -> number of rows seen so far with that key.
using MapHashed = HashMap<UInt128, UInt64, UInt128TrivialHash>;
/// Names of the key columns.
const Names columns_names;
/// Maximum number of rows passed through for each distinct key.
const size_t group_size;
MapHashed keys_counts;
};
}

View File

@ -230,6 +230,8 @@ private:
/// Удалить из ORDER BY повторяющиеся элементы.
void optimizeOrderBy();
void optimizeLimitBy();
/// remove Function_if AST if condition is constant
void optimizeIfWithConstantCondition();
void optimizeIfWithConstantConditionImpl(ASTPtr & current_ast, Aliases & aliases) const;

View File

@ -138,6 +138,7 @@ private:
void executeMergeSorted();
void executePreLimit();
void executeUnion();
void executeLimitBy();
void executeLimit();
void executeProjection(ExpressionActionsPtr expression);
void executeDistinct(bool before_order, Names columns);

View File

@ -57,6 +57,8 @@ public:
bool group_by_with_totals = false;
ASTPtr having_expression;
ASTPtr order_expression_list;
ASTPtr limit_by_value;
ASTPtr limit_by_expression_list;
ASTPtr limit_offset;
ASTPtr limit_length;
ASTPtr settings;

View File

@ -0,0 +1,83 @@
#include <DB/DataStreams/LimitByBlockInputStream.h>

#include <utility>
namespace DB
{
/** Creates a stream that passes through at most group_size_ rows for each distinct key.
  *
  * input_      - the source stream; it becomes the only child of this stream.
  * group_size_ - maximum number of rows to keep per key.
  * columns_    - names of the columns that form the key.
  *
  * Both input_ and columns_ are taken by value as sink parameters,
  * so they are moved into place instead of being copied a second time.
  */
LimitByBlockInputStream::LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_)
    : columns_names(std::move(columns_))
    , group_size(group_size_)
{
    children.push_back(std::move(input_));
}
/// Builds the stream identifier: the literal "LimitBy(" followed by the address of this object and ")".
String LimitByBlockInputStream::getID() const
{
    std::stringstream id;
    id << "LimitBy(";
    id << this;
    id << ")";
    return id.str();
}
/** Reads blocks from the child stream until one of them still contains rows after
  * applying the per-key limit, and returns that block with the excess rows removed.
  * Returns an empty block when the child stream is exhausted.
  */
Block LimitByBlockInputStream::readImpl()
{
    /// Execute until end of stream or until
    /// a block with some new records will be gotten.
    while (true)
    {
        Block block = children[0]->read();
        if (!block)
            return Block();

        const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block));
        const size_t rows = block.rows();
        IColumn::Filter filter(rows);
        size_t inserted_count = 0;

        for (size_t i = 0; i < rows; ++i)
        {
            /// The key of a row is the 128-bit SipHash of its values in the key columns.
            UInt128 key;
            SipHash hash;

            for (const auto & column : column_ptrs)
                column->updateHashWithValue(i, hash);

            hash.get128(key.first, key.second);

            /// The counter is incremented for every row, whether the row is kept or not.
            if (keys_counts[key]++ < group_size)
            {
                ++inserted_count;
                filter[i] = 1;
            }
            else
                filter[i] = 0;
        }

        /// Just go to the next block if there isn't any new records in the current one.
        if (!inserted_count)
            continue;

        /// Fast path: every row passed the limit, so the filter would keep all rows.
        /// Return the block as is instead of rebuilding each of its columns.
        if (inserted_count == rows)
            return block;

        const size_t all_columns = block.columns();
        for (size_t i = 0; i < all_columns; ++i)
        {
            auto & column = block.getByPosition(i).column;
            column = column->filter(filter, inserted_count);
        }

        return block;
    }
}
/** Looks up every key column of the block by name and returns raw pointers to them,
  * in the order of columns_names. Constant columns are left out of the result.
  */
ConstColumnPlainPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const
{
    ConstColumnPlainPtrs key_columns;
    key_columns.reserve(columns_names.size());

    for (const auto & column_name : columns_names)
    {
        auto & candidate = block.getByName(column_name).column;

        /// Constant columns are not included in the key.
        if (candidate->isConst())
            continue;

        key_columns.emplace_back(candidate.get());
    }

    return key_columns;
}
}

View File

@ -176,6 +176,9 @@ void ExpressionAnalyzer::init()
/// Удалить из ORDER BY повторяющиеся элементы.
optimizeOrderBy();
// Remove duplicated elements from LIMIT BY clause.
optimizeLimitBy();
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns();
@ -1169,6 +1172,31 @@ void ExpressionAnalyzer::optimizeOrderBy()
}
/** Removes duplicate elements from the LIMIT BY expression list.
  *
  * Elements are compared by their column name; the first occurrence of each name is kept
  * and the original order is preserved. Does nothing if there is no SELECT query
  * or the query has no LIMIT BY clause.
  */
void ExpressionAnalyzer::optimizeLimitBy()
{
    if (!(select_query && select_query->limit_by_expression_list))
        return;

    std::set<String> elems_set;

    ASTs & elems = select_query->limit_by_expression_list->children;
    ASTs unique_elems;
    unique_elems.reserve(elems.size());

    for (const auto & elem : elems)
    {
        /// Consider every element, not only plain identifiers: otherwise any other kind
        /// of expression would be silently removed from the LIMIT BY key.
        if (elems_set.emplace(elem->getColumnName()).second)
            unique_elems.emplace_back(elem);
    }

    /// Replace the list only if something was actually removed.
    if (unique_elems.size() < elems.size())
        elems = unique_elems;
}
void ExpressionAnalyzer::makeSetsForIndex()
{
if (storage && ast && storage->supportsIndexForIn())

View File

@ -3,6 +3,7 @@
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/LimitByBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/MergeSortingBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
@ -190,19 +191,25 @@ void InterpreterSelectQuery::initQueryAnalyzer()
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_,
size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_),
is_first_select_inside_union_all(query.isUnionAllHead()),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input_);
}
InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, ASTPtr query_ptr_, const Context & context_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(QueryProcessingStage::Complete), subquery_depth(0),
is_first_select_inside_union_all(false), only_analyze(true),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(QueryProcessingStage::Complete)
, subquery_depth(0)
, is_first_select_inside_union_all(false), only_analyze(true)
, log(&Logger::get("InterpreterSelectQuery"))
{
init({});
}
@ -217,10 +224,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context
InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_,
const Names & required_column_names_,
const NamesAndTypesList & table_column_names_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_)
: query_ptr(query_ptr_), query(typeid_cast<ASTSelectQuery &>(*query_ptr)),
context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_), table_column_names(table_column_names_),
is_first_select_inside_union_all(query.isUnionAllHead()),
log(&Logger::get("InterpreterSelectQuery"))
: query_ptr(query_ptr_)
, query(typeid_cast<ASTSelectQuery &>(*query_ptr))
, context(context_)
, to_stage(to_stage_)
, subquery_depth(subquery_depth_)
, table_column_names(table_column_names_)
, is_first_select_inside_union_all(query.isUnionAllHead())
, log(&Logger::get("InterpreterSelectQuery"))
{
init(input_, required_column_names_);
}
@ -305,7 +316,7 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, St
DataTypes InterpreterSelectQuery::getReturnTypes()
{
DataTypes res;
NamesAndTypesList columns = query_analyzer->getSelectSampleBlock().getColumnsList();
const NamesAndTypesList & columns = query_analyzer->getSelectSampleBlock().getColumnsList();
for (auto & column : columns)
res.push_back(column.type);
@ -553,8 +564,7 @@ void InterpreterSelectQuery::executeSingleQuery()
* но есть ORDER или LIMIT,
* то выполним предварительную сортировку и LIMIT на удалёном сервере.
*/
if (!second_stage
&& !need_aggregate && !has_having)
if (!second_stage && !need_aggregate && !has_having)
{
if (has_order_by)
executeOrder();
@ -619,21 +629,28 @@ void InterpreterSelectQuery::executeSingleQuery()
/** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT,
* ограничивающий число записей в каждом до offset + limit.
*/
if (query.limit_length && hasMoreThanOneStream() && !query.distinct)
if (query.limit_length && hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list)
executePreLimit();
if (need_second_distinct_pass)
union_within_single_query = true;
/// To execute LIMIT BY we should merge all streams together.
if (query.limit_by_expression_list && hasMoreThanOneStream())
union_within_single_query = true;
if (union_within_single_query || stream_with_non_joined_data)
executeUnion();
if (streams.size() == 1)
{
/// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния.
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (need_second_distinct_pass)
executeDistinct(false, Names());
executeLimitBy();
executeLimit();
}
}
@ -770,7 +787,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
size_t limit_offset = 0;
getLimitLengthAndOffset(query, limit_length, limit_offset);
/** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size,
/** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY но указан LIMIT, и limit + offset < max_block_size,
* то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено),
* а также установим количество потоков в 1.
*/
@ -780,6 +797,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns()
&& !query.group_expression_list
&& !query.having_expression
&& !query.order_expression_list
&& !query.limit_by_expression_list
&& query.limit_length
&& !query_analyzer->hasAggregation()
&& limit_length + limit_offset < settings.max_block_size)
@ -1024,9 +1042,9 @@ static SortDescription getSortDescription(ASTSelectQuery & query)
static size_t getLimitForSorting(ASTSelectQuery & query)
{
/// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку.
/// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY.
size_t limit = 0;
if (!query.distinct)
if (!query.distinct && !query.limit_by_expression_list)
{
size_t limit_length = 0;
size_t limit_offset = 0;
@ -1156,7 +1174,7 @@ void InterpreterSelectQuery::executePreLimit()
{
transformStreams([&](auto & stream)
{
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, 0);
stream = std::make_shared<LimitBlockInputStream>(stream, limit_length + limit_offset, false);
});
if (hasMoreThanOneStream())
@ -1165,6 +1183,28 @@ void InterpreterSelectQuery::executePreLimit()
}
void InterpreterSelectQuery::executeLimitBy()
{
if (!query.limit_by_value)
return;
Names columns;
size_t value = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*query.limit_by_value).value);
for (const auto & elem : query.limit_by_expression_list->children)
{
columns.emplace_back(elem->getAliasOrColumnName());
}
transformStreams([&](auto & stream)
{
stream = std::make_shared<LimitByBlockInputStream>(
stream, value, columns
);
});
}
void InterpreterSelectQuery::executeLimit()
{
size_t limit_length = 0;

View File

@ -202,6 +202,8 @@ ASTPtr ASTSelectQuery::cloneImpl(bool traverse_union_all) const
CLONE(group_expression_list)
CLONE(having_expression)
CLONE(order_expression_list)
CLONE(limit_by_value)
CLONE(limit_by_expression_list)
CLONE(limit_offset)
CLONE(limit_length)
CLONE(settings)
@ -287,6 +289,14 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
: typeid_cast<const ASTExpressionList &>(*order_expression_list).formatImplMultiline(s, state, frame);
}
/// LIMIT n BY expr-list.
/// The limit value has to be printed between the two keywords:
/// without it the formatted text is not the query that was parsed.
if (limit_by_value)
{
    s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");
    limit_by_value->formatImpl(s, state, frame);
    s.ostr << (s.hilite ? hilite_keyword : "") << " BY " << (s.hilite ? hilite_none : "");
    s.one_line
        ? limit_by_expression_list->formatImpl(s, state, frame)
        : typeid_cast<const ASTExpressionList &>(*limit_by_expression_list).formatImplMultiline(s, state, frame);
}
if (limit_length)
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : "");

View File

@ -10,6 +10,7 @@
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/ParserTablesInSelectQuery.h>
#include <iostream>
namespace DB
{
@ -151,7 +152,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
}
/// LIMIT length или LIMIT offset, length
/// LIMIT length | LIMIT offset, length | LIMIT count BY expr-list
if (s_limit.ignore(pos, end, max_parsed_pos, expected))
{
ws.ignore(pos, end);
@ -164,6 +165,44 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
ws.ignore(pos, end);
if (s_comma.ignore(pos, end, max_parsed_pos, expected))
{
select_query->limit_offset = select_query->limit_length;
if (!num.parse(pos, end, select_query->limit_length, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
else if (s_by.ignore(pos, end, max_parsed_pos, expected))
{
select_query->limit_by_value = select_query->limit_length;
select_query->limit_length = nullptr;
ws.ignore(pos, end);
if (!exp_list.parse(pos, end, select_query->limit_by_expression_list, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
}
}
/// LIMIT length | LIMIT offset, length
if (s_limit.ignore(pos, end, max_parsed_pos, expected))
{
if (!select_query->limit_by_value || select_query->limit_length)
return false;
ws.ignore(pos, end);
ParserString s_comma(",");
ParserNumber num;
if (!num.parse(pos, end, select_query->limit_length, max_parsed_pos, expected))
return false;
ws.ignore(pos, end);
if (s_comma.ignore(pos, end, max_parsed_pos, expected))
{
select_query->limit_offset = select_query->limit_length;
@ -232,6 +271,10 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p
select_query->children.push_back(select_query->having_expression);
if (select_query->order_expression_list)
select_query->children.push_back(select_query->order_expression_list);
if (select_query->limit_by_value)
select_query->children.push_back(select_query->limit_by_value);
if (select_query->limit_by_expression_list)
select_query->children.push_back(select_query->limit_by_expression_list);
if (select_query->limit_offset)
select_query->children.push_back(select_query->limit_offset);
if (select_query->limit_length)

View File

@ -0,0 +1,17 @@
1
1
3
3
4
4
5
7
7
1 2
3 3
4 2
5 1
7 4
1 John
3 Mary
4 Mary

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS test.limit_by;
CREATE TABLE test.limit_by (Num UInt32, Name String) ENGINE = Memory;
INSERT INTO test.limit_by (Num, Name) VALUES (1, 'John');
INSERT INTO test.limit_by (Num, Name) VALUES (1, 'John');
INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (4, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (4, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (5, 'Bill');
INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Bill');
INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Bill');
INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Mary');
INSERT INTO test.limit_by (Num, Name) VALUES (7, 'John');
-- At most two elements in each group
SELECT Num FROM test.limit_by ORDER BY Num LIMIT 2 BY Num;
-- LIMIT BY doesn't affect result of GROUP BY
SELECT Num, count(*) FROM test.limit_by GROUP BY Num ORDER BY Num LIMIT 2 BY Num;
-- LIMIT BY can be combined with LIMIT
SELECT Num, Name FROM test.limit_by ORDER BY Num LIMIT 1 BY Num, Name LIMIT 3;
DROP TABLE IF EXISTS test.limit_by;