From 61974e0047e9826ab84d5d4b07f93ec065b285dc Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Mon, 27 Apr 2020 17:47:59 +0300 Subject: [PATCH 01/14] TTL Expressions WHERE and GROUP BY draft --- src/DataStreams/TTLBlockInputStream.cpp | 225 +++++++++++++++++++++-- src/DataStreams/TTLBlockInputStream.h | 8 + src/Interpreters/ExpressionAnalyzer.cpp | 9 +- src/Interpreters/SyntaxAnalyzer.cpp | 17 +- src/Interpreters/SyntaxAnalyzer.h | 2 +- src/Parsers/ASTTTLElement.cpp | 67 ++++++- src/Parsers/ASTTTLElement.h | 38 +++- src/Parsers/ExpressionElementParsers.cpp | 84 ++++++++- src/Parsers/ExpressionListParsers.cpp | 9 + src/Parsers/ExpressionListParsers.h | 11 +- src/Storages/MergeTree/MergeTreeData.cpp | 44 ++++- src/Storages/MergeTree/MergeTreeData.h | 9 + 12 files changed, 479 insertions(+), 44 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index d2d5d6a92f9..d816cba4b2f 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -67,6 +67,30 @@ TTLBlockInputStream::TTLBlockInputStream( default_expr_list, storage.getColumns().getAllPhysical()); defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true); } + + if (storage.hasRowsTTL() && !storage.rows_ttl_entry.group_by_keys.empty()) + { + ColumnNumbers keys; + for (const auto & key : storage.rows_ttl_entry.group_by_keys) + keys.push_back(header.getPositionByName(key)); + agg_key_columns.resize(storage.rows_ttl_entry.group_by_keys.size()); + + AggregateDescriptions aggregates = storage.rows_ttl_entry.aggregate_descriptions; + for (auto & descr : aggregates) + if (descr.arguments.empty()) + for (const auto & name : descr.argument_names) + descr.arguments.push_back(header.getPositionByName(name)); + agg_aggregate_columns.resize(storage.rows_ttl_entry.aggregate_descriptions.size()); + + const Settings & settings = storage.global_context.getSettingsRef(); + + Aggregator::Params params(header, keys, aggregates, + false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, + SettingUInt64(0), SettingUInt64(0), + settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, + storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); + aggregator = std::make_unique(params); + } } bool TTLBlockInputStream::isTTLExpired(time_t ttl) const @@ -77,7 +101,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const Block TTLBlockInputStream::readImpl() { /// Skip all data if table ttl is expired for part - if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max)) + if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression && + storage.rows_ttl_entry.group_by_keys.empty() && isTTLExpired(old_ttl_infos.table_ttl.max)) { rows_removed = data_part->rows_count; return {}; @@ -85,7 +110,43 @@ Block TTLBlockInputStream::readImpl() Block block = children.at(0)->read(); if (!block) + { + if (aggregator && !agg_result.empty()) + { + MutableColumns result_columns; + const auto & column_names = header.getNames(); + for (const auto & column_name : column_names) + { + const IColumn * values_column = header.getByName(column_name).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_columns.emplace_back(std::move(result_column)); + } + + auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); + for (auto & agg_block : aggregated_res) + { + for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) + actions->execute(agg_block); + for (const auto & name : storage.rows_ttl_entry.group_by_keys) + { + const IColumn * values_column = agg_block.getByName(name).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) + { + const IColumn * values_column = agg_block.getByName(res_column).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + } + + block = header.cloneWithColumns(std::move(result_columns)); + agg_result.invalidate(); + } + return block; + } if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min))) removeRowsWithExpiredTableTTL(block); @@ -114,35 +175,171 @@ void TTLBlockInputStream::readSuffixImpl() void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) { storage.rows_ttl_entry.expression->execute(block); + if (storage.rows_ttl_entry.where_expression) + storage.rows_ttl_entry.where_expression->execute(block); const IColumn * ttl_column = block.getByName(storage.rows_ttl_entry.result_column).column.get(); + const IColumn * where_filter_column = storage.rows_ttl_entry.where_expression ? + block.getByName(storage.rows_ttl_entry.where_filter_column).column.get() : nullptr; + const auto & column_names = header.getNames(); - MutableColumns result_columns; - result_columns.reserve(column_names.size()); - for (auto it = column_names.begin(); it != column_names.end(); ++it) + if (!aggregator) { - const IColumn * values_column = block.getByName(*it).column.get(); - MutableColumnPtr result_column = values_column->cloneEmpty(); - result_column->reserve(block.rows()); + MutableColumns result_columns; + result_columns.reserve(column_names.size()); + for (auto it = column_names.begin(); it != column_names.end(); ++it) + { + const IColumn * values_column = block.getByName(*it).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_column->reserve(block.rows()); + for (size_t i = 0; i < block.rows(); ++i) + { + UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); + bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + if (!isTTLExpired(cur_ttl) || !where_filter_passed) + { + new_ttl_infos.table_ttl.update(cur_ttl); + result_column->insertFrom(*values_column, i); + } + else if (it == column_names.begin()) + ++rows_removed; + } + result_columns.emplace_back(std::move(result_column)); + } + block = header.cloneWithColumns(std::move(result_columns)); + } + else + { + MutableColumns result_columns; + MutableColumns aggregate_columns; + + for (const auto & column_name : column_names) + { + const IColumn * values_column = block.getByName(column_name).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_column->reserve(block.rows()); + result_columns.emplace_back(std::move(result_column)); + + MutableColumnPtr aggregate_column = values_column->cloneEmpty(); + aggregate_column->reserve(block.rows()); + aggregate_columns.emplace_back(std::move(aggregate_column)); + } + + size_t rows_aggregated = 0; + size_t current_key_start = 0; + size_t rows_with_current_key = 0; for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - if (!isTTLExpired(cur_ttl)) + bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; + + bool same_as_current = true; + if (current_key_value.empty()) + { + same_as_current = false; + current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size()); + } + for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j) + { + const String & key_column = storage.rows_ttl_entry.group_by_keys[j]; + const IColumn * values_column = block.getByName(key_column).column.get(); + if (!same_as_current) + values_column->get(i, current_key_value[j]); + else + { + Field value; + values_column->get(i, value); + if (value != current_key_value[j]) + { + current_key_value[j] = value; + same_as_current = false; + } + } + } + if (!same_as_current) + { + if (rows_with_current_key) + { + Columns aggregate_chunk; + aggregate_chunk.reserve(aggregate_columns.size()); + for (const auto & name : column_names) + { + const auto & column = aggregate_columns[header.getPositionByName(name)]; + ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); + aggregate_chunk.emplace_back(std::move(chunk_column)); + } + aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, + agg_aggregate_columns, agg_no_more_keys); + } + if (!agg_result.empty()) + { + auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); + for (auto & agg_block : aggregated_res) + { + for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) + actions->execute(agg_block); + for (const auto & name : storage.rows_ttl_entry.group_by_keys) + { + const IColumn * values_column = agg_block.getByName(name).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) + { + const IColumn * values_column = agg_block.getByName(res_column).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + } + } + agg_result.invalidate(); + current_key_start = rows_aggregated; + rows_with_current_key = 0; + } + if (ttl_expired) + { + ++rows_with_current_key; + ++rows_aggregated; + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = aggregate_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } + else { new_ttl_infos.table_ttl.update(cur_ttl); - result_column->insertFrom(*values_column, i); + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = result_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } } - else if (it == column_names.begin()) - ++rows_removed; } - result_columns.emplace_back(std::move(result_column)); - } - block = header.cloneWithColumns(std::move(result_columns)); + if (rows_with_current_key) + { + Columns aggregate_chunk; + aggregate_chunk.reserve(aggregate_columns.size()); + for (const auto & name : column_names) + { + const auto & column = aggregate_columns[header.getPositionByName(name)]; + ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); + aggregate_chunk.emplace_back(std::move(chunk_column)); + } + aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, + agg_aggregate_columns, agg_no_more_keys); + } + + block = header.cloneWithColumns(std::move(result_columns)); + } } void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index 3896e5232f8..821f8dd9284 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -39,6 +40,13 @@ private: time_t current_time; bool force; + std::unique_ptr aggregator; + std::vector current_key_value; + AggregatedDataVariants agg_result; + ColumnRawPtrs agg_key_columns; + Aggregator::AggregateColumns agg_aggregate_columns; + bool agg_no_more_keys; + IMergeTreeDataPart::TTLInfos old_ttl_infos; IMergeTreeDataPart::TTLInfos new_ttl_infos; NameSet empty_columns; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 3341855b8c6..c432b920a9f 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -192,10 +192,10 @@ void ExpressionAnalyzer::analyzeAggregation() if (has_aggregation) { - getSelectQuery(); /// assertSelect() + // getSelectQuery(); /// assertSelect() /// Find out aggregation keys. - if (select_query->groupBy()) + if (select_query && select_query->groupBy()) { NameSet unique_keys; ASTs & group_asts = select_query->groupBy()->children; @@ -926,7 +926,10 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) { - ExpressionActionsPtr actions = std::make_shared(sourceColumns(), context); + NamesAndTypesList columns(sourceColumns()); + for (const auto & col : aggregated_columns) + columns.push_back(col); + ExpressionActionsPtr actions = std::make_shared(columns, context); NamesWithAliases result_columns; Names result_names; diff --git a/src/Interpreters/SyntaxAnalyzer.cpp b/src/Interpreters/SyntaxAnalyzer.cpp index b3d566dbdc8..ec7f2154dad 100644 --- a/src/Interpreters/SyntaxAnalyzer.cpp +++ b/src/Interpreters/SyntaxAnalyzer.cpp @@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect( return std::make_shared(result); } -SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage) const +SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregates) const { if (query->as()) throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR); @@ -855,7 +855,20 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif); - assertNoAggregates(query, "in wrong place"); + if (allow_aggregates) + { + GetAggregatesVisitor::Data data; + GetAggregatesVisitor(data).visit(query); + + /// There can not be other aggregate functions within the aggregate functions. + for (const ASTFunction * node : data.aggregates) + for (auto & arg : node->arguments->children) + assertNoAggregates(arg, "inside another aggregate function"); + result.aggregates = data.aggregates; + } + else + assertNoAggregates(query, "in wrong place"); + result.collectUsedColumns(query); return std::make_shared(result); } diff --git a/src/Interpreters/SyntaxAnalyzer.h b/src/Interpreters/SyntaxAnalyzer.h index dda0add38db..abacb25ac4d 100644 --- a/src/Interpreters/SyntaxAnalyzer.h +++ b/src/Interpreters/SyntaxAnalyzer.h @@ -86,7 +86,7 @@ public: {} /// Analyze and rewrite not select query - SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}) const; + SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const; /// Analyze and rewrite select query SyntaxAnalyzerResultPtr analyzeSelect( diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index 7e03a73e36d..fb35266465b 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -7,21 +7,80 @@ namespace DB { +ASTPtr ASTTTLElement::clone() const +{ + auto clone = std::make_shared(*this); + clone->children.clear(); + clone->positions.clear(); + + for (auto expr : {Expression::TTL, Expression::WHERE}) + clone->setExpression(expr, getExpression(expr, true)); + + for (auto & [name, expr] : clone->group_by_aggregations) + expr = expr->clone(); + + return clone; +} + void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { - children.front()->formatImpl(settings, state, frame); - if (destination_type == PartDestinationType::DISK) + ttl()->formatImpl(settings, state, frame); + if (mode == Mode::MOVE && destination_type == PartDestinationType::DISK) { settings.ostr << " TO DISK " << quoteString(destination_name); } - else if (destination_type == PartDestinationType::VOLUME) + else if (mode == Mode::MOVE && destination_type == PartDestinationType::VOLUME) { settings.ostr << " TO VOLUME " << quoteString(destination_name); } - else if (destination_type == PartDestinationType::DELETE) + else if (mode == Mode::GROUP_BY) + { + settings.ostr << " GROUP BY "; + for (size_t i = 0; i < group_by_key_columns.size(); ++i) + { + settings.ostr << group_by_key_columns[i]; + if (i + 1 != group_by_key_columns.size()) + settings.ostr << ", "; + } + settings.ostr << " SET "; + for (size_t i = 0; i < group_by_aggregations.size(); ++i) + { + settings.ostr << group_by_aggregations[i].first << " = "; + group_by_aggregations[i].second->formatImpl(settings, state, frame); + if (i + 1 != group_by_aggregations.size()) + settings.ostr << ", "; + } + } + else if (mode == Mode::DELETE) { /// It would be better to output "DELETE" here but that will break compatibility with earlier versions. } + + if (where()) + { + settings.ostr << " WHERE "; + where()->formatImpl(settings, state, frame); + } +} + +void ASTTTLElement::setExpression(Expression expr, ASTPtr && ast) +{ + auto it = positions.find(expr); + if (it == positions.end()) + { + positions[expr] = children.size(); + children.emplace_back(ast); + } + else + children[it->second] = ast; +} + +ASTPtr ASTTTLElement::getExpression(Expression expr, bool clone) const +{ + auto it = positions.find(expr); + if (it != positions.end()) + return clone ? children[it->second]->clone() : children[it->second]; + return {}; } } diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 02f70094e04..1a2d7d3c723 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -6,31 +6,53 @@ namespace DB { + /** Element of TTL expression. */ class ASTTTLElement : public IAST { public: + enum class Expression : uint8_t + { + TTL, + WHERE + }; + + enum class Mode : uint8_t + { + DELETE, + MOVE, + GROUP_BY + }; + + Mode mode; PartDestinationType destination_type; String destination_name; + std::vector group_by_key_columns; + std::vector> group_by_aggregations; - ASTTTLElement(PartDestinationType destination_type_, const String & destination_name_) - : destination_type(destination_type_) + ASTTTLElement(Mode mode_, PartDestinationType destination_type_, const String & destination_name_) + : mode(mode_) + , destination_type(destination_type_) , destination_name(destination_name_) { } String getID(char) const override { return "TTLElement"; } - ASTPtr clone() const override - { - auto clone = std::make_shared(*this); - clone->cloneChildren(); - return clone; - } + ASTPtr clone() const override; + + const ASTPtr ttl() const { return getExpression(Expression::TTL); } + const ASTPtr where() const { return getExpression(Expression::WHERE); } + + void setExpression(Expression expr, ASTPtr && ast); + ASTPtr getExpression(Expression expr, bool clone = false) const; protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; + +private: + std::unordered_map positions; }; } diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 70a8b282a72..2734aa67193 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1455,23 +1455,50 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_to_disk("TO DISK"); ParserKeyword s_to_volume("TO VOLUME"); ParserKeyword s_delete("DELETE"); + ParserKeyword s_where("WHERE"); + ParserKeyword s_group_by("GROUP BY"); + ParserKeyword s_set("SET"); + ParserToken s_comma(TokenType::Comma); + ParserToken s_eq(TokenType::Equals); + ParserIdentifier parser_identifier; ParserStringLiteral parser_string_literal; ParserExpression parser_exp; + ParserIdentifierList parser_identifier_list; - ASTPtr expr_elem; - if (!parser_exp.parse(pos, expr_elem, expected)) + + ASTPtr ttl_expr; + if (!parser_exp.parse(pos, ttl_expr, expected)) return false; + ASTPtr where_expr; + + std::vector group_by_key_columns; + std::vector> group_by_aggregations; + + ASTTTLElement::Mode mode; PartDestinationType destination_type = PartDestinationType::DELETE; String destination_name; if (s_to_disk.ignore(pos)) + { + mode = ASTTTLElement::Mode::MOVE; destination_type = PartDestinationType::DISK; + } else if (s_to_volume.ignore(pos)) + { + mode = ASTTTLElement::Mode::MOVE; destination_type = PartDestinationType::VOLUME; + } + else if (s_group_by.ignore(pos)) + { + mode = ASTTTLElement::Mode::GROUP_BY; + } else + { s_delete.ignore(pos); + mode = ASTTTLElement::Mode::DELETE; + } - if (destination_type == PartDestinationType::DISK || destination_type == PartDestinationType::VOLUME) + if (mode == ASTTTLElement::Mode::MOVE) { ASTPtr ast_space_name; if (!parser_string_literal.parse(pos, ast_space_name, expected)) @@ -1479,10 +1506,57 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) destination_name = ast_space_name->as().value.get(); } + else if (mode == ASTTTLElement::Mode::GROUP_BY) + { + ASTPtr ast_group_by_key_columns; + if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected)) + return false; + for (const auto identifier : ast_group_by_key_columns->children) + { + String identifier_str; + if (!tryGetIdentifierNameInto(identifier, identifier_str)) + return false; + group_by_key_columns.emplace_back(std::move(identifier_str)); + } - node = std::make_shared(destination_type, destination_name); - node->children.push_back(expr_elem); + if (!s_set.ignore(pos)) + return false; + while (true) + { + if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) + break; + + ASTPtr name; + ASTPtr value; + if (!parser_identifier.parse(pos, name, expected)) + return false; + if (!s_eq.ignore(pos)) + return false; + if (!parser_exp.parse(pos, value, expected)) + return false; + String name_str; + if (!tryGetIdentifierNameInto(name, name_str)) + return false; + group_by_aggregations.emplace_back(name_str, std::move(value)); + } + } + + if ((mode == ASTTTLElement::Mode::MOVE || mode == ASTTTLElement::Mode::DELETE) && s_where.ignore(pos)) + { + if (!parser_exp.parse(pos, where_expr, expected)) + return false; + } + + auto ttl_element = std::make_shared(mode, destination_type, destination_name); + ttl_element->setExpression(ASTTTLElement::Expression::TTL, std::move(ttl_expr)); + if (where_expr) + ttl_element->setExpression(ASTTTLElement::Expression::WHERE, std::move(where_expr)); + + ttl_element->group_by_key_columns = std::move(group_by_key_columns); + ttl_element->group_by_aggregations = std::move(group_by_aggregations); + + node = ttl_element; return true; } diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index a967ae19691..062d1dd4483 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -742,4 +742,13 @@ bool ParserKeyValuePairsList::parseImpl(Pos & pos, ASTPtr & node, Expected & exp return parser.parse(pos, node, expected); } + +bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + return ParserList( + std::make_unique(), + std::make_unique(TokenType::Comma)) + .parse(pos, node, expected); } + +} \ No newline at end of file diff --git a/src/Parsers/ExpressionListParsers.h b/src/Parsers/ExpressionListParsers.h index 0cef29b6d67..ba939d8d160 100644 --- a/src/Parsers/ExpressionListParsers.h +++ b/src/Parsers/ExpressionListParsers.h @@ -421,4 +421,13 @@ protected: bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; }; -} + +/** A comma-separated list of identifiers, probably empty. */ +class ParserIdentifierList : public IParserBase +{ +protected: + const char * getName() const override { return "list of identifiers"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} \ No newline at end of file diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 48cf3934820..9eb14ed5475 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -615,19 +615,50 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, auto new_column_ttls = new_columns.getColumnTTLs(); - auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_ast) + auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_expr_ast) { TTLEntry result; - auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, new_columns.getAllPhysical()); - result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false); + auto ttl_syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_expr_ast, new_columns.getAllPhysical()); + result.expression = ExpressionAnalyzer(ttl_expr_ast, ttl_syntax_result, global_context).getActions(false); + result.result_column = ttl_expr_ast->getColumnName(); + result.destination_type = PartDestinationType::DELETE; - result.result_column = ttl_ast->getColumnName(); checkTTLExpression(result.expression, result.result_column); return result; }; + auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element) + { + auto result = create_ttl_entry(ttl_element->ttl()); + if (ttl_element->mode == ASTTTLElement::Mode::DELETE || ttl_element->mode ==ASTTTLElement::Mode::MOVE) + { + if (ASTPtr where_expr_ast = ttl_element->where()) + { + auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical()); + result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false); + result.where_filter_column = where_expr_ast->getColumnName(); + } + } + else if (ttl_element->mode == ASTTTLElement::Mode::GROUP_BY) + { + result.group_by_keys = ttl_element->group_by_key_columns; + for (auto [name, value] : ttl_element->group_by_aggregations) + { + auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); + auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context); + + result.group_by_aggregations.emplace_back(name, expr_analyzer.getActions(false)); + result.group_by_aggregations_res_column.emplace_back(name, value->getColumnName()); + + for (const auto descr : expr_analyzer.getAnalyzedData().aggregate_descriptions) + result.aggregate_descriptions.push_back(descr); + } + } + return result; + }; + if (!new_column_ttls.empty()) { NameSet columns_ttl_forbidden; @@ -672,7 +703,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION); } - auto new_rows_ttl_entry = create_ttl_entry(ttl_element->children[0]); + LOG_DEBUG(log, "ttl_element->size is " << ttl_element->size()); + auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element); if (!only_check) update_rows_ttl_entry = new_rows_ttl_entry; @@ -680,7 +712,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } else { - auto new_ttl_entry = create_ttl_entry(ttl_element->children[0]); + auto new_ttl_entry = create_rows_ttl_entry(ttl_element); new_ttl_entry.entry_ast = ttl_element_ptr; new_ttl_entry.destination_type = ttl_element->destination_type; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index cd1d4912a29..dfcddb40f3c 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -652,6 +653,14 @@ public: ExpressionActionsPtr expression; String result_column; + ExpressionActionsPtr where_expression; + String where_filter_column; + + Names group_by_keys; + std::vector> group_by_aggregations; + std::vector> group_by_aggregations_res_column; + AggregateDescriptions aggregate_descriptions; + /// Name and type of a destination are only valid in table-level context. PartDestinationType destination_type; String destination_name; From eeaf608aa912543540106ec5bc4363b88d3324f3 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Tue, 12 May 2020 17:27:00 +0300 Subject: [PATCH 02/14] Refactor code; support prefix of PK as GROUP BY key --- src/DataStreams/TTLBlockInputStream.cpp | 175 +++++++++-------------- src/DataStreams/TTLBlockInputStream.h | 6 + src/Parsers/ASTTTLElement.cpp | 57 ++++---- src/Parsers/ASTTTLElement.h | 38 +++-- src/Parsers/ExpressionElementParsers.cpp | 33 +++-- src/Storages/MergeTree/MergeTreeData.cpp | 32 +++-- src/Storages/MergeTree/MergeTreeData.h | 8 +- src/Storages/MergeTree/TTLMode.h | 14 ++ 8 files changed, 177 insertions(+), 186 deletions(-) create mode 100644 src/Storages/MergeTree/TTLMode.h diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index d816cba4b2f..d5e46a9929d 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB { @@ -68,8 +69,10 @@ TTLBlockInputStream::TTLBlockInputStream( defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true); } - if (storage.hasRowsTTL() && !storage.rows_ttl_entry.group_by_keys.empty()) + if (storage.hasRowsTTL() && storage.rows_ttl_entry.mode == TTLMode::GROUP_BY) { + current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size()); + ColumnNumbers keys; for (const auto & key : storage.rows_ttl_entry.group_by_keys) keys.push_back(header.getPositionByName(key)); @@ -84,9 +87,11 @@ TTLBlockInputStream::TTLBlockInputStream( const Settings & settings = storage.global_context.getSettingsRef(); + bool allow_to_use_two_level_group_by = false; // settings.max_bytes_before_external_group_by != 0; Aggregator::Params params(header, keys, aggregates, false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, - SettingUInt64(0), SettingUInt64(0), + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), + allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); aggregator = std::make_unique(params); @@ -102,7 +107,7 @@ Block TTLBlockInputStream::readImpl() { /// Skip all data if table ttl is expired for part if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression && - storage.rows_ttl_entry.group_by_keys.empty() && isTTLExpired(old_ttl_infos.table_ttl.max)) + storage.rows_ttl_entry.mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max)) { rows_removed = data_part->rows_count; return {}; @@ -113,36 +118,9 @@ Block TTLBlockInputStream::readImpl() { if (aggregator && !agg_result.empty()) { - MutableColumns result_columns; - const auto & column_names = header.getNames(); - for (const auto & column_name : column_names) - { - const IColumn * values_column = header.getByName(column_name).column.get(); - MutableColumnPtr result_column = values_column->cloneEmpty(); - result_columns.emplace_back(std::move(result_column)); - } - - auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); - for (auto & agg_block : aggregated_res) - { - for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) - actions->execute(agg_block); - for (const auto & name : storage.rows_ttl_entry.group_by_keys) - { - const IColumn * values_column = agg_block.getByName(name).column.get(); - auto & result_column = result_columns[header.getPositionByName(name)]; - result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); - } - for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) - { - const IColumn * values_column = agg_block.getByName(res_column).column.get(); - auto & result_column = result_columns[header.getPositionByName(name)]; - result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); - } - } - + MutableColumns result_columns = header.cloneEmptyColumns(); + finalizeAggregates(result_columns); block = header.cloneWithColumns(std::move(result_columns)); - agg_result.invalidate(); } return block; @@ -181,8 +159,8 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) const IColumn * ttl_column = block.getByName(storage.rows_ttl_entry.result_column).column.get(); - const IColumn * where_filter_column = storage.rows_ttl_entry.where_expression ? - block.getByName(storage.rows_ttl_entry.where_filter_column).column.get() : nullptr; + const IColumn * where_result_column = storage.rows_ttl_entry.where_expression ? + block.getByName(storage.rows_ttl_entry.where_result_column).column.get() : nullptr; const auto & column_names = header.getNames(); @@ -199,7 +177,7 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + bool where_filter_passed = !where_result_column || where_result_column->getBool(i); if (!isTTLExpired(cur_ttl) || !where_filter_passed) { new_ttl_infos.table_ttl.update(cur_ttl); @@ -214,20 +192,8 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) } else { - MutableColumns result_columns; - MutableColumns aggregate_columns; - - for (const auto & column_name : column_names) - { - const IColumn * values_column = block.getByName(column_name).column.get(); - MutableColumnPtr result_column = values_column->cloneEmpty(); - result_column->reserve(block.rows()); - result_columns.emplace_back(std::move(result_column)); - - MutableColumnPtr aggregate_column = values_column->cloneEmpty(); - aggregate_column->reserve(block.rows()); - aggregate_columns.emplace_back(std::move(aggregate_column)); - } + MutableColumns result_columns = header.cloneEmptyColumns(); + MutableColumns aggregate_columns = header.cloneEmptyColumns(); size_t rows_aggregated = 0; size_t current_key_start = 0; @@ -235,72 +201,30 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + bool where_filter_passed = !where_result_column || where_result_column->getBool(i); bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; bool same_as_current = true; - if (current_key_value.empty()) - { - same_as_current = false; - current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size()); - } for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j) { const String & key_column = storage.rows_ttl_entry.group_by_keys[j]; const IColumn * values_column = block.getByName(key_column).column.get(); - if (!same_as_current) - values_column->get(i, current_key_value[j]); - else + if (!same_as_current || (*values_column)[i] != current_key_value[j]) { - Field value; - values_column->get(i, value); - if (value != current_key_value[j]) - { - current_key_value[j] = value; - same_as_current = false; - } + values_column->get(i, current_key_value[j]); + same_as_current = false; } } if (!same_as_current) { if (rows_with_current_key) - { - Columns aggregate_chunk; - aggregate_chunk.reserve(aggregate_columns.size()); - for (const auto & name : column_names) - { - const auto & column = aggregate_columns[header.getPositionByName(name)]; - ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); - aggregate_chunk.emplace_back(std::move(chunk_column)); - } - aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, - agg_aggregate_columns, agg_no_more_keys); - } - if (!agg_result.empty()) - { - auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); - for (auto & agg_block : aggregated_res) - { - for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) - actions->execute(agg_block); - for (const auto & name : storage.rows_ttl_entry.group_by_keys) - { - const IColumn * values_column = agg_block.getByName(name).column.get(); - auto & result_column = result_columns[header.getPositionByName(name)]; - result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); - } - for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) - { - const IColumn * values_column = agg_block.getByName(res_column).column.get(); - auto & result_column = result_columns[header.getPositionByName(name)]; - result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); - } - } - } - agg_result.invalidate(); + calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); + finalizeAggregates(result_columns); + current_key_start = rows_aggregated; rows_with_current_key = 0; } + if (ttl_expired) { ++rows_with_current_key; @@ -325,23 +249,52 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) } if (rows_with_current_key) - { - Columns aggregate_chunk; - aggregate_chunk.reserve(aggregate_columns.size()); - for (const auto & name : column_names) - { - const auto & column = aggregate_columns[header.getPositionByName(name)]; - ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); - aggregate_chunk.emplace_back(std::move(chunk_column)); - } - aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, - agg_aggregate_columns, agg_no_more_keys); - } + calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key); block = header.cloneWithColumns(std::move(result_columns)); } } +void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length) +{ + Columns aggregate_chunk; + aggregate_chunk.reserve(aggregate_columns.size()); + for (const auto & name : header.getNames()) + { + const auto & column = aggregate_columns[header.getPositionByName(name)]; + ColumnPtr chunk_column = column->cut(start_pos, length); + aggregate_chunk.emplace_back(std::move(chunk_column)); + } + aggregator->executeOnBlock(aggregate_chunk, length, agg_result, agg_key_columns, + agg_aggregate_columns, agg_no_more_keys); +} + +void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns) +{ + if (!agg_result.empty()) + { + auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); + for (auto & agg_block : aggregated_res) + { + for (const auto & it : storage.rows_ttl_entry.group_by_aggregations) + std::get<2>(it)->execute(agg_block); + for (const auto & name : storage.rows_ttl_entry.group_by_keys) + { + const IColumn * values_column = agg_block.getByName(name).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + for (const auto & it : storage.rows_ttl_entry.group_by_aggregations) + { + const IColumn * values_column = agg_block.getByName(get<1>(it)).column.get(); + auto & result_column = result_columns[header.getPositionByName(std::get<0>(it))]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + } + } + agg_result.invalidate(); +} + void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) { Block block_with_defaults; diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index 821f8dd9284..c5ec6596038 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -66,6 +66,12 @@ private: /// Removes rows with expired table ttl and computes new ttl_infos for part void removeRowsWithExpiredTableTTL(Block & block); + + // Calculate aggregates of aggregate_columns into agg_result + void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length); + + /// Finalize agg_result into result_columns + void finalizeAggregates(MutableColumns & result_columns); /// Updates TTL for moves void updateMovesTTL(Block & block); diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index fb35266465b..35e9dc89a0d 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -11,10 +11,11 @@ ASTPtr ASTTTLElement::clone() const { auto clone = std::make_shared(*this); clone->children.clear(); - clone->positions.clear(); + clone->ttl_expr_pos = -1; + clone->where_expr_pos = -1; - for (auto expr : {Expression::TTL, Expression::WHERE}) - clone->setExpression(expr, getExpression(expr, true)); + clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true)); + clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true)); for (auto & [name, expr] : clone->group_by_aggregations) expr = expr->clone(); @@ -25,33 +26,33 @@ ASTPtr ASTTTLElement::clone() const void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { ttl()->formatImpl(settings, state, frame); - if (mode == Mode::MOVE && destination_type == PartDestinationType::DISK) + if (mode == TTLMode::MOVE && destination_type == PartDestinationType::DISK) { settings.ostr << " TO DISK " << quoteString(destination_name); } - else if (mode == Mode::MOVE && destination_type == PartDestinationType::VOLUME) + else if (mode == TTLMode::MOVE && destination_type == PartDestinationType::VOLUME) { settings.ostr << " TO VOLUME " << quoteString(destination_name); } - else if (mode == Mode::GROUP_BY) + else if (mode == TTLMode::GROUP_BY) { settings.ostr << " GROUP BY "; - for (size_t i = 0; i < group_by_key_columns.size(); ++i) + for (auto it = group_by_key_columns.begin(); it != group_by_key_columns.end(); ++it) { - settings.ostr << group_by_key_columns[i]; - if (i + 1 != group_by_key_columns.size()) + if (it != group_by_key_columns.begin()) settings.ostr << ", "; + settings.ostr << *it; } settings.ostr << " SET "; - for (size_t i = 0; i < group_by_aggregations.size(); ++i) + for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it) { - settings.ostr << group_by_aggregations[i].first << " = "; - group_by_aggregations[i].second->formatImpl(settings, state, frame); - if (i + 1 != group_by_aggregations.size()) + if (it != group_by_aggregations.begin()) settings.ostr << ", "; + settings.ostr << it->first << " = "; + it->second->formatImpl(settings, state, frame); } } - else if (mode == Mode::DELETE) + else if (mode == TTLMode::DELETE) { /// It would be better to output "DELETE" here but that will break compatibility with earlier versions. } @@ -63,24 +64,28 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st } } -void ASTTTLElement::setExpression(Expression expr, ASTPtr && ast) +void ASTTTLElement::setExpression(int & pos, ASTPtr && ast) { - auto it = positions.find(expr); - if (it == positions.end()) + if (ast) { - positions[expr] = children.size(); - children.emplace_back(ast); + if (pos == -1) + { + pos = children.size(); + children.emplace_back(ast); + } + else + children[pos] = ast; + } + else if (pos != -1) + { + children[pos] = ASTPtr{}; + pos = -1; } - else - children[it->second] = ast; } -ASTPtr ASTTTLElement::getExpression(Expression expr, bool clone) const +ASTPtr ASTTTLElement::getExpression(int pos, bool clone) const { - auto it = positions.find(expr); - if (it != positions.end()) - return clone ? children[it->second]->clone() : children[it->second]; - return {}; + return pos != -1 ? (clone ? children[pos]->clone() : children[pos]) : ASTPtr{}; } } diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 1a2d7d3c723..24b02860f32 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB @@ -12,29 +13,19 @@ namespace DB class ASTTTLElement : public IAST { public: - enum class Expression : uint8_t - { - TTL, - WHERE - }; - - enum class Mode : uint8_t - { - DELETE, - MOVE, - GROUP_BY - }; - - Mode mode; + TTLMode mode; PartDestinationType destination_type; String destination_name; - std::vector group_by_key_columns; + + Strings group_by_key_columns; std::vector> group_by_aggregations; - ASTTTLElement(Mode mode_, PartDestinationType destination_type_, const String & destination_name_) + ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_) : mode(mode_) , destination_type(destination_type_) , destination_name(destination_name_) + , ttl_expr_pos(-1) + , where_expr_pos(-1) { } @@ -42,17 +33,22 @@ public: ASTPtr clone() const override; - const ASTPtr ttl() const { return getExpression(Expression::TTL); } - const ASTPtr where() const { return getExpression(Expression::WHERE); } + const ASTPtr ttl() const { return getExpression(ttl_expr_pos); } + const ASTPtr where() const { return getExpression(where_expr_pos); } - void setExpression(Expression expr, ASTPtr && ast); - ASTPtr getExpression(Expression expr, bool clone = false) const; + void setTTL(ASTPtr && ast) { setExpression(ttl_expr_pos, std::forward(ast)); } + void setWhere(ASTPtr && ast) { setExpression(where_expr_pos, std::forward(ast)); } protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; +private: + int ttl_expr_pos; + int where_expr_pos; + private: - std::unordered_map positions; + void setExpression(int & pos, ASTPtr && ast); + ASTPtr getExpression(int pos, bool clone = false) const; }; } diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 2734aa67193..afd3814d913 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1460,45 +1460,45 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_set("SET"); ParserToken s_comma(TokenType::Comma); ParserToken s_eq(TokenType::Equals); + ParserIdentifier parser_identifier; ParserStringLiteral parser_string_literal; ParserExpression parser_exp; ParserIdentifierList parser_identifier_list; - ASTPtr ttl_expr; if (!parser_exp.parse(pos, ttl_expr, expected)) return false; - ASTPtr where_expr; - - std::vector group_by_key_columns; - std::vector> group_by_aggregations; - - ASTTTLElement::Mode mode; + TTLMode mode; PartDestinationType destination_type = PartDestinationType::DELETE; String destination_name; + if (s_to_disk.ignore(pos)) { - mode = ASTTTLElement::Mode::MOVE; + mode = TTLMode::MOVE; destination_type = PartDestinationType::DISK; } else if (s_to_volume.ignore(pos)) { - mode = ASTTTLElement::Mode::MOVE; + mode = TTLMode::MOVE; destination_type = PartDestinationType::VOLUME; } else if (s_group_by.ignore(pos)) { - mode = ASTTTLElement::Mode::GROUP_BY; + mode = TTLMode::GROUP_BY; } else { s_delete.ignore(pos); - mode = ASTTTLElement::Mode::DELETE; + mode = TTLMode::DELETE; } - if (mode == ASTTTLElement::Mode::MOVE) + ASTPtr where_expr; + std::vector group_by_key_columns; + std::vector> group_by_aggregations; + + if (mode == TTLMode::MOVE) { ASTPtr ast_space_name; if (!parser_string_literal.parse(pos, ast_space_name, expected)) @@ -1506,7 +1506,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) destination_name = ast_space_name->as().value.get(); } - else if (mode == ASTTTLElement::Mode::GROUP_BY) + else if (mode == TTLMode::GROUP_BY) { ASTPtr ast_group_by_key_columns; if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected)) @@ -1541,17 +1541,16 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) group_by_aggregations.emplace_back(name_str, std::move(value)); } } - - if ((mode == ASTTTLElement::Mode::MOVE || mode == ASTTTLElement::Mode::DELETE) && s_where.ignore(pos)) + else if (mode == TTLMode::DELETE && s_where.ignore(pos)) { if (!parser_exp.parse(pos, where_expr, expected)) return false; } auto ttl_element = std::make_shared(mode, destination_type, destination_name); - ttl_element->setExpression(ASTTTLElement::Expression::TTL, std::move(ttl_expr)); + ttl_element->setTTL(std::move(ttl_expr)); if (where_expr) - ttl_element->setExpression(ASTTTLElement::Expression::WHERE, std::move(where_expr)); + ttl_element->setWhere(std::move(where_expr)); ttl_element->group_by_key_columns = std::move(group_by_key_columns); ttl_element->group_by_aggregations = std::move(group_by_aggregations); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 9eb14ed5475..1f55f5ecff5 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -624,6 +624,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, result.result_column = ttl_expr_ast->getColumnName(); result.destination_type = PartDestinationType::DELETE; + result.mode = TTLMode::DELETE; checkTTLExpression(result.expression, result.result_column); return result; @@ -632,27 +633,43 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element) { auto result = create_ttl_entry(ttl_element->ttl()); - if (ttl_element->mode == ASTTTLElement::Mode::DELETE || ttl_element->mode ==ASTTTLElement::Mode::MOVE) + result.mode = ttl_element->mode; + + if (ttl_element->mode == TTLMode::DELETE) { if (ASTPtr where_expr_ast = ttl_element->where()) { auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical()); result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false); - result.where_filter_column = where_expr_ast->getColumnName(); + result.where_result_column = where_expr_ast->getColumnName(); } } - else if (ttl_element->mode == ASTTTLElement::Mode::GROUP_BY) + else if (ttl_element->mode == TTLMode::GROUP_BY) { + if (ttl_element->group_by_key_columns.size() > this->primary_key_columns.size()) + throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); + for (size_t i = 0; i < ttl_element->group_by_key_columns.size(); ++i) + { + if (ttl_element->group_by_key_columns[i] != this->primary_key_columns[i]) + throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); + } + result.group_by_keys = ttl_element->group_by_key_columns; - for (auto [name, value] : ttl_element->group_by_aggregations) + + auto aggregations = ttl_element->group_by_aggregations; + for (size_t i = ttl_element->group_by_key_columns.size(); i < this->primary_key_columns.size(); ++i) + { + ASTPtr expr = makeASTFunction("max", std::make_shared(this->primary_key_columns[i])); + aggregations.emplace_back(this->primary_key_columns[i], std::move(expr)); + } + for (auto [name, value] : aggregations) { auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context); - result.group_by_aggregations.emplace_back(name, expr_analyzer.getActions(false)); - result.group_by_aggregations_res_column.emplace_back(name, value->getColumnName()); + result.group_by_aggregations.emplace_back(name, value->getColumnName(), expr_analyzer.getActions(false)); - for (const auto descr : expr_analyzer.getAnalyzedData().aggregate_descriptions) + for (const auto & descr : expr_analyzer.getAnalyzedData().aggregate_descriptions) result.aggregate_descriptions.push_back(descr); } } @@ -703,7 +720,6 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION); } - LOG_DEBUG(log, "ttl_element->size is " << ttl_element->size()); auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element); if (!only_check) update_rows_ttl_entry = new_rows_ttl_entry; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index dfcddb40f3c..5272ee25314 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -21,6 +21,7 @@ #include #include #include +#include #include #include @@ -650,15 +651,16 @@ public: struct TTLEntry { + TTLMode mode; + ExpressionActionsPtr expression; String result_column; ExpressionActionsPtr where_expression; - String where_filter_column; + String where_result_column; Names group_by_keys; - std::vector> group_by_aggregations; - std::vector> group_by_aggregations_res_column; + std::vector> group_by_aggregations; AggregateDescriptions aggregate_descriptions; /// Name and type of a destination are only valid in table-level context. diff --git a/src/Storages/MergeTree/TTLMode.h b/src/Storages/MergeTree/TTLMode.h new file mode 100644 index 00000000000..99a57f46853 --- /dev/null +++ b/src/Storages/MergeTree/TTLMode.h @@ -0,0 +1,14 @@ +#pragma once + + +namespace DB +{ + +enum class TTLMode +{ + DELETE, + MOVE, + GROUP_BY +}; + +} From 66496dc7e350971da8774f66517a4c3a1b62dbbe Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Tue, 12 May 2020 23:44:48 +0300 Subject: [PATCH 03/14] Fix trailing spaces --- src/DataStreams/TTLBlockInputStream.cpp | 2 +- src/DataStreams/TTLBlockInputStream.h | 4 ++-- src/Interpreters/SyntaxAnalyzer.cpp | 4 ++-- src/Parsers/ASTTTLElement.cpp | 10 +++++----- src/Parsers/ASTTTLElement.h | 6 +++--- src/Parsers/ExpressionElementParsers.cpp | 6 +++--- src/Storages/MergeTree/MergeTreeData.cpp | 6 +++--- src/Storages/MergeTree/TTLMode.h | 2 +- 8 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index d5e46a9929d..106ae1fd5a0 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -197,7 +197,7 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) size_t rows_aggregated = 0; size_t current_key_start = 0; - size_t rows_with_current_key = 0; + size_t rows_with_current_key = 0; for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index c5ec6596038..a8f86477b88 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -43,7 +43,7 @@ private: std::unique_ptr aggregator; std::vector current_key_value; AggregatedDataVariants agg_result; - ColumnRawPtrs agg_key_columns; + ColumnRawPtrs agg_key_columns; Aggregator::AggregateColumns agg_aggregate_columns; bool agg_no_more_keys; @@ -66,7 +66,7 @@ private: /// Removes rows with expired table ttl and computes new ttl_infos for part void removeRowsWithExpiredTableTTL(Block & block); - + // Calculate aggregates of aggregate_columns into agg_result void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length); diff --git a/src/Interpreters/SyntaxAnalyzer.cpp b/src/Interpreters/SyntaxAnalyzer.cpp index ec7f2154dad..54f369c9495 100644 --- a/src/Interpreters/SyntaxAnalyzer.cpp +++ b/src/Interpreters/SyntaxAnalyzer.cpp @@ -855,7 +855,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif); - if (allow_aggregates) + if (allow_aggregates) { GetAggregatesVisitor::Data data; GetAggregatesVisitor(data).visit(query); @@ -866,7 +866,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy assertNoAggregates(arg, "inside another aggregate function"); result.aggregates = data.aggregates; } - else + else assertNoAggregates(query, "in wrong place"); result.collectUsedColumns(query); diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index 35e9dc89a0d..991274e8350 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -7,13 +7,13 @@ namespace DB { -ASTPtr ASTTTLElement::clone() const +ASTPtr ASTTTLElement::clone() const { auto clone = std::make_shared(*this); clone->children.clear(); clone->ttl_expr_pos = -1; clone->where_expr_pos = -1; - + clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true)); clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true)); @@ -57,14 +57,14 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st /// It would be better to output "DELETE" here but that will break compatibility with earlier versions. } - if (where()) + if (where()) { settings.ostr << " WHERE "; where()->formatImpl(settings, state, frame); } } -void ASTTTLElement::setExpression(int & pos, ASTPtr && ast) +void ASTTTLElement::setExpression(int & pos, ASTPtr && ast) { if (ast) { @@ -83,7 +83,7 @@ void ASTTTLElement::setExpression(int & pos, ASTPtr && ast) } } -ASTPtr ASTTTLElement::getExpression(int pos, bool clone) const +ASTPtr ASTTTLElement::getExpression(int pos, bool clone) const { return pos != -1 ? (clone ? children[pos]->clone() : children[pos]) : ASTPtr{}; } diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 24b02860f32..9b40adf2be4 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -21,7 +21,7 @@ public: std::vector> group_by_aggregations; ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_) - : mode(mode_) + : mode(mode_) , destination_type(destination_type_) , destination_name(destination_name_) , ttl_expr_pos(-1) @@ -42,12 +42,12 @@ public: protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; -private: +private: int ttl_expr_pos; int where_expr_pos; private: - void setExpression(int & pos, ASTPtr && ast); + void setExpression(int & pos, ASTPtr && ast); ASTPtr getExpression(int pos, bool clone = false) const; }; diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index afd3814d913..65ff9e1da94 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1484,7 +1484,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) mode = TTLMode::MOVE; destination_type = PartDestinationType::VOLUME; } - else if (s_group_by.ignore(pos)) + else if (s_group_by.ignore(pos)) { mode = TTLMode::GROUP_BY; } @@ -1517,7 +1517,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (!tryGetIdentifierNameInto(identifier, identifier_str)) return false; group_by_key_columns.emplace_back(std::move(identifier_str)); - } + } if (!s_set.ignore(pos)) return false; @@ -1525,7 +1525,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) break; - + ASTPtr name; ASTPtr value; if (!parser_identifier.parse(pos, name, expected)) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 1f55f5ecff5..bb9d8ce186e 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -635,16 +635,16 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, auto result = create_ttl_entry(ttl_element->ttl()); result.mode = ttl_element->mode; - if (ttl_element->mode == TTLMode::DELETE) + if (ttl_element->mode == TTLMode::DELETE) { - if (ASTPtr where_expr_ast = ttl_element->where()) + if (ASTPtr where_expr_ast = ttl_element->where()) { auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical()); result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false); result.where_result_column = where_expr_ast->getColumnName(); } } - else if (ttl_element->mode == TTLMode::GROUP_BY) + else if (ttl_element->mode == TTLMode::GROUP_BY) { if (ttl_element->group_by_key_columns.size() > this->primary_key_columns.size()) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); diff --git a/src/Storages/MergeTree/TTLMode.h b/src/Storages/MergeTree/TTLMode.h index 99a57f46853..0681f10fc17 100644 --- a/src/Storages/MergeTree/TTLMode.h +++ b/src/Storages/MergeTree/TTLMode.h @@ -4,7 +4,7 @@ namespace DB { -enum class TTLMode +enum class TTLMode { DELETE, MOVE, From a991dcf9f4f7d5552b9406548e414432fd237c41 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Thu, 14 May 2020 13:48:45 +0300 Subject: [PATCH 04/14] Add tests --- .../01246_ttl_where_group_by.reference | 15 +++++++ .../0_stateless/01246_ttl_where_group_by.sql | 41 +++++++++++++++++++ 2 files changed, 56 insertions(+) create mode 100644 tests/queries/0_stateless/01246_ttl_where_group_by.reference create mode 100644 tests/queries/0_stateless/01246_ttl_where_group_by.sql diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.reference b/tests/queries/0_stateless/01246_ttl_where_group_by.reference new file mode 100644 index 00000000000..7e276e2681e --- /dev/null +++ b/tests/queries/0_stateless/01246_ttl_where_group_by.reference @@ -0,0 +1,15 @@ +1 1 0 4 +1 2 3 7 +1 3 0 5 +2 1 20 1 +2 1 0 1 +1 1 0 4 +1 1 10 6 +1 2 3 7 +1 3 0 5 +2 1 10 1 +3 1 0 8 +1 1 0 4 +1 3 10 6 +2 1 20 1 +3 1 0 8 diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.sql b/tests/queries/0_stateless/01246_ttl_where_group_by.sql new file mode 100644 index 00000000000..fbbd0336e1b --- /dev/null +++ b/tests/queries/0_stateless/01246_ttl_where_group_by.sql @@ -0,0 +1,41 @@ +drop table if exists ttl_01246_1; + +create table ttl_01246_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5; +insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); +insert into ttl_01246_1 values (1, 1, 10, 6, now()); +insert into ttl_01246_1 values (1, 2, 3, 7, now()); +insert into ttl_01246_1 values (1, 3, 0, 5, now()); +insert into ttl_01246_1 values (2, 1, 20, 1, now()); +insert into ttl_01246_1 values (2, 1, 0, 1, now()); +insert into ttl_01246_1 values (3, 1, 0, 8, now()); +select sleep(1.1) format Null; +optimize table ttl_01246_1 final; +select a, b, x, y from ttl_01246_1; + +drop table if exists ttl_01246_1; + +create table ttl_01246_1 (a Int, b Int, x Int32, y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = cast(median(x) as Int32), y = avg(y), d = max(d); +insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); +insert into ttl_01246_1 values (1, 1, 10, 6, now()); +insert into ttl_01246_1 values (1, 2, 3, 7, now()); +insert into ttl_01246_1 values (1, 3, 0, 5, now()); +insert into ttl_01246_1 values (2, 1, 20, 1, now()); +insert into ttl_01246_1 values (2, 1, 0, 1, now()); +insert into ttl_01246_1 values (3, 1, 0, 8, now()); +select sleep(1.1) format Null; +optimize table ttl_01246_1 final; +select a, b, x, y from ttl_01246_1; + +drop table if exists ttl_01246_1; + +create table ttl_01246_1 (a Int, b Int, x Int32, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = max(x), y = cast(round(avg(y)) as Int), d = max(d); +insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); +insert into ttl_01246_1 values (1, 1, 10, 6, now()); +insert into ttl_01246_1 values (1, 2, 3, 7, now()); +insert into ttl_01246_1 values (1, 3, 0, 5, now()); +insert into ttl_01246_1 values (2, 1, 20, 1, now()); +insert into ttl_01246_1 values (2, 1, 0, 1, now()); +insert into ttl_01246_1 values (3, 1, 0, 8, now()); +select sleep(1.1) format Null; +optimize table ttl_01246_1 final; +select a, b, x, y from ttl_01246_1; \ No newline at end of file From 9269db072af72d5f8c5dbf6483d21b50bdad4ac9 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Thu, 14 May 2020 17:07:36 +0300 Subject: [PATCH 05/14] Better tests --- .../0_stateless/01246_ttl_where_group_by.reference | 11 +++++------ .../queries/0_stateless/01246_ttl_where_group_by.sql | 12 ++++++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.reference b/tests/queries/0_stateless/01246_ttl_where_group_by.reference index 7e276e2681e..69d3d77cd46 100644 --- a/tests/queries/0_stateless/01246_ttl_where_group_by.reference +++ b/tests/queries/0_stateless/01246_ttl_where_group_by.reference @@ -4,12 +4,11 @@ 2 1 20 1 2 1 0 1 1 1 0 4 -1 1 10 6 -1 2 3 7 -1 3 0 5 -2 1 10 1 +1 1 6 6.5 +1 3 4 5.666666666666667 +2 1 10 4 3 1 0 8 1 1 0 4 1 3 10 6 -2 1 20 1 -3 1 0 8 +2 1 20 2 +3 5 8 4 diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.sql b/tests/queries/0_stateless/01246_ttl_where_group_by.sql index fbbd0336e1b..ad969127eb1 100644 --- a/tests/queries/0_stateless/01246_ttl_where_group_by.sql +++ b/tests/queries/0_stateless/01246_ttl_where_group_by.sql @@ -17,9 +17,11 @@ drop table if exists ttl_01246_1; create table ttl_01246_1 (a Int, b Int, x Int32, y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = cast(median(x) as Int32), y = avg(y), d = max(d); insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); insert into ttl_01246_1 values (1, 1, 10, 6, now()); -insert into ttl_01246_1 values (1, 2, 3, 7, now()); +insert into ttl_01246_1 values (1, 1, 3, 7, now()); insert into ttl_01246_1 values (1, 3, 0, 5, now()); -insert into ttl_01246_1 values (2, 1, 20, 1, now()); +insert into ttl_01246_1 values (1, 3, 4, 9, now()); +insert into ttl_01246_1 values (1, 3, 6, 3, now()); +insert into ttl_01246_1 values (2, 1, 20, 7, now()); insert into ttl_01246_1 values (2, 1, 0, 1, now()); insert into ttl_01246_1 values (3, 1, 0, 8, now()); select sleep(1.1) format Null; @@ -34,8 +36,10 @@ insert into ttl_01246_1 values (1, 1, 10, 6, now()); insert into ttl_01246_1 values (1, 2, 3, 7, now()); insert into ttl_01246_1 values (1, 3, 0, 5, now()); insert into ttl_01246_1 values (2, 1, 20, 1, now()); -insert into ttl_01246_1 values (2, 1, 0, 1, now()); -insert into ttl_01246_1 values (3, 1, 0, 8, now()); +insert into ttl_01246_1 values (2, 1, 0, 3, now()); +insert into ttl_01246_1 values (3, 1, 0, 3, now()); +insert into ttl_01246_1 values (3, 2, 8, 2, now()); +insert into ttl_01246_1 values (3, 5, 5, 8, now()); select sleep(1.1) format Null; optimize table ttl_01246_1 final; select a, b, x, y from ttl_01246_1; \ No newline at end of file From 1f02ba9692b7152dc82c3ce8f0e6f62ec5aa120f Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Thu, 14 May 2020 21:46:15 +0300 Subject: [PATCH 06/14] Add eof --- src/Parsers/ExpressionListParsers.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Parsers/ExpressionListParsers.h b/src/Parsers/ExpressionListParsers.h index ba939d8d160..9cc36967bd7 100644 --- a/src/Parsers/ExpressionListParsers.h +++ b/src/Parsers/ExpressionListParsers.h @@ -430,4 +430,4 @@ protected: bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; }; -} \ No newline at end of file +} From c6541b1dad1fe95644a9dc1c916dbd162be5d37e Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Fri, 15 May 2020 02:32:45 +0300 Subject: [PATCH 07/14] Up test id; Fix clang build --- src/Interpreters/SyntaxAnalyzer.cpp | 4 ++-- src/Parsers/ExpressionElementParsers.cpp | 2 +- src/Parsers/ExpressionListParsers.cpp | 2 +- ..._group_by.reference => 01280_ttl_where_group_by.reference} | 0 ...46_ttl_where_group_by.sql => 01280_ttl_where_group_by.sql} | 0 5 files changed, 4 insertions(+), 4 deletions(-) rename tests/queries/0_stateless/{01246_ttl_where_group_by.reference => 01280_ttl_where_group_by.reference} (100%) rename tests/queries/0_stateless/{01246_ttl_where_group_by.sql => 01280_ttl_where_group_by.sql} (100%) diff --git a/src/Interpreters/SyntaxAnalyzer.cpp b/src/Interpreters/SyntaxAnalyzer.cpp index 54f369c9495..b5f86b87fdc 100644 --- a/src/Interpreters/SyntaxAnalyzer.cpp +++ b/src/Interpreters/SyntaxAnalyzer.cpp @@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect( return std::make_shared(result); } -SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregates) const +SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregations) const { if (query->as()) throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR); @@ -855,7 +855,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif); - if (allow_aggregates) + if (allow_aggregations) { GetAggregatesVisitor::Data data; GetAggregatesVisitor(data).visit(query); diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 65ff9e1da94..5b5d9f05b22 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1511,7 +1511,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ASTPtr ast_group_by_key_columns; if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected)) return false; - for (const auto identifier : ast_group_by_key_columns->children) + for (const auto & identifier : ast_group_by_key_columns->children) { String identifier_str; if (!tryGetIdentifierNameInto(identifier, identifier_str)) diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index 062d1dd4483..2561bf7ef83 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -751,4 +751,4 @@ bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expect .parse(pos, node, expected); } -} \ No newline at end of file +} diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.reference b/tests/queries/0_stateless/01280_ttl_where_group_by.reference similarity index 100% rename from tests/queries/0_stateless/01246_ttl_where_group_by.reference rename to tests/queries/0_stateless/01280_ttl_where_group_by.reference diff --git a/tests/queries/0_stateless/01246_ttl_where_group_by.sql b/tests/queries/0_stateless/01280_ttl_where_group_by.sql similarity index 100% rename from tests/queries/0_stateless/01246_ttl_where_group_by.sql rename to tests/queries/0_stateless/01280_ttl_where_group_by.sql From 6585571ee2e8d8b0a832e4b51f0e40762d096a4b Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Sat, 16 May 2020 02:53:47 +0300 Subject: [PATCH 08/14] Fix bug --- src/Interpreters/ExpressionAnalyzer.cpp | 89 +++++++++++++------------ 1 file changed, 45 insertions(+), 44 deletions(-) diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index c432b920a9f..b38af6feef9 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -192,61 +192,65 @@ void ExpressionAnalyzer::analyzeAggregation() if (has_aggregation) { - // getSelectQuery(); /// assertSelect() /// Find out aggregation keys. - if (select_query && select_query->groupBy()) + if (select_query) { - NameSet unique_keys; - ASTs & group_asts = select_query->groupBy()->children; - for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) + if (select_query->groupBy()) { - ssize_t size = group_asts.size(); - getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false); - - const auto & column_name = group_asts[i]->getColumnName(); - const auto & block = temp_actions->getSampleBlock(); - - if (!block.has(column_name)) - throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER); - - const auto & col = block.getByName(column_name); - - /// Constant expressions have non-null column pointer at this stage. - if (col.column && isColumnConst(*col.column)) + NameSet unique_keys; + ASTs & group_asts = select_query->groupBy()->children; + for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i) { - /// But don't remove last key column if no aggregate functions, otherwise aggregation will not work. - if (!aggregate_descriptions.empty() || size > 1) + ssize_t size = group_asts.size(); + getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false); + + const auto & column_name = group_asts[i]->getColumnName(); + const auto & block = temp_actions->getSampleBlock(); + + if (!block.has(column_name)) + throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER); + + const auto & col = block.getByName(column_name); + + /// Constant expressions have non-null column pointer at this stage. + if (col.column && isColumnConst(*col.column)) { - if (i + 1 < static_cast(size)) - group_asts[i] = std::move(group_asts.back()); + /// But don't remove last key column if no aggregate functions, otherwise aggregation will not work. + if (!aggregate_descriptions.empty() || size > 1) + { + if (i + 1 < static_cast(size)) + group_asts[i] = std::move(group_asts.back()); - group_asts.pop_back(); + group_asts.pop_back(); - --i; - continue; + --i; + continue; + } + } + + NameAndTypePair key{column_name, col.type}; + + /// Aggregation keys are uniqued. + if (!unique_keys.count(key.name)) + { + unique_keys.insert(key.name); + aggregation_keys.push_back(key); + + /// Key is no longer needed, therefore we can save a little by moving it. + aggregated_columns.push_back(std::move(key)); } } - NameAndTypePair key{column_name, col.type}; - - /// Aggregation keys are uniqued. - if (!unique_keys.count(key.name)) + if (group_asts.empty()) { - unique_keys.insert(key.name); - aggregation_keys.push_back(key); - - /// Key is no longer needed, therefore we can save a little by moving it. - aggregated_columns.push_back(std::move(key)); + select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {}); + has_aggregation = select_query->having() || !aggregate_descriptions.empty(); } } - - if (group_asts.empty()) - { - select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {}); - has_aggregation = select_query->having() || !aggregate_descriptions.empty(); - } } + else + aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList(); for (const auto & desc : aggregate_descriptions) aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType()); @@ -926,10 +930,7 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) { - NamesAndTypesList columns(sourceColumns()); - for (const auto & col : aggregated_columns) - columns.push_back(col); - ExpressionActionsPtr actions = std::make_shared(columns, context); + ExpressionActionsPtr actions = std::make_shared(aggregated_columns, context); NamesWithAliases result_columns; Names result_names; From ed09f68b8dbead260ee3506ff3533616259dca04 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Sat, 16 May 2020 19:31:29 +0300 Subject: [PATCH 09/14] Better tests --- .../01280_ttl_where_group_by.reference | 14 ++-- .../0_stateless/01280_ttl_where_group_by.sql | 76 +++++++++---------- 2 files changed, 45 insertions(+), 45 deletions(-) diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.reference b/tests/queries/0_stateless/01280_ttl_where_group_by.reference index 69d3d77cd46..572fc7731d3 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.reference +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.reference @@ -3,12 +3,12 @@ 1 3 0 5 2 1 20 1 2 1 0 1 -1 1 0 4 -1 1 6 6.5 -1 3 4 5.666666666666667 -2 1 10 4 -3 1 0 8 +1 1 [0,2,3] 4 +1 1 [5,4,1] 13 +1 3 [1,0,1,0] 17 +2 1 [3,1,0,3] 8 +3 1 [2,4,5] 8 1 1 0 4 1 3 10 6 -2 1 20 2 -3 5 8 4 +2 1 0 3 +3 5 8 2 diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.sql b/tests/queries/0_stateless/01280_ttl_where_group_by.sql index ad969127eb1..0ca06e77001 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.sql +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.sql @@ -1,45 +1,45 @@ -drop table if exists ttl_01246_1; +drop table if exists ttl_01280_1; -create table ttl_01246_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5; -insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); -insert into ttl_01246_1 values (1, 1, 10, 6, now()); -insert into ttl_01246_1 values (1, 2, 3, 7, now()); -insert into ttl_01246_1 values (1, 3, 0, 5, now()); -insert into ttl_01246_1 values (2, 1, 20, 1, now()); -insert into ttl_01246_1 values (2, 1, 0, 1, now()); -insert into ttl_01246_1 values (3, 1, 0, 8, now()); +create table ttl_01280_1 (a Int, b Int, x Int, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second delete where x % 10 == 0 and y > 5; +insert into ttl_01280_1 values (1, 1, 0, 4, now() + 10); +insert into ttl_01280_1 values (1, 1, 10, 6, now()); +insert into ttl_01280_1 values (1, 2, 3, 7, now()); +insert into ttl_01280_1 values (1, 3, 0, 5, now()); +insert into ttl_01280_1 values (2, 1, 20, 1, now()); +insert into ttl_01280_1 values (2, 1, 0, 1, now()); +insert into ttl_01280_1 values (3, 1, 0, 8, now()); select sleep(1.1) format Null; -optimize table ttl_01246_1 final; -select a, b, x, y from ttl_01246_1; +optimize table ttl_01280_1 final; +select a, b, x, y from ttl_01280_1; -drop table if exists ttl_01246_1; +drop table if exists ttl_01280_2; -create table ttl_01246_1 (a Int, b Int, x Int32, y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = cast(median(x) as Int32), y = avg(y), d = max(d); -insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); -insert into ttl_01246_1 values (1, 1, 10, 6, now()); -insert into ttl_01246_1 values (1, 1, 3, 7, now()); -insert into ttl_01246_1 values (1, 3, 0, 5, now()); -insert into ttl_01246_1 values (1, 3, 4, 9, now()); -insert into ttl_01246_1 values (1, 3, 6, 3, now()); -insert into ttl_01246_1 values (2, 1, 20, 7, now()); -insert into ttl_01246_1 values (2, 1, 0, 1, now()); -insert into ttl_01246_1 values (3, 1, 0, 8, now()); +create table ttl_01280_2 (a Int, b Int, x Array(Int32), y Double, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set x = minForEach(x), y = sum(y), d = max(d); +insert into ttl_01280_2 values (1, 1, array(0, 2, 3), 4, now() + 10); +insert into ttl_01280_2 values (1, 1, array(5, 4, 3), 6, now()); +insert into ttl_01280_2 values (1, 1, array(5, 5, 1), 7, now()); +insert into ttl_01280_2 values (1, 3, array(3, 0, 4), 5, now()); +insert into ttl_01280_2 values (1, 3, array(1, 1, 2, 1), 9, now()); +insert into ttl_01280_2 values (1, 3, array(3, 2, 1, 0), 3, now()); +insert into ttl_01280_2 values (2, 1, array(3, 3, 3), 7, now()); +insert into ttl_01280_2 values (2, 1, array(11, 1, 0, 3), 1, now()); +insert into ttl_01280_2 values (3, 1, array(2, 4, 5), 8, now()); select sleep(1.1) format Null; -optimize table ttl_01246_1 final; -select a, b, x, y from ttl_01246_1; +optimize table ttl_01280_2 final; +select a, b, x, y from ttl_01280_2; -drop table if exists ttl_01246_1; +drop table if exists ttl_01280_3; -create table ttl_01246_1 (a Int, b Int, x Int32, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = max(x), y = cast(round(avg(y)) as Int), d = max(d); -insert into ttl_01246_1 values (1, 1, 0, 4, now() + 10); -insert into ttl_01246_1 values (1, 1, 10, 6, now()); -insert into ttl_01246_1 values (1, 2, 3, 7, now()); -insert into ttl_01246_1 values (1, 3, 0, 5, now()); -insert into ttl_01246_1 values (2, 1, 20, 1, now()); -insert into ttl_01246_1 values (2, 1, 0, 3, now()); -insert into ttl_01246_1 values (3, 1, 0, 3, now()); -insert into ttl_01246_1 values (3, 2, 8, 2, now()); -insert into ttl_01246_1 values (3, 5, 5, 8, now()); -select sleep(1.1) format Null; -optimize table ttl_01246_1 final; -select a, b, x, y from ttl_01246_1; \ No newline at end of file +create table ttl_01280_3 (a Int, b Int, x Int64, y Int, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set x = argMax(x, d), y = argMax(y, d), d = max(d); +insert into ttl_01280_3 values (1, 1, 0, 4, now() + 10); +insert into ttl_01280_3 values (1, 1, 10, 6, now() + 1); +insert into ttl_01280_3 values (1, 2, 3, 7, now()); +insert into ttl_01280_3 values (1, 3, 0, 5, now()); +insert into ttl_01280_3 values (2, 1, 20, 1, now()); +insert into ttl_01280_3 values (2, 1, 0, 3, now() + 1); +insert into ttl_01280_3 values (3, 1, 0, 3, now()); +insert into ttl_01280_3 values (3, 2, 8, 2, now() + 1); +insert into ttl_01280_3 values (3, 5, 5, 8, now()); +select sleep(2.1) format Null; +optimize table ttl_01280_3 final; +select a, b, x, y from ttl_01280_3; \ No newline at end of file From 7d937b43c7f0248069bdb7b1686244ce9d0c0f2b Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Sun, 17 May 2020 16:01:38 +0300 Subject: [PATCH 10/14] Init with correct value --- src/DataStreams/TTLBlockInputStream.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index a8f86477b88..c6ffa95cd75 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -45,7 +45,7 @@ private: AggregatedDataVariants agg_result; ColumnRawPtrs agg_key_columns; Aggregator::AggregateColumns agg_aggregate_columns; - bool agg_no_more_keys; + bool agg_no_more_keys = false; IMergeTreeDataPart::TTLInfos old_ttl_infos; IMergeTreeDataPart::TTLInfos new_ttl_infos; From 141ed887515e6523c192690baf66dd1c2ca3a3b8 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Sun, 24 May 2020 23:21:23 +0300 Subject: [PATCH 11/14] Allow functions in group by keys; Add default aggregate function; Add more tests --- src/Parsers/ASTTTLElement.cpp | 23 ++++--- src/Parsers/ASTTTLElement.h | 2 +- src/Parsers/ExpressionElementParsers.cpp | 58 ++++++++--------- src/Parsers/ExpressionListParsers.cpp | 9 --- src/Parsers/ExpressionListParsers.h | 9 --- src/Storages/MergeTree/MergeTreeData.cpp | 62 ++++++++++++++++--- .../01280_ttl_where_group_by.reference | 6 ++ .../0_stateless/01280_ttl_where_group_by.sql | 44 ++++++++++++- 8 files changed, 146 insertions(+), 67 deletions(-) diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index 991274e8350..33f8a3ac906 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -17,6 +17,8 @@ ASTPtr ASTTTLElement::clone() const clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true)); clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true)); + for (auto & expr : clone->group_by_key) + expr = expr->clone(); for (auto & [name, expr] : clone->group_by_aggregations) expr = expr->clone(); @@ -37,19 +39,22 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st else if (mode == TTLMode::GROUP_BY) { settings.ostr << " GROUP BY "; - for (auto it = group_by_key_columns.begin(); it != group_by_key_columns.end(); ++it) + for (auto it = group_by_key.begin(); it != group_by_key.end(); ++it) { - if (it != group_by_key_columns.begin()) + if (it != group_by_key.begin()) settings.ostr << ", "; - settings.ostr << *it; + (*it)->formatImpl(settings, state, frame); } - settings.ostr << " SET "; - for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it) + if (!group_by_aggregations.empty()) { - if (it != group_by_aggregations.begin()) - settings.ostr << ", "; - settings.ostr << it->first << " = "; - it->second->formatImpl(settings, state, frame); + settings.ostr << " SET "; + for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it) + { + if (it != group_by_aggregations.begin()) + settings.ostr << ", "; + settings.ostr << it->first << " = "; + it->second->formatImpl(settings, state, frame); + } } } else if (mode == TTLMode::DELETE) diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 9b40adf2be4..2004a4f927b 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -17,7 +17,7 @@ public: PartDestinationType destination_type; String destination_name; - Strings group_by_key_columns; + ASTs group_by_key; std::vector> group_by_aggregations; ASTTTLElement(TTLMode mode_, PartDestinationType destination_type_, const String & destination_name_) diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 5b5d9f05b22..2ea39946ff7 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1464,7 +1464,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserIdentifier parser_identifier; ParserStringLiteral parser_string_literal; ParserExpression parser_exp; - ParserIdentifierList parser_identifier_list; + ParserExpressionList parser_expression_list(false); ASTPtr ttl_expr; if (!parser_exp.parse(pos, ttl_expr, expected)) @@ -1495,7 +1495,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } ASTPtr where_expr; - std::vector group_by_key_columns; + ASTPtr ast_group_by_key; std::vector> group_by_aggregations; if (mode == TTLMode::MOVE) @@ -1508,37 +1508,30 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) } else if (mode == TTLMode::GROUP_BY) { - ASTPtr ast_group_by_key_columns; - if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected)) + if (!parser_expression_list.parse(pos, ast_group_by_key, expected)) return false; - for (const auto & identifier : ast_group_by_key_columns->children) + + if (s_set.ignore(pos)) { - String identifier_str; - if (!tryGetIdentifierNameInto(identifier, identifier_str)) - return false; - group_by_key_columns.emplace_back(std::move(identifier_str)); - } + while (true) + { + if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) + break; - if (!s_set.ignore(pos)) - return false; - while (true) - { - if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) - break; + ASTPtr name; + ASTPtr value; + if (!parser_identifier.parse(pos, name, expected)) + return false; + if (!s_eq.ignore(pos)) + return false; + if (!parser_exp.parse(pos, value, expected)) + return false; - ASTPtr name; - ASTPtr value; - if (!parser_identifier.parse(pos, name, expected)) - return false; - if (!s_eq.ignore(pos)) - return false; - if (!parser_exp.parse(pos, value, expected)) - return false; - - String name_str; - if (!tryGetIdentifierNameInto(name, name_str)) - return false; - group_by_aggregations.emplace_back(name_str, std::move(value)); + String name_str; + if (!tryGetIdentifierNameInto(name, name_str)) + return false; + group_by_aggregations.emplace_back(name_str, std::move(value)); + } } } else if (mode == TTLMode::DELETE && s_where.ignore(pos)) @@ -1552,8 +1545,11 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) if (where_expr) ttl_element->setWhere(std::move(where_expr)); - ttl_element->group_by_key_columns = std::move(group_by_key_columns); - ttl_element->group_by_aggregations = std::move(group_by_aggregations); + if (mode == TTLMode::GROUP_BY) + { + ttl_element->group_by_key = std::move(ast_group_by_key->children); + ttl_element->group_by_aggregations = std::move(group_by_aggregations); + } node = ttl_element; return true; diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index 2561bf7ef83..a967ae19691 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -742,13 +742,4 @@ bool ParserKeyValuePairsList::parseImpl(Pos & pos, ASTPtr & node, Expected & exp return parser.parse(pos, node, expected); } - -bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) -{ - return ParserList( - std::make_unique(), - std::make_unique(TokenType::Comma)) - .parse(pos, node, expected); -} - } diff --git a/src/Parsers/ExpressionListParsers.h b/src/Parsers/ExpressionListParsers.h index 9cc36967bd7..0cef29b6d67 100644 --- a/src/Parsers/ExpressionListParsers.h +++ b/src/Parsers/ExpressionListParsers.h @@ -421,13 +421,4 @@ protected: bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; }; - -/** A comma-separated list of identifiers, probably empty. */ -class ParserIdentifierList : public IParserBase -{ -protected: - const char * getName() const override { return "list of identifiers"; } - bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; -}; - } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index bb9d8ce186e..5de3d204f16 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -646,22 +646,70 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } else if (ttl_element->mode == TTLMode::GROUP_BY) { - if (ttl_element->group_by_key_columns.size() > this->primary_key_columns.size()) + if (ttl_element->group_by_key.size() > this->primary_key_columns.size()) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); - for (size_t i = 0; i < ttl_element->group_by_key_columns.size(); ++i) + + NameSet primary_key_columns_set(this->primary_key_columns.begin(), this->primary_key_columns.end()); + NameSet aggregation_columns_set; + + for (const auto & column : this->primary_key_expr->getRequiredColumns()) + primary_key_columns_set.insert(column); + + for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i) { - if (ttl_element->group_by_key_columns[i] != this->primary_key_columns[i]) + if (ttl_element->group_by_key[i]->getColumnName() != this->primary_key_columns[i]) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); } + for (const auto & [name, value] : ttl_element->group_by_aggregations) + { + if (primary_key_columns_set.contains(name)) + throw Exception("Can not set custom aggregation for column in primary key in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION); + aggregation_columns_set.insert(name); + } + if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size()) + throw Exception("Multiple aggregations set for one column in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION); - result.group_by_keys = ttl_element->group_by_key_columns; + result.group_by_keys = Names(this->primary_key_columns.begin(), this->primary_key_columns.begin() + ttl_element->group_by_key.size()); auto aggregations = ttl_element->group_by_aggregations; - for (size_t i = ttl_element->group_by_key_columns.size(); i < this->primary_key_columns.size(); ++i) + for (size_t i = 0; i < this->primary_key_columns.size(); ++i) { - ASTPtr expr = makeASTFunction("max", std::make_shared(this->primary_key_columns[i])); - aggregations.emplace_back(this->primary_key_columns[i], std::move(expr)); + ASTPtr value = this->primary_key_expr_ast->children[i]->clone(); + + if (i >= ttl_element->group_by_key.size()) + { + ASTPtr value_max = makeASTFunction("max", value->clone()); + aggregations.emplace_back(value->getColumnName(), std::move(value_max)); + } + + if (value->as()) + { + auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); + auto expr_actions = ExpressionAnalyzer(value, syntax_result, global_context).getActions(false); + for (const auto & column : expr_actions->getRequiredColumns()) + { + if (i < ttl_element->group_by_key.size()) + { + ASTPtr expr = makeASTFunction("any", std::make_shared(column)); + aggregations.emplace_back(column, std::move(expr)); + } + else + { + ASTPtr expr = makeASTFunction("argMax", std::make_shared(column), value->clone()); + aggregations.emplace_back(column, std::move(expr)); + } + } + } } + for (const auto & column : new_columns.getAllPhysical()) + { + if (!primary_key_columns_set.contains(column.name) && !aggregation_columns_set.contains(column.name)) + { + ASTPtr expr = makeASTFunction("any", std::make_shared(column.name)); + aggregations.emplace_back(column.name, std::move(expr)); + } + } + for (auto [name, value] : aggregations) { auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.reference b/tests/queries/0_stateless/01280_ttl_where_group_by.reference index 572fc7731d3..dead0a5aac3 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.reference +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.reference @@ -12,3 +12,9 @@ 1 3 10 6 2 1 0 3 3 5 8 2 +1 1 0 4 +3 3 13 9 +1 2 7 5 +2 3 6 5 +1 2 3 5 +2 3 3 5 diff --git a/tests/queries/0_stateless/01280_ttl_where_group_by.sql b/tests/queries/0_stateless/01280_ttl_where_group_by.sql index 0ca06e77001..e61716cfe81 100644 --- a/tests/queries/0_stateless/01280_ttl_where_group_by.sql +++ b/tests/queries/0_stateless/01280_ttl_where_group_by.sql @@ -42,4 +42,46 @@ insert into ttl_01280_3 values (3, 2, 8, 2, now() + 1); insert into ttl_01280_3 values (3, 5, 5, 8, now()); select sleep(2.1) format Null; optimize table ttl_01280_3 final; -select a, b, x, y from ttl_01280_3; \ No newline at end of file +select a, b, x, y from ttl_01280_3; + +drop table if exists ttl_01280_4; + +create table ttl_01280_4 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), -(a + b)) ttl d + interval 1 second group by toDate(d) set x = sum(x), y = max(y); +insert into ttl_01280_4 values (1, 1, 0, 4, now() + 10); +insert into ttl_01280_4 values (10, 2, 3, 3, now()); +insert into ttl_01280_4 values (2, 10, 1, 7, now()); +insert into ttl_01280_4 values (3, 3, 5, 2, now()); +insert into ttl_01280_4 values (1, 5, 4, 9, now()); +select sleep(1.1) format Null; +optimize table ttl_01280_4 final; +select a, b, x, y from ttl_01280_4; + +drop table if exists ttl_01280_5; + +create table ttl_01280_5 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a set x = sum(x); +insert into ttl_01280_5 values (1, 2, 3, 5, now()); +insert into ttl_01280_5 values (2, 10, 1, 5, now()); +insert into ttl_01280_5 values (2, 3, 5, 5, now()); +insert into ttl_01280_5 values (1, 5, 4, 5, now()); +select sleep(1.1) format Null; +optimize table ttl_01280_5 final; +select a, b, x, y from ttl_01280_5; + +drop table if exists ttl_01280_6; + +create table ttl_01280_6 (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a, -b) ttl d + interval 1 second group by toDate(d), a; +insert into ttl_01280_6 values (1, 2, 3, 5, now()); +insert into ttl_01280_6 values (2, 10, 3, 5, now()); +insert into ttl_01280_6 values (2, 3, 3, 5, now()); +insert into ttl_01280_6 values (1, 5, 3, 5, now()); +select sleep(1.1) format Null; +optimize table ttl_01280_6 final; +select a, b, x, y from ttl_01280_6; + +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by x set y = max(y); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by b set y = max(y); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b, x set y = max(y); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a set b = min(b), y = max(y); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (a, b) ttl d + interval 1 second group by a, b set y = max(y), y = max(y); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (toDate(d), a) ttl d + interval 1 second group by toDate(d), a set d = min(d), b = max(b); -- { serverError 450} +create table ttl_01280_error (a Int, b Int, x Int64, y Int64, d DateTime) engine = MergeTree order by (d, -(a + b)) ttl d + interval 1 second group by d, -(a + b) set a = sum(a), b = min(b); -- { serverError 450} From e182d4df2ef95c59527bb06b162d92c9af3fd341 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Mon, 25 May 2020 01:52:18 +0300 Subject: [PATCH 12/14] Fix context usage --- src/DataStreams/TTLBlockInputStream.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index 106ae1fd5a0..f11f6ea38cd 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -87,11 +88,9 @@ TTLBlockInputStream::TTLBlockInputStream( const Settings & settings = storage.global_context.getSettingsRef(); - bool allow_to_use_two_level_group_by = false; // settings.max_bytes_before_external_group_by != 0; Aggregator::Params params(header, keys, aggregates, false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, - allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0), - allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0), + SettingUInt64(0), SettingUInt64(0), settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); aggregator = std::make_unique(params); From f62adfd588b579e51f76ac78fe19e66df250ff8b Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 27 May 2020 04:52:02 +0300 Subject: [PATCH 13/14] Fix Arcadia build --- src/Storages/MergeTree/MergeTreeData.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 5de3d204f16..9a1b4f27872 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -662,7 +662,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } for (const auto & [name, value] : ttl_element->group_by_aggregations) { - if (primary_key_columns_set.contains(name)) + if (primary_key_columns_set.count(name)) throw Exception("Can not set custom aggregation for column in primary key in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION); aggregation_columns_set.insert(name); } @@ -703,7 +703,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } for (const auto & column : new_columns.getAllPhysical()) { - if (!primary_key_columns_set.contains(column.name) && !aggregation_columns_set.contains(column.name)) + if (!primary_key_columns_set.count(column.name) && !aggregation_columns_set.count(column.name)) { ASTPtr expr = makeASTFunction("any", std::make_shared(column.name)); aggregations.emplace_back(column.name, std::move(expr)); From 57555dbabf11780ca751e8f89a8e8f3787294d90 Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Wed, 27 May 2020 14:00:17 +0300 Subject: [PATCH 14/14] Fix after rebase --- src/Storages/MergeTree/MergeTreeData.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 9a1b4f27872..da351fc7750 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -646,18 +646,18 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } else if (ttl_element->mode == TTLMode::GROUP_BY) { - if (ttl_element->group_by_key.size() > this->primary_key_columns.size()) + if (ttl_element->group_by_key.size() > this->getPrimaryKey().column_names.size()) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); - NameSet primary_key_columns_set(this->primary_key_columns.begin(), this->primary_key_columns.end()); + NameSet primary_key_columns_set(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.end()); NameSet aggregation_columns_set; - for (const auto & column : this->primary_key_expr->getRequiredColumns()) + for (const auto & column : this->getPrimaryKey().expression->getRequiredColumns()) primary_key_columns_set.insert(column); for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i) { - if (ttl_element->group_by_key[i]->getColumnName() != this->primary_key_columns[i]) + if (ttl_element->group_by_key[i]->getColumnName() != this->getPrimaryKey().column_names[i]) throw Exception("TTL Expression GROUP BY key should be a prefix of primary key", ErrorCodes::BAD_TTL_EXPRESSION); } for (const auto & [name, value] : ttl_element->group_by_aggregations) @@ -669,12 +669,12 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, if (aggregation_columns_set.size() != ttl_element->group_by_aggregations.size()) throw Exception("Multiple aggregations set for one column in TTL Expression", ErrorCodes::BAD_TTL_EXPRESSION); - result.group_by_keys = Names(this->primary_key_columns.begin(), this->primary_key_columns.begin() + ttl_element->group_by_key.size()); + result.group_by_keys = Names(this->getPrimaryKey().column_names.begin(), this->getPrimaryKey().column_names.begin() + ttl_element->group_by_key.size()); auto aggregations = ttl_element->group_by_aggregations; - for (size_t i = 0; i < this->primary_key_columns.size(); ++i) + for (size_t i = 0; i < this->getPrimaryKey().column_names.size(); ++i) { - ASTPtr value = this->primary_key_expr_ast->children[i]->clone(); + ASTPtr value = this->getPrimaryKey().expression_list_ast->children[i]->clone(); if (i >= ttl_element->group_by_key.size()) {