#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int BAD_TTL_EXPRESSION; } TTLAggregateDescription::TTLAggregateDescription(const TTLAggregateDescription & other) : column_name(other.column_name) , expression_result_column_name(other.expression_result_column_name) { if (other.expression) expression = other.expression->clone(); } TTLAggregateDescription & TTLAggregateDescription::operator=(const TTLAggregateDescription & other) { if (&other == this) return *this; column_name = other.column_name; expression_result_column_name = other.expression_result_column_name; if (other.expression) expression = other.expression->clone(); else expression.reset(); return *this; } namespace { void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const String & result_column_name, bool allow_suspicious) { /// Do not apply this check in ATTACH queries for compatibility reasons and if explicitly allowed. if (!allow_suspicious) { if (ttl_expression->getRequiredColumns().empty()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "TTL expression {} does not depend on any of the columns of the table", result_column_name); for (const auto & action : ttl_expression->getActions()) { if (action.node->type == ActionsDAG::ActionType::FUNCTION) { const IFunctionBase & func = *action.node->function_base; if (!func.isDeterministic()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "TTL expression cannot contain non-deterministic functions, but contains function {}", func.getName()); } } } const auto & result_column = ttl_expression->getSampleBlock().getByName(result_column_name); if (!typeid_cast(result_column.type.get()) && !typeid_cast(result_column.type.get())) { throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "TTL expression result column should have DateTime or Date type, but has {}", result_column.type->getName()); } } class FindAggregateFunctionData { public: using TypeToVisit = ASTFunction; bool has_aggregate_function = false; void visit(const ASTFunction & func, ASTPtr &) { /// Do not throw if found aggregate function inside another aggregate function, /// because it will be checked, while creating expressions. if (AggregateUtils::isAggregateFunction(func)) has_aggregate_function = true; } }; using FindAggregateFunctionFinderMatcher = OneTypeMatcher; using FindAggregateFunctionVisitor = InDepthNodeVisitor; } TTLDescription::TTLDescription(const TTLDescription & other) : mode(other.mode) , expression_ast(other.expression_ast ? other.expression_ast->clone() : nullptr) , expression_columns(other.expression_columns) , result_column(other.result_column) , where_expression_ast(other.where_expression_ast ? other.where_expression_ast->clone() : nullptr) , where_expression_columns(other.where_expression_columns) , where_result_column(other.where_result_column) , group_by_keys(other.group_by_keys) , set_parts(other.set_parts) , aggregate_descriptions(other.aggregate_descriptions) , destination_type(other.destination_type) , destination_name(other.destination_name) , if_exists(other.if_exists) , recompression_codec(other.recompression_codec) { } TTLDescription & TTLDescription::operator=(const TTLDescription & other) { if (&other == this) return *this; mode = other.mode; if (other.expression_ast) expression_ast = other.expression_ast->clone(); else expression_ast.reset(); expression_columns = other.expression_columns; result_column = other.result_column; if (other.where_expression_ast) where_expression_ast = other.where_expression_ast->clone(); else where_expression_ast.reset(); where_expression_columns = other.where_expression_columns; where_result_column = other.where_result_column; group_by_keys = other.group_by_keys; set_parts = other.set_parts; aggregate_descriptions = other.aggregate_descriptions; destination_type = other.destination_type; destination_name = other.destination_name; if_exists = other.if_exists; if (other.recompression_codec) recompression_codec = other.recompression_codec->clone(); else recompression_codec.reset(); return * this; } static ExpressionAndSets buildExpressionAndSets(ASTPtr & ast, const NamesAndTypesList & columns, const ContextPtr & context) { ExpressionAndSets result; auto ttl_string = queryToString(ast); auto syntax_analyzer_result = TreeRewriter(context).analyze(ast, columns); ExpressionAnalyzer analyzer(ast, syntax_analyzer_result, context); auto dag = analyzer.getActionsDAG(false); const auto * col = &dag->findInOutputs(ast->getColumnName()); if (col->result_name != ttl_string) col = &dag->addAlias(*col, ttl_string); dag->getOutputs() = {col}; dag->removeUnusedActions(); result.expression = std::make_shared(dag, ExpressionActionsSettings::fromContext(context)); result.sets = analyzer.getPreparedSets(); return result; } ExpressionAndSets TTLDescription::buildExpression(const ContextPtr & context) const { auto ast = expression_ast->clone(); return buildExpressionAndSets(ast, expression_columns, context); } ExpressionAndSets TTLDescription::buildWhereExpression(const ContextPtr & context) const { if (where_expression_ast) { auto ast = where_expression_ast->clone(); return buildExpressionAndSets(ast, where_expression_columns, context); } return {}; } TTLDescription TTLDescription::getTTLFromAST( const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach) { TTLDescription result; const auto * ttl_element = definition_ast->as(); /// First child is expression: `TTL expr TO DISK` if (ttl_element != nullptr) result.expression_ast = ttl_element->children.front()->clone(); else /// It's columns TTL without any additions, just copy it result.expression_ast = definition_ast->clone(); auto ttl_ast = result.expression_ast->clone(); auto expression = buildExpressionAndSets(ttl_ast, columns.getAllPhysical(), context).expression; result.expression_columns = expression->getRequiredColumnsWithTypes(); result.result_column = expression->getSampleBlock().safeGetByPosition(0).name; ExpressionActionsPtr where_expression; if (ttl_element == nullptr) /// columns TTL { result.destination_type = DataDestinationType::DELETE; result.mode = TTLMode::DELETE; } else /// rows TTL { result.mode = ttl_element->mode; result.destination_type = ttl_element->destination_type; result.destination_name = ttl_element->destination_name; result.if_exists = ttl_element->if_exists; if (ttl_element->mode == TTLMode::DELETE) { if (ASTPtr where_expr_ast = ttl_element->where()) { result.where_expression_ast = where_expr_ast->clone(); where_expression = buildExpressionAndSets(where_expr_ast, columns.getAllPhysical(), context).expression; result.where_expression_columns = where_expression->getRequiredColumnsWithTypes(); result.where_result_column = where_expression->getSampleBlock().safeGetByPosition(0).name; } } else if (ttl_element->mode == TTLMode::GROUP_BY) { const auto & pk_columns = primary_key.column_names; if (ttl_element->group_by_key.size() > pk_columns.size()) throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "TTL Expression GROUP BY key should be a prefix of primary key"); NameSet aggregation_columns_set; NameSet used_primary_key_columns_set; for (size_t i = 0; i < ttl_element->group_by_key.size(); ++i) { if (ttl_element->group_by_key[i]->getColumnName() != pk_columns[i]) throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "TTL Expression GROUP BY key should be a prefix of primary key"); used_primary_key_columns_set.insert(pk_columns[i]); } std::vector> aggregations; for (const auto & ast : ttl_element->group_by_assignments) { const auto assignment = ast->as(); auto ass_expression = assignment.expression(); FindAggregateFunctionVisitor::Data data{false}; FindAggregateFunctionVisitor(data).visit(ass_expression); if (!data.has_aggregate_function) throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "Invalid expression for assignment of column {}. Should contain an aggregate function", assignment.column_name); ass_expression = addTypeConversionToAST(std::move(ass_expression), columns.getPhysical(assignment.column_name).type->getName()); aggregations.emplace_back(assignment.column_name, std::move(ass_expression)); aggregation_columns_set.insert(assignment.column_name); } if (aggregation_columns_set.size() != ttl_element->group_by_assignments.size()) throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "Multiple aggregations set for one column in TTL Expression"); result.group_by_keys = Names(pk_columns.begin(), pk_columns.begin() + ttl_element->group_by_key.size()); const auto & primary_key_expressions = primary_key.expression_list_ast->children; /// Wrap with 'any' aggregate function primary key columns, /// which are not in 'GROUP BY' key and was not set explicitly. /// The separate step, because not all primary key columns are ordinary columns. for (size_t i = ttl_element->group_by_key.size(); i < primary_key_expressions.size(); ++i) { if (!aggregation_columns_set.contains(pk_columns[i])) { ASTPtr expr = makeASTFunction("any", primary_key_expressions[i]->clone()); aggregations.emplace_back(pk_columns[i], std::move(expr)); aggregation_columns_set.insert(pk_columns[i]); } } /// Wrap with 'any' aggregate function other columns, which was not set explicitly. for (const auto & column : columns.getOrdinary()) { if (!aggregation_columns_set.contains(column.name) && !used_primary_key_columns_set.contains(column.name)) { ASTPtr expr = makeASTFunction("any", std::make_shared(column.name)); aggregations.emplace_back(column.name, std::move(expr)); } } for (auto [name, value] : aggregations) { auto syntax_result = TreeRewriter(context).analyze(value, columns.getAllPhysical(), {}, {}, true); auto expr_analyzer = ExpressionAnalyzer(value, syntax_result, context); TTLAggregateDescription set_part; set_part.column_name = name; set_part.expression_result_column_name = value->getColumnName(); set_part.expression = expr_analyzer.getActions(false); result.set_parts.emplace_back(set_part); for (const auto & descr : expr_analyzer.getAnalyzedData().aggregate_descriptions) result.aggregate_descriptions.push_back(descr); } } else if (ttl_element->mode == TTLMode::RECOMPRESS) { result.recompression_codec = CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST( ttl_element->recompression_codec, {}, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec, context->getSettingsRef().enable_zstd_qat_codec); } } checkTTLExpression(expression, result.result_column, is_attach || context->getSettingsRef().allow_suspicious_ttl_expressions); return result; } TTLTableDescription::TTLTableDescription(const TTLTableDescription & other) : definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr) , rows_ttl(other.rows_ttl) , rows_where_ttl(other.rows_where_ttl) , move_ttl(other.move_ttl) , recompression_ttl(other.recompression_ttl) , group_by_ttl(other.group_by_ttl) { } TTLTableDescription & TTLTableDescription::operator=(const TTLTableDescription & other) { if (&other == this) return *this; if (other.definition_ast) definition_ast = other.definition_ast->clone(); else definition_ast.reset(); rows_ttl = other.rows_ttl; rows_where_ttl = other.rows_where_ttl; move_ttl = other.move_ttl; recompression_ttl = other.recompression_ttl; group_by_ttl = other.group_by_ttl; return *this; } TTLTableDescription TTLTableDescription::getTTLForTableFromAST( const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key, bool is_attach) { TTLTableDescription result; if (!definition_ast) return result; result.definition_ast = definition_ast->clone(); bool have_unconditional_delete_ttl = false; for (const auto & ttl_element_ptr : definition_ast->children) { auto ttl = TTLDescription::getTTLFromAST(ttl_element_ptr, columns, context, primary_key, is_attach); if (ttl.mode == TTLMode::DELETE) { if (!ttl.where_expression_ast) { if (have_unconditional_delete_ttl) throw Exception(ErrorCodes::BAD_TTL_EXPRESSION, "More than one DELETE TTL expression without WHERE expression is not allowed"); have_unconditional_delete_ttl = true; result.rows_ttl = ttl; } else { result.rows_where_ttl.emplace_back(std::move(ttl)); } } else if (ttl.mode == TTLMode::RECOMPRESS) { result.recompression_ttl.emplace_back(std::move(ttl)); } else if (ttl.mode == TTLMode::GROUP_BY) { result.group_by_ttl.emplace_back(std::move(ttl)); } else { result.move_ttl.emplace_back(std::move(ttl)); } } return result; } TTLTableDescription TTLTableDescription::parse(const String & str, const ColumnsDescription & columns, ContextPtr context, const KeyDescription & primary_key) { TTLTableDescription result; if (str.empty()) return result; ParserTTLExpressionList parser; ASTPtr ast = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH, DBMS_DEFAULT_MAX_PARSER_BACKTRACKS); FunctionNameNormalizer().visit(ast.get()); return getTTLForTableFromAST(ast, columns, context, primary_key, context->getSettingsRef().allow_suspicious_ttl_expressions); } }