From 61974e0047e9826ab84d5d4b07f93ec065b285dc Mon Sep 17 00:00:00 2001 From: Nikolai Sorokin Date: Mon, 27 Apr 2020 17:47:59 +0300 Subject: [PATCH] TTL Expressions WHERE and GROUP BY draft --- src/DataStreams/TTLBlockInputStream.cpp | 225 +++++++++++++++++++++-- src/DataStreams/TTLBlockInputStream.h | 8 + src/Interpreters/ExpressionAnalyzer.cpp | 9 +- src/Interpreters/SyntaxAnalyzer.cpp | 17 +- src/Interpreters/SyntaxAnalyzer.h | 2 +- src/Parsers/ASTTTLElement.cpp | 67 ++++++- src/Parsers/ASTTTLElement.h | 38 +++- src/Parsers/ExpressionElementParsers.cpp | 84 ++++++++- src/Parsers/ExpressionListParsers.cpp | 9 + src/Parsers/ExpressionListParsers.h | 11 +- src/Storages/MergeTree/MergeTreeData.cpp | 44 ++++- src/Storages/MergeTree/MergeTreeData.h | 9 + 12 files changed, 479 insertions(+), 44 deletions(-) diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp index d2d5d6a92f9..d816cba4b2f 100644 --- a/src/DataStreams/TTLBlockInputStream.cpp +++ b/src/DataStreams/TTLBlockInputStream.cpp @@ -67,6 +67,30 @@ TTLBlockInputStream::TTLBlockInputStream( default_expr_list, storage.getColumns().getAllPhysical()); defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true); } + + if (storage.hasRowsTTL() && !storage.rows_ttl_entry.group_by_keys.empty()) + { + ColumnNumbers keys; + for (const auto & key : storage.rows_ttl_entry.group_by_keys) + keys.push_back(header.getPositionByName(key)); + agg_key_columns.resize(storage.rows_ttl_entry.group_by_keys.size()); + + AggregateDescriptions aggregates = storage.rows_ttl_entry.aggregate_descriptions; + for (auto & descr : aggregates) + if (descr.arguments.empty()) + for (const auto & name : descr.argument_names) + descr.arguments.push_back(header.getPositionByName(name)); + agg_aggregate_columns.resize(storage.rows_ttl_entry.aggregate_descriptions.size()); + + const Settings & settings = storage.global_context.getSettingsRef(); + + Aggregator::Params params(header, keys, aggregates, + false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, + SettingUInt64(0), SettingUInt64(0), + settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, + storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data); + aggregator = std::make_unique(params); + } } bool TTLBlockInputStream::isTTLExpired(time_t ttl) const @@ -77,7 +101,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const Block TTLBlockInputStream::readImpl() { /// Skip all data if table ttl is expired for part - if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max)) + if (storage.hasRowsTTL() && !storage.rows_ttl_entry.where_expression && + storage.rows_ttl_entry.group_by_keys.empty() && isTTLExpired(old_ttl_infos.table_ttl.max)) { rows_removed = data_part->rows_count; return {}; @@ -85,7 +110,43 @@ Block TTLBlockInputStream::readImpl() Block block = children.at(0)->read(); if (!block) + { + if (aggregator && !agg_result.empty()) + { + MutableColumns result_columns; + const auto & column_names = header.getNames(); + for (const auto & column_name : column_names) + { + const IColumn * values_column = header.getByName(column_name).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_columns.emplace_back(std::move(result_column)); + } + + auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); + for (auto & agg_block : aggregated_res) + { + for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) + actions->execute(agg_block); + for (const auto & name : storage.rows_ttl_entry.group_by_keys) + { + const IColumn * values_column = agg_block.getByName(name).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) + { + const IColumn * values_column = agg_block.getByName(res_column).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + } + + block = header.cloneWithColumns(std::move(result_columns)); + agg_result.invalidate(); + } + return block; + } if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min))) removeRowsWithExpiredTableTTL(block); @@ -114,35 +175,171 @@ void TTLBlockInputStream::readSuffixImpl() void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) { storage.rows_ttl_entry.expression->execute(block); + if (storage.rows_ttl_entry.where_expression) + storage.rows_ttl_entry.where_expression->execute(block); const IColumn * ttl_column = block.getByName(storage.rows_ttl_entry.result_column).column.get(); + const IColumn * where_filter_column = storage.rows_ttl_entry.where_expression ? + block.getByName(storage.rows_ttl_entry.where_filter_column).column.get() : nullptr; + const auto & column_names = header.getNames(); - MutableColumns result_columns; - result_columns.reserve(column_names.size()); - for (auto it = column_names.begin(); it != column_names.end(); ++it) + if (!aggregator) { - const IColumn * values_column = block.getByName(*it).column.get(); - MutableColumnPtr result_column = values_column->cloneEmpty(); - result_column->reserve(block.rows()); + MutableColumns result_columns; + result_columns.reserve(column_names.size()); + for (auto it = column_names.begin(); it != column_names.end(); ++it) + { + const IColumn * values_column = block.getByName(*it).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_column->reserve(block.rows()); + for (size_t i = 0; i < block.rows(); ++i) + { + UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); + bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + if (!isTTLExpired(cur_ttl) || !where_filter_passed) + { + new_ttl_infos.table_ttl.update(cur_ttl); + result_column->insertFrom(*values_column, i); + } + else if (it == column_names.begin()) + ++rows_removed; + } + result_columns.emplace_back(std::move(result_column)); + } + block = header.cloneWithColumns(std::move(result_columns)); + } + else + { + MutableColumns result_columns; + MutableColumns aggregate_columns; + + for (const auto & column_name : column_names) + { + const IColumn * values_column = block.getByName(column_name).column.get(); + MutableColumnPtr result_column = values_column->cloneEmpty(); + result_column->reserve(block.rows()); + result_columns.emplace_back(std::move(result_column)); + + MutableColumnPtr aggregate_column = values_column->cloneEmpty(); + aggregate_column->reserve(block.rows()); + aggregate_columns.emplace_back(std::move(aggregate_column)); + } + + size_t rows_aggregated = 0; + size_t current_key_start = 0; + size_t rows_with_current_key = 0; for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - if (!isTTLExpired(cur_ttl)) + bool where_filter_passed = !where_filter_column || where_filter_column->getBool(i); + bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed; + + bool same_as_current = true; + if (current_key_value.empty()) + { + same_as_current = false; + current_key_value.resize(storage.rows_ttl_entry.group_by_keys.size()); + } + for (size_t j = 0; j < storage.rows_ttl_entry.group_by_keys.size(); ++j) + { + const String & key_column = storage.rows_ttl_entry.group_by_keys[j]; + const IColumn * values_column = block.getByName(key_column).column.get(); + if (!same_as_current) + values_column->get(i, current_key_value[j]); + else + { + Field value; + values_column->get(i, value); + if (value != current_key_value[j]) + { + current_key_value[j] = value; + same_as_current = false; + } + } + } + if (!same_as_current) + { + if (rows_with_current_key) + { + Columns aggregate_chunk; + aggregate_chunk.reserve(aggregate_columns.size()); + for (const auto & name : column_names) + { + const auto & column = aggregate_columns[header.getPositionByName(name)]; + ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); + aggregate_chunk.emplace_back(std::move(chunk_column)); + } + aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, + agg_aggregate_columns, agg_no_more_keys); + } + if (!agg_result.empty()) + { + auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1); + for (auto & agg_block : aggregated_res) + { + for (const auto & [name, actions] : storage.rows_ttl_entry.group_by_aggregations) + actions->execute(agg_block); + for (const auto & name : storage.rows_ttl_entry.group_by_keys) + { + const IColumn * values_column = agg_block.getByName(name).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + for (const auto & [name, res_column] : storage.rows_ttl_entry.group_by_aggregations_res_column) + { + const IColumn * values_column = agg_block.getByName(res_column).column.get(); + auto & result_column = result_columns[header.getPositionByName(name)]; + result_column->insertRangeFrom(*values_column, 0, agg_block.rows()); + } + } + } + agg_result.invalidate(); + current_key_start = rows_aggregated; + rows_with_current_key = 0; + } + if (ttl_expired) + { + ++rows_with_current_key; + ++rows_aggregated; + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = aggregate_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } + } + else { new_ttl_infos.table_ttl.update(cur_ttl); - result_column->insertFrom(*values_column, i); + for (const auto & name : column_names) + { + const IColumn * values_column = block.getByName(name).column.get(); + auto & column = result_columns[header.getPositionByName(name)]; + column->insertFrom(*values_column, i); + } } - else if (it == column_names.begin()) - ++rows_removed; } - result_columns.emplace_back(std::move(result_column)); - } - block = header.cloneWithColumns(std::move(result_columns)); + if (rows_with_current_key) + { + Columns aggregate_chunk; + aggregate_chunk.reserve(aggregate_columns.size()); + for (const auto & name : column_names) + { + const auto & column = aggregate_columns[header.getPositionByName(name)]; + ColumnPtr chunk_column = column->cut(current_key_start, rows_with_current_key); + aggregate_chunk.emplace_back(std::move(chunk_column)); + } + aggregator->executeOnBlock(aggregate_chunk, rows_with_current_key, agg_result, agg_key_columns, + agg_aggregate_columns, agg_no_more_keys); + } + + block = header.cloneWithColumns(std::move(result_columns)); + } } void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) diff --git a/src/DataStreams/TTLBlockInputStream.h b/src/DataStreams/TTLBlockInputStream.h index 3896e5232f8..821f8dd9284 100644 --- a/src/DataStreams/TTLBlockInputStream.h +++ b/src/DataStreams/TTLBlockInputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -39,6 +40,13 @@ private: time_t current_time; bool force; + std::unique_ptr aggregator; + std::vector current_key_value; + AggregatedDataVariants agg_result; + ColumnRawPtrs agg_key_columns; + Aggregator::AggregateColumns agg_aggregate_columns; + bool agg_no_more_keys; + IMergeTreeDataPart::TTLInfos old_ttl_infos; IMergeTreeDataPart::TTLInfos new_ttl_infos; NameSet empty_columns; diff --git a/src/Interpreters/ExpressionAnalyzer.cpp b/src/Interpreters/ExpressionAnalyzer.cpp index 3341855b8c6..c432b920a9f 100644 --- a/src/Interpreters/ExpressionAnalyzer.cpp +++ b/src/Interpreters/ExpressionAnalyzer.cpp @@ -192,10 +192,10 @@ void ExpressionAnalyzer::analyzeAggregation() if (has_aggregation) { - getSelectQuery(); /// assertSelect() + // getSelectQuery(); /// assertSelect() /// Find out aggregation keys. - if (select_query->groupBy()) + if (select_query && select_query->groupBy()) { NameSet unique_keys; ASTs & group_asts = select_query->groupBy()->children; @@ -926,7 +926,10 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result) { - ExpressionActionsPtr actions = std::make_shared(sourceColumns(), context); + NamesAndTypesList columns(sourceColumns()); + for (const auto & col : aggregated_columns) + columns.push_back(col); + ExpressionActionsPtr actions = std::make_shared(columns, context); NamesWithAliases result_columns; Names result_names; diff --git a/src/Interpreters/SyntaxAnalyzer.cpp b/src/Interpreters/SyntaxAnalyzer.cpp index b3d566dbdc8..ec7f2154dad 100644 --- a/src/Interpreters/SyntaxAnalyzer.cpp +++ b/src/Interpreters/SyntaxAnalyzer.cpp @@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect( return std::make_shared(result); } -SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage) const +SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregates) const { if (query->as()) throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR); @@ -855,7 +855,20 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif); - assertNoAggregates(query, "in wrong place"); + if (allow_aggregates) + { + GetAggregatesVisitor::Data data; + GetAggregatesVisitor(data).visit(query); + + /// There can not be other aggregate functions within the aggregate functions. + for (const ASTFunction * node : data.aggregates) + for (auto & arg : node->arguments->children) + assertNoAggregates(arg, "inside another aggregate function"); + result.aggregates = data.aggregates; + } + else + assertNoAggregates(query, "in wrong place"); + result.collectUsedColumns(query); return std::make_shared(result); } diff --git a/src/Interpreters/SyntaxAnalyzer.h b/src/Interpreters/SyntaxAnalyzer.h index dda0add38db..abacb25ac4d 100644 --- a/src/Interpreters/SyntaxAnalyzer.h +++ b/src/Interpreters/SyntaxAnalyzer.h @@ -86,7 +86,7 @@ public: {} /// Analyze and rewrite not select query - SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}) const; + SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const; /// Analyze and rewrite select query SyntaxAnalyzerResultPtr analyzeSelect( diff --git a/src/Parsers/ASTTTLElement.cpp b/src/Parsers/ASTTTLElement.cpp index 7e03a73e36d..fb35266465b 100644 --- a/src/Parsers/ASTTTLElement.cpp +++ b/src/Parsers/ASTTTLElement.cpp @@ -7,21 +7,80 @@ namespace DB { +ASTPtr ASTTTLElement::clone() const +{ + auto clone = std::make_shared(*this); + clone->children.clear(); + clone->positions.clear(); + + for (auto expr : {Expression::TTL, Expression::WHERE}) + clone->setExpression(expr, getExpression(expr, true)); + + for (auto & [name, expr] : clone->group_by_aggregations) + expr = expr->clone(); + + return clone; +} + void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const { - children.front()->formatImpl(settings, state, frame); - if (destination_type == PartDestinationType::DISK) + ttl()->formatImpl(settings, state, frame); + if (mode == Mode::MOVE && destination_type == PartDestinationType::DISK) { settings.ostr << " TO DISK " << quoteString(destination_name); } - else if (destination_type == PartDestinationType::VOLUME) + else if (mode == Mode::MOVE && destination_type == PartDestinationType::VOLUME) { settings.ostr << " TO VOLUME " << quoteString(destination_name); } - else if (destination_type == PartDestinationType::DELETE) + else if (mode == Mode::GROUP_BY) + { + settings.ostr << " GROUP BY "; + for (size_t i = 0; i < group_by_key_columns.size(); ++i) + { + settings.ostr << group_by_key_columns[i]; + if (i + 1 != group_by_key_columns.size()) + settings.ostr << ", "; + } + settings.ostr << " SET "; + for (size_t i = 0; i < group_by_aggregations.size(); ++i) + { + settings.ostr << group_by_aggregations[i].first << " = "; + group_by_aggregations[i].second->formatImpl(settings, state, frame); + if (i + 1 != group_by_aggregations.size()) + settings.ostr << ", "; + } + } + else if (mode == Mode::DELETE) { /// It would be better to output "DELETE" here but that will break compatibility with earlier versions. } + + if (where()) + { + settings.ostr << " WHERE "; + where()->formatImpl(settings, state, frame); + } +} + +void ASTTTLElement::setExpression(Expression expr, ASTPtr && ast) +{ + auto it = positions.find(expr); + if (it == positions.end()) + { + positions[expr] = children.size(); + children.emplace_back(ast); + } + else + children[it->second] = ast; +} + +ASTPtr ASTTTLElement::getExpression(Expression expr, bool clone) const +{ + auto it = positions.find(expr); + if (it != positions.end()) + return clone ? children[it->second]->clone() : children[it->second]; + return {}; } } diff --git a/src/Parsers/ASTTTLElement.h b/src/Parsers/ASTTTLElement.h index 02f70094e04..1a2d7d3c723 100644 --- a/src/Parsers/ASTTTLElement.h +++ b/src/Parsers/ASTTTLElement.h @@ -6,31 +6,53 @@ namespace DB { + /** Element of TTL expression. */ class ASTTTLElement : public IAST { public: + enum class Expression : uint8_t + { + TTL, + WHERE + }; + + enum class Mode : uint8_t + { + DELETE, + MOVE, + GROUP_BY + }; + + Mode mode; PartDestinationType destination_type; String destination_name; + std::vector group_by_key_columns; + std::vector> group_by_aggregations; - ASTTTLElement(PartDestinationType destination_type_, const String & destination_name_) - : destination_type(destination_type_) + ASTTTLElement(Mode mode_, PartDestinationType destination_type_, const String & destination_name_) + : mode(mode_) + , destination_type(destination_type_) , destination_name(destination_name_) { } String getID(char) const override { return "TTLElement"; } - ASTPtr clone() const override - { - auto clone = std::make_shared(*this); - clone->cloneChildren(); - return clone; - } + ASTPtr clone() const override; + + const ASTPtr ttl() const { return getExpression(Expression::TTL); } + const ASTPtr where() const { return getExpression(Expression::WHERE); } + + void setExpression(Expression expr, ASTPtr && ast); + ASTPtr getExpression(Expression expr, bool clone = false) const; protected: void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override; + +private: + std::unordered_map positions; }; } diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp index 70a8b282a72..2734aa67193 100644 --- a/src/Parsers/ExpressionElementParsers.cpp +++ b/src/Parsers/ExpressionElementParsers.cpp @@ -1455,23 +1455,50 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_to_disk("TO DISK"); ParserKeyword s_to_volume("TO VOLUME"); ParserKeyword s_delete("DELETE"); + ParserKeyword s_where("WHERE"); + ParserKeyword s_group_by("GROUP BY"); + ParserKeyword s_set("SET"); + ParserToken s_comma(TokenType::Comma); + ParserToken s_eq(TokenType::Equals); + ParserIdentifier parser_identifier; ParserStringLiteral parser_string_literal; ParserExpression parser_exp; + ParserIdentifierList parser_identifier_list; - ASTPtr expr_elem; - if (!parser_exp.parse(pos, expr_elem, expected)) + + ASTPtr ttl_expr; + if (!parser_exp.parse(pos, ttl_expr, expected)) return false; + ASTPtr where_expr; + + std::vector group_by_key_columns; + std::vector> group_by_aggregations; + + ASTTTLElement::Mode mode; PartDestinationType destination_type = PartDestinationType::DELETE; String destination_name; if (s_to_disk.ignore(pos)) + { + mode = ASTTTLElement::Mode::MOVE; destination_type = PartDestinationType::DISK; + } else if (s_to_volume.ignore(pos)) + { + mode = ASTTTLElement::Mode::MOVE; destination_type = PartDestinationType::VOLUME; + } + else if (s_group_by.ignore(pos)) + { + mode = ASTTTLElement::Mode::GROUP_BY; + } else + { s_delete.ignore(pos); + mode = ASTTTLElement::Mode::DELETE; + } - if (destination_type == PartDestinationType::DISK || destination_type == PartDestinationType::VOLUME) + if (mode == ASTTTLElement::Mode::MOVE) { ASTPtr ast_space_name; if (!parser_string_literal.parse(pos, ast_space_name, expected)) @@ -1479,10 +1506,57 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) destination_name = ast_space_name->as().value.get(); } + else if (mode == ASTTTLElement::Mode::GROUP_BY) + { + ASTPtr ast_group_by_key_columns; + if (!parser_identifier_list.parse(pos, ast_group_by_key_columns, expected)) + return false; + for (const auto identifier : ast_group_by_key_columns->children) + { + String identifier_str; + if (!tryGetIdentifierNameInto(identifier, identifier_str)) + return false; + group_by_key_columns.emplace_back(std::move(identifier_str)); + } - node = std::make_shared(destination_type, destination_name); - node->children.push_back(expr_elem); + if (!s_set.ignore(pos)) + return false; + while (true) + { + if (!group_by_aggregations.empty() && !s_comma.ignore(pos)) + break; + + ASTPtr name; + ASTPtr value; + if (!parser_identifier.parse(pos, name, expected)) + return false; + if (!s_eq.ignore(pos)) + return false; + if (!parser_exp.parse(pos, value, expected)) + return false; + String name_str; + if (!tryGetIdentifierNameInto(name, name_str)) + return false; + group_by_aggregations.emplace_back(name_str, std::move(value)); + } + } + + if ((mode == ASTTTLElement::Mode::MOVE || mode == ASTTTLElement::Mode::DELETE) && s_where.ignore(pos)) + { + if (!parser_exp.parse(pos, where_expr, expected)) + return false; + } + + auto ttl_element = std::make_shared(mode, destination_type, destination_name); + ttl_element->setExpression(ASTTTLElement::Expression::TTL, std::move(ttl_expr)); + if (where_expr) + ttl_element->setExpression(ASTTTLElement::Expression::WHERE, std::move(where_expr)); + + ttl_element->group_by_key_columns = std::move(group_by_key_columns); + ttl_element->group_by_aggregations = std::move(group_by_aggregations); + + node = ttl_element; return true; } diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp index a967ae19691..062d1dd4483 100644 --- a/src/Parsers/ExpressionListParsers.cpp +++ b/src/Parsers/ExpressionListParsers.cpp @@ -742,4 +742,13 @@ bool ParserKeyValuePairsList::parseImpl(Pos & pos, ASTPtr & node, Expected & exp return parser.parse(pos, node, expected); } + +bool ParserIdentifierList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + return ParserList( + std::make_unique(), + std::make_unique(TokenType::Comma)) + .parse(pos, node, expected); } + +} \ No newline at end of file diff --git a/src/Parsers/ExpressionListParsers.h b/src/Parsers/ExpressionListParsers.h index 0cef29b6d67..ba939d8d160 100644 --- a/src/Parsers/ExpressionListParsers.h +++ b/src/Parsers/ExpressionListParsers.h @@ -421,4 +421,13 @@ protected: bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; }; -} + +/** A comma-separated list of identifiers, probably empty. */ +class ParserIdentifierList : public IParserBase +{ +protected: + const char * getName() const override { return "list of identifiers"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} \ No newline at end of file diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 48cf3934820..9eb14ed5475 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -615,19 +615,50 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, auto new_column_ttls = new_columns.getColumnTTLs(); - auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_ast) + auto create_ttl_entry = [this, &new_columns](ASTPtr ttl_expr_ast) { TTLEntry result; - auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, new_columns.getAllPhysical()); - result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false); + auto ttl_syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_expr_ast, new_columns.getAllPhysical()); + result.expression = ExpressionAnalyzer(ttl_expr_ast, ttl_syntax_result, global_context).getActions(false); + result.result_column = ttl_expr_ast->getColumnName(); + result.destination_type = PartDestinationType::DELETE; - result.result_column = ttl_ast->getColumnName(); checkTTLExpression(result.expression, result.result_column); return result; }; + auto create_rows_ttl_entry = [this, &new_columns, &create_ttl_entry](const ASTTTLElement * ttl_element) + { + auto result = create_ttl_entry(ttl_element->ttl()); + if (ttl_element->mode == ASTTTLElement::Mode::DELETE || ttl_element->mode ==ASTTTLElement::Mode::MOVE) + { + if (ASTPtr where_expr_ast = ttl_element->where()) + { + auto where_syntax_result = SyntaxAnalyzer(global_context).analyze(where_expr_ast, new_columns.getAllPhysical()); + result.where_expression = ExpressionAnalyzer(where_expr_ast, where_syntax_result, global_context).getActions(false); + result.where_filter_column = where_expr_ast->getColumnName(); + } + } + else if (ttl_element->mode == ASTTTLElement::Mode::GROUP_BY) + { + result.group_by_keys = ttl_element->group_by_key_columns; + for (auto [name, value] : ttl_element->group_by_aggregations) + { + auto syntax_result = SyntaxAnalyzer(global_context).analyze(value, new_columns.getAllPhysical(), {}, true); + auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, global_context); + + result.group_by_aggregations.emplace_back(name, expr_analyzer.getActions(false)); + result.group_by_aggregations_res_column.emplace_back(name, value->getColumnName()); + + for (const auto descr : expr_analyzer.getAnalyzedData().aggregate_descriptions) + result.aggregate_descriptions.push_back(descr); + } + } + return result; + }; + if (!new_column_ttls.empty()) { NameSet columns_ttl_forbidden; @@ -672,7 +703,8 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION); } - auto new_rows_ttl_entry = create_ttl_entry(ttl_element->children[0]); + LOG_DEBUG(log, "ttl_element->size is " << ttl_element->size()); + auto new_rows_ttl_entry = create_rows_ttl_entry(ttl_element); if (!only_check) update_rows_ttl_entry = new_rows_ttl_entry; @@ -680,7 +712,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription & new_columns, } else { - auto new_ttl_entry = create_ttl_entry(ttl_element->children[0]); + auto new_ttl_entry = create_rows_ttl_entry(ttl_element); new_ttl_entry.entry_ast = ttl_element_ptr; new_ttl_entry.destination_type = ttl_element->destination_type; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index cd1d4912a29..dfcddb40f3c 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -652,6 +653,14 @@ public: ExpressionActionsPtr expression; String result_column; + ExpressionActionsPtr where_expression; + String where_filter_column; + + Names group_by_keys; + std::vector> group_by_aggregations; + std::vector> group_by_aggregations_res_column; + AggregateDescriptions aggregate_descriptions; + /// Name and type of a destination are only valid in table-level context. PartDestinationType destination_type; String destination_name;