diff --git a/dbms/include/DB/DataStreams/LimitByBlockInputStream.h b/dbms/include/DB/DataStreams/LimitByBlockInputStream.h new file mode 100644 index 00000000000..a9224daecb1 --- /dev/null +++ b/dbms/include/DB/DataStreams/LimitByBlockInputStream.h @@ -0,0 +1,41 @@ +#pragma once + +#include + +#include +#include +#include + +namespace DB +{ + +/** Implements LIMIT BY clause witch can be used to obtain a "top N by subgroup". + * + * For example, if you have table T like this (Num: 1 1 3 3 3 4 4 5 7 7 7 7), + * the query SELECT Num FROM T LIMIT 2 BY Num + * will give you the following result: (Num: 1 1 3 3 4 4 5 7 7). + */ +class LimitByBlockInputStream : public IProfilingBlockInputStream +{ +public: + LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_); + + String getName() const override { return "LimitBy"; } + + String getID() const override; + +protected: + Block readImpl() override; + +private: + ConstColumnPlainPtrs getKeyColumns(Block & block) const; + +private: + using MapHashed = HashMap; + + const Names columns_names; + const size_t group_size; + MapHashed keys_counts; +}; + +} diff --git a/dbms/include/DB/Interpreters/ExpressionAnalyzer.h b/dbms/include/DB/Interpreters/ExpressionAnalyzer.h index 7bb679b57bf..7c40d88623e 100644 --- a/dbms/include/DB/Interpreters/ExpressionAnalyzer.h +++ b/dbms/include/DB/Interpreters/ExpressionAnalyzer.h @@ -230,6 +230,8 @@ private: /// Удалить из ORDER BY повторяющиеся элементы. void optimizeOrderBy(); + void optimizeLimitBy(); + /// remove Function_if AST if condition is constant void optimizeIfWithConstantCondition(); void optimizeIfWithConstantConditionImpl(ASTPtr & current_ast, Aliases & aliases) const; diff --git a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h index 73c015c46bc..8f4ae2dc31f 100644 --- a/dbms/include/DB/Interpreters/InterpreterSelectQuery.h +++ b/dbms/include/DB/Interpreters/InterpreterSelectQuery.h @@ -138,6 +138,7 @@ private: void executeMergeSorted(); void executePreLimit(); void executeUnion(); + void executeLimitBy(); void executeLimit(); void executeProjection(ExpressionActionsPtr expression); void executeDistinct(bool before_order, Names columns); diff --git a/dbms/include/DB/Parsers/ASTSelectQuery.h b/dbms/include/DB/Parsers/ASTSelectQuery.h index 175bc1eaa9a..05d664d65da 100644 --- a/dbms/include/DB/Parsers/ASTSelectQuery.h +++ b/dbms/include/DB/Parsers/ASTSelectQuery.h @@ -57,6 +57,8 @@ public: bool group_by_with_totals = false; ASTPtr having_expression; ASTPtr order_expression_list; + ASTPtr limit_by_value; + ASTPtr limit_by_expression_list; ASTPtr limit_offset; ASTPtr limit_length; ASTPtr settings; diff --git a/dbms/src/DataStreams/LimitByBlockInputStream.cpp b/dbms/src/DataStreams/LimitByBlockInputStream.cpp new file mode 100644 index 00000000000..ea39fb7226c --- /dev/null +++ b/dbms/src/DataStreams/LimitByBlockInputStream.cpp @@ -0,0 +1,83 @@ +#include + +namespace DB +{ + +LimitByBlockInputStream::LimitByBlockInputStream(BlockInputStreamPtr input_, size_t group_size_, Names columns_) + : columns_names(columns_) + , group_size(group_size_) +{ + children.push_back(input_); +} + +String LimitByBlockInputStream::getID() const +{ + std::stringstream res; + res << "LimitBy(" << this << ")"; + return res.str(); +} + +Block LimitByBlockInputStream::readImpl() +{ + /// Execute until end of stream or until + /// a block with some new records will be gotten. + while (true) + { + Block block = children[0]->read(); + if (!block) + return Block(); + + const ConstColumnPlainPtrs column_ptrs(getKeyColumns(block)); + const size_t rows = block.rows(); + IColumn::Filter filter(rows); + size_t inserted_count = 0; + + for (size_t i = 0; i < rows; ++i) + { + UInt128 key; + SipHash hash; + + for (auto & column : column_ptrs) + column->updateHashWithValue(i, hash); + + hash.get128(key.first, key.second); + + if (keys_counts[key]++ < group_size) + { + inserted_count++; + filter[i] = 1; + } + else + filter[i] = 0; + } + + /// Just go to the next block if there isn't any new records in the current one. + if (!inserted_count) + continue; + + size_t all_columns = block.columns(); + for (size_t i = 0; i < all_columns; ++i) + block.getByPosition(i).column = block.getByPosition(i).column->filter(filter, inserted_count); + + return block; + } +} + +ConstColumnPlainPtrs LimitByBlockInputStream::getKeyColumns(Block & block) const +{ + ConstColumnPlainPtrs column_ptrs; + column_ptrs.reserve(columns_names.size()); + + for (const auto & name : columns_names) + { + auto & column = block.getByName(name).column; + + /// Ignore all constant columns. + if (!column->isConst()) + column_ptrs.emplace_back(column.get()); + } + + return column_ptrs; +} + +} diff --git a/dbms/src/Interpreters/ExpressionAnalyzer.cpp b/dbms/src/Interpreters/ExpressionAnalyzer.cpp index d7eee0b2162..9440143bdc4 100644 --- a/dbms/src/Interpreters/ExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/ExpressionAnalyzer.cpp @@ -176,6 +176,9 @@ void ExpressionAnalyzer::init() /// Удалить из ORDER BY повторяющиеся элементы. optimizeOrderBy(); + // Remove duplicated elements from LIMIT BY clause. + optimizeLimitBy(); + /// array_join_alias_to_name, array_join_result_to_source. getArrayJoinedColumns(); @@ -1169,6 +1172,31 @@ void ExpressionAnalyzer::optimizeOrderBy() } +void ExpressionAnalyzer::optimizeLimitBy() +{ + if (!(select_query && select_query->limit_by_expression_list)) + return; + + std::set elems_set; + + ASTs & elems = select_query->limit_by_expression_list->children; + ASTs unique_elems; + unique_elems.reserve(elems.size()); + + for (const auto & elem : elems) + { + if (const auto id = typeid_cast(elem.get())) + { + if (elems_set.emplace(id->getColumnName()).second) + unique_elems.emplace_back(elem); + } + } + + if (unique_elems.size() < elems.size()) + elems = unique_elems; +} + + void ExpressionAnalyzer::makeSetsForIndex() { if (storage && ast && storage->supportsIndexForIn()) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 53e876ce8d3..4273e20f322 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -190,19 +191,25 @@ void InterpreterSelectQuery::initQueryAnalyzer() InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_), - is_first_select_inside_union_all(query.isUnionAllHead()), - log(&Logger::get("InterpreterSelectQuery")) + : query_ptr(query_ptr_) + , query(typeid_cast(*query_ptr)) + , context(context_) + , to_stage(to_stage_) + , subquery_depth(subquery_depth_) + , is_first_select_inside_union_all(query.isUnionAllHead()) + , log(&Logger::get("InterpreterSelectQuery")) { init(input_); } InterpreterSelectQuery::InterpreterSelectQuery(OnlyAnalyzeTag, ASTPtr query_ptr_, const Context & context_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), to_stage(QueryProcessingStage::Complete), subquery_depth(0), - is_first_select_inside_union_all(false), only_analyze(true), - log(&Logger::get("InterpreterSelectQuery")) + : query_ptr(query_ptr_) + , query(typeid_cast(*query_ptr)) + , context(context_) + , to_stage(QueryProcessingStage::Complete) + , subquery_depth(0) + , is_first_select_inside_union_all(false), only_analyze(true) + , log(&Logger::get("InterpreterSelectQuery")) { init({}); } @@ -217,10 +224,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context InterpreterSelectQuery::InterpreterSelectQuery(ASTPtr query_ptr_, const Context & context_, const Names & required_column_names_, const NamesAndTypesList & table_column_names_, QueryProcessingStage::Enum to_stage_, size_t subquery_depth_, BlockInputStreamPtr input_) - : query_ptr(query_ptr_), query(typeid_cast(*query_ptr)), - context(context_), to_stage(to_stage_), subquery_depth(subquery_depth_), table_column_names(table_column_names_), - is_first_select_inside_union_all(query.isUnionAllHead()), - log(&Logger::get("InterpreterSelectQuery")) + : query_ptr(query_ptr_) + , query(typeid_cast(*query_ptr)) + , context(context_) + , to_stage(to_stage_) + , subquery_depth(subquery_depth_) + , table_column_names(table_column_names_) + , is_first_select_inside_union_all(query.isUnionAllHead()) + , log(&Logger::get("InterpreterSelectQuery")) { init(input_, required_column_names_); } @@ -305,7 +316,7 @@ void InterpreterSelectQuery::getDatabaseAndTableNames(String & database_name, St DataTypes InterpreterSelectQuery::getReturnTypes() { DataTypes res; - NamesAndTypesList columns = query_analyzer->getSelectSampleBlock().getColumnsList(); + const NamesAndTypesList & columns = query_analyzer->getSelectSampleBlock().getColumnsList(); for (auto & column : columns) res.push_back(column.type); @@ -553,8 +564,7 @@ void InterpreterSelectQuery::executeSingleQuery() * но есть ORDER или LIMIT, * то выполним предварительную сортировку и LIMIT на удалёном сервере. */ - if (!second_stage - && !need_aggregate && !has_having) + if (!second_stage && !need_aggregate && !has_having) { if (has_order_by) executeOrder(); @@ -619,21 +629,28 @@ void InterpreterSelectQuery::executeSingleQuery() /** Оптимизация - если источников несколько и есть LIMIT, то сначала применим предварительный LIMIT, * ограничивающий число записей в каждом до offset + limit. */ - if (query.limit_length && hasMoreThanOneStream() && !query.distinct) + if (query.limit_length && hasMoreThanOneStream() && !query.distinct && !query.limit_by_expression_list) executePreLimit(); if (need_second_distinct_pass) union_within_single_query = true; + /// To execute LIMIT BY we should merge all streams together. + if (query.limit_by_expression_list && hasMoreThanOneStream()) + union_within_single_query = true; + if (union_within_single_query || stream_with_non_joined_data) executeUnion(); if (streams.size() == 1) { - /// Если было более одного источника - то нужно выполнить DISTINCT ещё раз после их слияния. + /** If there was more than one stream, + * then DISTINCT needs to be performed once again after merging all streams. + */ if (need_second_distinct_pass) executeDistinct(false, Names()); + executeLimitBy(); executeLimit(); } } @@ -770,7 +787,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() size_t limit_offset = 0; getLimitLengthAndOffset(query, limit_length, limit_offset); - /** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, но указан LIMIT, и limit + offset < max_block_size, + /** Оптимизация - если не указаны DISTINCT, WHERE, GROUP, HAVING, ORDER, LIMIT BY но указан LIMIT, и limit + offset < max_block_size, * то в качестве размера блока будем использовать limit + offset (чтобы не читать из таблицы больше, чем запрошено), * а также установим количество потоков в 1. */ @@ -780,6 +797,7 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns() && !query.group_expression_list && !query.having_expression && !query.order_expression_list + && !query.limit_by_expression_list && query.limit_length && !query_analyzer->hasAggregation() && limit_length + limit_offset < settings.max_block_size) @@ -1024,9 +1042,9 @@ static SortDescription getSortDescription(ASTSelectQuery & query) static size_t getLimitForSorting(ASTSelectQuery & query) { - /// Если есть LIMIT и нет DISTINCT - можно делать частичную сортировку. + /// Partial sort can be done if there is LIMIT but no DISTINCT or LIMIT BY. size_t limit = 0; - if (!query.distinct) + if (!query.distinct && !query.limit_by_expression_list) { size_t limit_length = 0; size_t limit_offset = 0; @@ -1156,7 +1174,7 @@ void InterpreterSelectQuery::executePreLimit() { transformStreams([&](auto & stream) { - stream = std::make_shared(stream, limit_length + limit_offset, 0); + stream = std::make_shared(stream, limit_length + limit_offset, false); }); if (hasMoreThanOneStream()) @@ -1165,6 +1183,28 @@ void InterpreterSelectQuery::executePreLimit() } +void InterpreterSelectQuery::executeLimitBy() +{ + if (!query.limit_by_value) + return; + + Names columns; + size_t value = safeGet(typeid_cast(*query.limit_by_value).value); + + for (const auto & elem : query.limit_by_expression_list->children) + { + columns.emplace_back(elem->getAliasOrColumnName()); + } + + transformStreams([&](auto & stream) + { + stream = std::make_shared( + stream, value, columns + ); + }); +} + + void InterpreterSelectQuery::executeLimit() { size_t limit_length = 0; diff --git a/dbms/src/Parsers/ASTSelectQuery.cpp b/dbms/src/Parsers/ASTSelectQuery.cpp index 203d306fcb4..b8d1684e92c 100644 --- a/dbms/src/Parsers/ASTSelectQuery.cpp +++ b/dbms/src/Parsers/ASTSelectQuery.cpp @@ -202,6 +202,8 @@ ASTPtr ASTSelectQuery::cloneImpl(bool traverse_union_all) const CLONE(group_expression_list) CLONE(having_expression) CLONE(order_expression_list) + CLONE(limit_by_value) + CLONE(limit_by_expression_list) CLONE(limit_offset) CLONE(limit_length) CLONE(settings) @@ -287,6 +289,14 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F : typeid_cast(*order_expression_list).formatImplMultiline(s, state, frame); } + if (limit_by_value) + { + s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT BY " << (s.hilite ? hilite_none : ""); + s.one_line + ? limit_by_expression_list->formatImpl(s, state, frame) + : typeid_cast(*limit_by_expression_list).formatImplMultiline(s, state, frame); + } + if (limit_length) { s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "LIMIT " << (s.hilite ? hilite_none : ""); diff --git a/dbms/src/Parsers/ParserSelectQuery.cpp b/dbms/src/Parsers/ParserSelectQuery.cpp index a912d09699f..c2d085b2a95 100644 --- a/dbms/src/Parsers/ParserSelectQuery.cpp +++ b/dbms/src/Parsers/ParserSelectQuery.cpp @@ -10,6 +10,7 @@ #include #include +#include namespace DB { @@ -151,7 +152,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ws.ignore(pos, end); } - /// LIMIT length или LIMIT offset, length + /// LIMIT length | LIMIT offset, length | LIMIT count BY expr-list if (s_limit.ignore(pos, end, max_parsed_pos, expected)) { ws.ignore(pos, end); @@ -164,6 +165,44 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p ws.ignore(pos, end); + if (s_comma.ignore(pos, end, max_parsed_pos, expected)) + { + select_query->limit_offset = select_query->limit_length; + if (!num.parse(pos, end, select_query->limit_length, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + } + else if (s_by.ignore(pos, end, max_parsed_pos, expected)) + { + select_query->limit_by_value = select_query->limit_length; + select_query->limit_length = nullptr; + + ws.ignore(pos, end); + + if (!exp_list.parse(pos, end, select_query->limit_by_expression_list, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + } + } + + /// LIMIT length | LIMIT offset, length + if (s_limit.ignore(pos, end, max_parsed_pos, expected)) + { + if (!select_query->limit_by_value || select_query->limit_length) + return false; + + ws.ignore(pos, end); + + ParserString s_comma(","); + ParserNumber num; + + if (!num.parse(pos, end, select_query->limit_length, max_parsed_pos, expected)) + return false; + + ws.ignore(pos, end); + if (s_comma.ignore(pos, end, max_parsed_pos, expected)) { select_query->limit_offset = select_query->limit_length; @@ -232,6 +271,10 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, Pos & max_p select_query->children.push_back(select_query->having_expression); if (select_query->order_expression_list) select_query->children.push_back(select_query->order_expression_list); + if (select_query->limit_by_value) + select_query->children.push_back(select_query->limit_by_value); + if (select_query->limit_by_expression_list) + select_query->children.push_back(select_query->limit_by_expression_list); if (select_query->limit_offset) select_query->children.push_back(select_query->limit_offset); if (select_query->limit_length) diff --git a/dbms/tests/queries/0_stateless/00408_limit_by.reference b/dbms/tests/queries/0_stateless/00408_limit_by.reference new file mode 100644 index 00000000000..e037feaf637 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00408_limit_by.reference @@ -0,0 +1,17 @@ +1 +1 +3 +3 +4 +4 +5 +7 +7 +1 2 +3 3 +4 2 +5 1 +7 4 +1 John +3 Mary +4 Mary diff --git a/dbms/tests/queries/0_stateless/00408_limit_by.sql b/dbms/tests/queries/0_stateless/00408_limit_by.sql new file mode 100644 index 00000000000..8501acebbdb --- /dev/null +++ b/dbms/tests/queries/0_stateless/00408_limit_by.sql @@ -0,0 +1,26 @@ +DROP TABLE IF EXISTS test.limit_by; +CREATE TABLE test.limit_by (Num UInt32, Name String) ENGINE = Memory; + +INSERT INTO test.limit_by (Num, Name) VALUES (1, 'John'); +INSERT INTO test.limit_by (Num, Name) VALUES (1, 'John'); +INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (3, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (4, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (4, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (5, 'Bill'); +INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Bill'); +INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Bill'); +INSERT INTO test.limit_by (Num, Name) VALUES (7, 'Mary'); +INSERT INTO test.limit_by (Num, Name) VALUES (7, 'John'); + +-- Two elemens in each group +SELECT Num FROM test.limit_by ORDER BY Num LIMIT 2 BY Num; + +-- LIMIT BY doesn't affect result of GROUP BY +SELECT Num, count(*) FROM test.limit_by GROUP BY Num ORDER BY Num LIMIT 2 BY Num; + +-- LIMIT BY can be combined with LIMIT +SELECT Num, Name FROM test.limit_by ORDER BY Num LIMIT 1 BY Num, Name LIMIT 3; + +DROP TABLE IF EXISTS test.limit_by;