From 7fbe7462b68bc7c7a6c8145d0f8612424b17550a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Fri, 1 Dec 2023 19:12:05 +0000 Subject: [PATCH 1/5] add mutation command to apply deleted mask --- src/Interpreters/InterpreterAlterQuery.cpp | 2 + src/Interpreters/MutationsInterpreter.cpp | 147 ++++++++++++------ src/Interpreters/MutationsInterpreter.h | 3 + src/Parsers/ASTAlterQuery.cpp | 16 ++ src/Parsers/ASTAlterQuery.h | 1 + src/Parsers/ParserAlterQuery.cpp | 17 ++ src/Storages/MergeTree/MutateTask.cpp | 47 +++--- src/Storages/MutationCommands.cpp | 9 ++ src/Storages/MutationCommands.h | 1 + src/Storages/StorageSnapshot.cpp | 1 + .../02932_apply_deleted_mask.reference | 6 + .../0_stateless/02932_apply_deleted_mask.sql | 22 +++ 12 files changed, 207 insertions(+), 65 deletions(-) create mode 100644 tests/queries/0_stateless/02932_apply_deleted_mask.reference create mode 100644 tests/queries/0_stateless/02932_apply_deleted_mask.sql diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index 96bb7615416..0afdb3bab57 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -155,6 +155,7 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter) } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong parameter type in ALTER query"); + if (!getContext()->getSettings().allow_experimental_statistic && ( command_ast->type == ASTAlterCommand::ADD_STATISTIC || command_ast->type == ASTAlterCommand::DROP_STATISTIC || @@ -407,6 +408,7 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS break; } case ASTAlterCommand::DELETE: + case ASTAlterCommand::APPLY_DELETED_MASK: case ASTAlterCommand::DROP_PARTITION: case ASTAlterCommand::DROP_DETACHED_PARTITION: { diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 8e56b08f1ed..4cef15f6220 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -153,19 +154,29 @@ bool isStorageTouchedByMutations( return false; bool all_commands_can_be_skipped = true; - for (const MutationCommand & command : commands) + for (const auto & command : commands) { - if (!command.predicate) /// The command touches all rows. - return true; - - if (command.partition) + if (command.type == MutationCommand::APPLY_DELETED_MASK) { - const String partition_id = storage.getPartitionIDFromQuery(command.partition, context); - if (partition_id == source_part->info.partition_id) - all_commands_can_be_skipped = false; + if (source_part->hasLightweightDelete()) + return true; } else - all_commands_can_be_skipped = false; + { + if (!command.predicate) /// The command touches all rows. + return true; + + if (command.partition) + { + const String partition_id = storage.getPartitionIDFromQuery(command.partition, context); + if (partition_id == source_part->info.partition_id) + all_commands_can_be_skipped = false; + } + else + { + all_commands_can_be_skipped = false; + } + } } if (all_commands_can_be_skipped) @@ -211,7 +222,6 @@ bool isStorageTouchedByMutations( return count != 0; } - ASTPtr getPartitionAndPredicateExpressionForMutationCommand( const MutationCommand & command, const StoragePtr & storage, @@ -244,6 +254,32 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( return command.predicate ? command.predicate->clone() : partition_predicate_as_ast_func; } + +MutationCommand createCommandToApplyDeletedMask(const MutationCommand & command) +{ + if (command.type != MutationCommand::APPLY_DELETED_MASK) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected APPLY_DELETED_MASK mutation command, got: {}", magic_enum::enum_name(command.type)); + + auto alter_command = std::make_shared(); + alter_command->type = ASTAlterCommand::DELETE; + alter_command->partition = command.partition; + + auto row_exists_predicate = makeASTFunction("equals", + std::make_shared(LightweightDeleteDescription::FILTER_COLUMN.name), + std::make_shared(Field(0))); + + if (command.predicate) + alter_command->predicate = makeASTFunction("and", row_exists_predicate, command.predicate); + else + alter_command->predicate = row_exists_predicate; + + auto mutation_command = MutationCommand::parse(alter_command.get()); + if (!mutation_command) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to parse command {}. It's a bug", queryToString(alter_command)); + + return *mutation_command; +} + MutationsInterpreter::Source::Source(StoragePtr storage_) : storage(std::move(storage_)) { } @@ -517,15 +553,18 @@ void MutationsInterpreter::prepare(bool dry_run) NameSet updated_columns; bool materialize_ttl_recalculate_only = source.materializeTTLRecalculateOnly(); - for (const MutationCommand & command : commands) + for (auto & command : commands) { - if (command.type == MutationCommand::Type::UPDATE - || command.type == MutationCommand::Type::DELETE) + if (command.type == MutationCommand::Type::APPLY_DELETED_MASK) + command = createCommandToApplyDeletedMask(command); + + if (command.type == MutationCommand::Type::UPDATE || command.type == MutationCommand::Type::DELETE) materialize_ttl_recalculate_only = false; for (const auto & [name, _] : command.column_to_update_expression) { - if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name + if (!available_columns_set.contains(name) + && name != LightweightDeleteDescription::FILTER_COLUMN.name && name != BlockNumberColumn::name) throw Exception(ErrorCodes::THERE_IS_NO_COLUMN, "Column {} is updated but not requested to read", name); @@ -574,7 +613,7 @@ void MutationsInterpreter::prepare(bool dry_run) std::vector read_columns; /// First, break a sequence of commands into stages. - for (auto & command : commands) + for (const auto & command : commands) { // we can return deleted rows only if it's the only present command assert(command.type == MutationCommand::DELETE || command.type == MutationCommand::UPDATE || !settings.return_mutated_rows); @@ -585,7 +624,7 @@ void MutationsInterpreter::prepare(bool dry_run) if (stages.empty() || !stages.back().column_to_updated.empty()) stages.emplace_back(context); - auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command); + auto predicate = getPartitionAndPredicateExpressionForMutationCommand(command); if (!settings.return_mutated_rows) predicate = makeASTFunction("isZeroOrNull", predicate); @@ -605,16 +644,12 @@ void MutationsInterpreter::prepare(bool dry_run) NameSet affected_materialized; - for (const auto & kv : command.column_to_update_expression) + for (const auto & [column_name, update_expr] : command.column_to_update_expression) { - const String & column = kv.first; - - auto materialized_it = column_to_affected_materialized.find(column); + auto materialized_it = column_to_affected_materialized.find(column_name); if (materialized_it != column_to_affected_materialized.end()) - { - for (const String & mat_column : materialized_it->second) + for (const auto & mat_column : materialized_it->second) affected_materialized.emplace(mat_column); - } /// When doing UPDATE column = expression WHERE condition /// we will replace column to the result of the following expression: @@ -627,33 +662,39 @@ void MutationsInterpreter::prepare(bool dry_run) /// Outer CAST is added just in case if we don't trust the returning type of 'if'. DataTypePtr type; - if (auto physical_column = columns_desc.tryGetPhysical(column)) + if (auto physical_column = columns_desc.tryGetPhysical(column_name)) + { type = physical_column->type; - else if (column == LightweightDeleteDescription::FILTER_COLUMN.name) + } + else if (column_name == LightweightDeleteDescription::FILTER_COLUMN.name) + { type = LightweightDeleteDescription::FILTER_COLUMN.type; - else if (column == BlockNumberColumn::name) + deleted_mask_updated = true; + } + else if (column_name == BlockNumberColumn::name) + { type = BlockNumberColumn::type; + } else - throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column); + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column_name); + } auto type_literal = std::make_shared(type->getName()); - - const auto & update_expr = kv.second; - ASTPtr condition = getPartitionAndPredicateExpressionForMutationCommand(command); /// And new check validateNestedArraySizes for Nested subcolumns - if (isArray(type) && !Nested::splitName(column).second.empty()) + if (isArray(type) && !Nested::splitName(column_name).second.empty()) { std::shared_ptr function = nullptr; - auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column, all_columns, command.column_to_update_expression); + auto nested_update_exprs = getExpressionsOfUpdatedNestedSubcolumns(column_name, all_columns, command.column_to_update_expression); if (!nested_update_exprs) { function = makeASTFunction("validateNestedArraySizes", condition, update_expr->clone(), - std::make_shared(column)); + std::make_shared(column_name)); condition = makeASTFunction("and", condition, function); } else if (nested_update_exprs->size() > 1) @@ -675,10 +716,10 @@ void MutationsInterpreter::prepare(bool dry_run) makeASTFunction("_CAST", update_expr->clone(), type_literal), - std::make_shared(column)), + std::make_shared(column_name)), type_literal); - stages.back().column_to_updated.emplace(column, updated_column); + stages.back().column_to_updated.emplace(column_name, updated_column); if (condition && settings.return_mutated_rows) stages.back().filters.push_back(condition); @@ -987,26 +1028,41 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s /// Add _row_exists column if it is present in the part if (source.hasLightweightDeleteMask()) - all_columns.push_back({LightweightDeleteDescription::FILTER_COLUMN}); + all_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN); + + bool has_filters = false; /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) { if (settings.return_all_columns || !prepared_stages[i].filters.empty()) { for (const auto & column : all_columns) + { + if (column.name == LightweightDeleteDescription::FILTER_COLUMN.name && !deleted_mask_updated) + continue; + prepared_stages[i].output_columns.insert(column.name); - continue; + } + + has_filters = true; } + else + { + if (i > 0) + prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; - if (i > 0) - prepared_stages[i].output_columns = prepared_stages[i - 1].output_columns; + /// Make sure that all updated columns are included into output_columns set. + /// This is important for a "hidden" column like _row_exists gets because it is a virtual column + /// and so it is not in the list of AllPhysical columns. + for (const auto & [column_name, _] : prepared_stages[i].column_to_updated) + { + if (column_name == LightweightDeleteDescription::FILTER_COLUMN.name && has_filters && !deleted_mask_updated) + continue; - /// Make sure that all updated columns are included into output_columns set. - /// This is important for a "hidden" column like _row_exists gets because it is a virtual column - /// and so it is not in the list of AllPhysical columns. - for (const auto & kv : prepared_stages[i].column_to_updated) - prepared_stages[i].output_columns.insert(kv.first); + prepared_stages[i].output_columns.insert(column_name); + } + } } /// Now, calculate `expressions_chain` for each stage except the first. @@ -1024,7 +1080,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s all_asts->children.push_back(kv.second); /// Add all output columns to prevent ExpressionAnalyzer from deleting them from source columns. - for (const String & column : stage.output_columns) + for (const auto & column : stage.output_columns) all_asts->children.push_back(std::make_shared(column)); /// Executing scalar subquery on that stage can lead to deadlock @@ -1081,7 +1137,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s actions_chain.getLastStep().addRequiredOutput(name); actions_chain.getLastActions(); - actions_chain.finalize(); if (i) diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index 1372ea77f4f..eda94190185 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -32,6 +32,8 @@ ASTPtr getPartitionAndPredicateExpressionForMutationCommand( ContextPtr context ); +MutationCommand createCommandToApplyDeletedMask(const MutationCommand & command); + /// Create an input stream that will read data from storage and apply mutation commands (UPDATEs, DELETEs, MATERIALIZEs) /// to this data. class MutationsInterpreter @@ -213,6 +215,7 @@ private: std::unique_ptr updated_header; std::vector stages; bool is_prepared = false; /// Has the sequence of stages been prepared. + bool deleted_mask_updated = false; NameSet materialized_indices; NameSet materialized_projections; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 84893011222..ea116e6ccfd 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -466,6 +466,22 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState & settings.ostr << (settings.hilite ? hilite_keyword : "") << " TO "; rename_to->formatImpl(settings, state, frame); } + else if (type == ASTAlterCommand::APPLY_DELETED_MASK) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << "APPLY DELETED MASK" << (settings.hilite ? hilite_none : ""); + + if (partition) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + + if (predicate) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : ""); + predicate->formatImpl(settings, state, frame); + } + } else throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER"); } diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index e601739595f..77c540aed33 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -71,6 +71,7 @@ public: DELETE, UPDATE, + APPLY_DELETED_MASK, NO_TYPE, diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index c616c6e0441..3522611ec4c 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -111,6 +111,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_remove_ttl("REMOVE TTL"); ParserKeyword s_remove_sample_by("REMOVE SAMPLE BY"); + ParserKeyword s_apply_deleted_mask("APPLY DELETED MASK"); ParserCompoundIdentifier parser_name; ParserStringLiteral parser_string_literal; @@ -823,6 +824,22 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected command->type = ASTAlterCommand::MODIFY_COMMENT; } + else if (s_apply_deleted_mask.ignore(pos, expected)) + { + command->type = ASTAlterCommand::APPLY_DELETED_MASK; + + if (s_in_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + } + + if (s_where.ignore(pos, expected)) + { + if (!parser_exp_elem.parse(pos, command->predicate, expected)) + return false; + } + } else return false; } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 6b6b5947581..fc36840cf92 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -51,7 +51,6 @@ static bool checkOperationIsNotCanceled(ActionBlocker & merges_blocker, MergeLis return true; } - /** Split mutation commands into two parts: * First part should be executed by mutations interpreter. * Other is just simple drop/renames, so they can be executed without interpreter. @@ -79,7 +78,8 @@ static void splitAndModifyMutationCommands( || command.type == MutationCommand::Type::MATERIALIZE_PROJECTION || command.type == MutationCommand::Type::MATERIALIZE_TTL || command.type == MutationCommand::Type::DELETE - || command.type == MutationCommand::Type::UPDATE) + || command.type == MutationCommand::Type::UPDATE + || command.type == MutationCommand::Type::APPLY_DELETED_MASK) { for_interpreter.push_back(command); for (const auto & [column_name, expr] : command.column_to_update_expression) @@ -202,7 +202,8 @@ static void splitAndModifyMutationCommands( || command.type == MutationCommand::Type::MATERIALIZE_PROJECTION || command.type == MutationCommand::Type::MATERIALIZE_TTL || command.type == MutationCommand::Type::DELETE - || command.type == MutationCommand::Type::UPDATE) + || command.type == MutationCommand::Type::UPDATE + || command.type == MutationCommand::Type::APPLY_DELETED_MASK) { for_interpreter.push_back(command); } @@ -257,15 +258,12 @@ getColumnsForNewDataPart( NameToNameMap renamed_columns_from_to; ColumnsDescription part_columns(source_part->getColumns()); NamesAndTypesList system_columns; - if (source_part->supportLightweightDeleteMutate()) - system_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN); - /// Preserve system columns that have persisted values in the source_part - for (const auto & column : system_columns) - { - if (part_columns.has(column.name) && !storage_columns.contains(column.name)) - storage_columns.emplace_back(column); - } + const auto & deleted_mask_column = LightweightDeleteDescription::FILTER_COLUMN; + bool supports_lightweight_deletes = source_part->supportLightweightDeleteMutate(); + + bool deleted_mask_updated = false; + bool has_delete_command = false; NameSet storage_columns_set; for (const auto & [name, _] : storage_columns) @@ -277,23 +275,22 @@ getColumnsForNewDataPart( { for (const auto & [column_name, _] : command.column_to_update_expression) { - /// Allow to update and persist values of system column - auto column = system_columns.tryGetByName(column_name); - if (column && !storage_columns.contains(column_name)) - storage_columns.emplace_back(column_name, column->type); + if (column_name == deleted_mask_column.name + && supports_lightweight_deletes + && !storage_columns_set.contains(deleted_mask_column.name)) + deleted_mask_updated = true; } } + if (command.type == MutationCommand::DELETE || command.type == MutationCommand::APPLY_DELETED_MASK) + has_delete_command = true; + /// If we don't have this column in source part, than we don't need to materialize it if (!part_columns.has(command.column_name)) - { continue; - } if (command.type == MutationCommand::DROP_COLUMN) - { removed_columns.insert(command.column_name); - } if (command.type == MutationCommand::RENAME_COLUMN) { @@ -302,6 +299,15 @@ getColumnsForNewDataPart( } } + if (!storage_columns_set.contains(deleted_mask_column.name)) + { + if (deleted_mask_updated || (part_columns.has(deleted_mask_column.name) && !has_delete_command)) + { + storage_columns.push_back(deleted_mask_column); + storage_columns_set.insert(deleted_mask_column.name); + } + } + SerializationInfoByName new_serialization_infos; for (const auto & [name, old_info] : serialization_infos) { @@ -1900,6 +1906,9 @@ static bool canSkipMutationCommandForPart(const MergeTreeDataPartPtr & part, con return true; } + if (command.type == MutationCommand::APPLY_DELETED_MASK && !part->hasLightweightDelete()) + return true; + if (canSkipConversionToNullable(part, command)) return true; diff --git a/src/Storages/MutationCommands.cpp b/src/Storages/MutationCommands.cpp index 03200d0d9fa..36388a32b41 100644 --- a/src/Storages/MutationCommands.cpp +++ b/src/Storages/MutationCommands.cpp @@ -59,6 +59,15 @@ std::optional MutationCommand::parse(ASTAlterCommand * command, } return res; } + else if (command->type == ASTAlterCommand::APPLY_DELETED_MASK) + { + MutationCommand res; + res.ast = command->ptr(); + res.type = APPLY_DELETED_MASK; + res.predicate = command->predicate; + res.partition = command->partition; + return res; + } else if (command->type == ASTAlterCommand::MATERIALIZE_INDEX) { MutationCommand res; diff --git a/src/Storages/MutationCommands.h b/src/Storages/MutationCommands.h index 014a227dff3..6e10f7d9b2d 100644 --- a/src/Storages/MutationCommands.h +++ b/src/Storages/MutationCommands.h @@ -39,6 +39,7 @@ struct MutationCommand MATERIALIZE_TTL, RENAME_COLUMN, MATERIALIZE_COLUMN, + APPLY_DELETED_MASK, ALTER_WITHOUT_MUTATION, /// pure metadata command, currently unusned }; diff --git a/src/Storages/StorageSnapshot.cpp b/src/Storages/StorageSnapshot.cpp index a22ba6586ac..34c092c7208 100644 --- a/src/Storages/StorageSnapshot.cpp +++ b/src/Storages/StorageSnapshot.cpp @@ -35,6 +35,7 @@ void StorageSnapshot::init() if (storage.hasLightweightDeletedMask()) system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type; + system_columns[BlockNumberColumn::name] = BlockNumberColumn::type; } diff --git a/tests/queries/0_stateless/02932_apply_deleted_mask.reference b/tests/queries/0_stateless/02932_apply_deleted_mask.reference new file mode 100644 index 00000000000..0d75f7c1b30 --- /dev/null +++ b/tests/queries/0_stateless/02932_apply_deleted_mask.reference @@ -0,0 +1,6 @@ +10 45 +all_1_1_0 10 0 +7 33 +all_1_1_0_2 10 1 +7 33 +all_1_1_0_3 7 0 diff --git a/tests/queries/0_stateless/02932_apply_deleted_mask.sql b/tests/queries/0_stateless/02932_apply_deleted_mask.sql new file mode 100644 index 00000000000..602c67de52e --- /dev/null +++ b/tests/queries/0_stateless/02932_apply_deleted_mask.sql @@ -0,0 +1,22 @@ +DROP TABLE IF EXISTS t_materialize_delete; + +CREATE TABLE t_materialize_delete (id UInt64, v UInt64) ENGINE = MergeTree ORDER BY tuple() settings min_bytes_for_wide_part = 0; + +SET mutations_sync = 2; + +INSERT INTO t_materialize_delete SELECT number, number FROM numbers(10); + +SELECT count(), sum(v) FROM t_materialize_delete; +SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +DELETE FROM t_materialize_delete WHERE id % 3 = 1; + +SELECT count(), sum(v) FROM t_materialize_delete; +SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +ALTER TABLE t_materialize_delete APPLY DELETED MASK; + +SELECT count(), sum(v) FROM t_materialize_delete; +SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +DROP TABLE t_materialize_delete; From 8d30e22a09613257e049fd07414eeaf0b7bb9097 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 4 Dec 2023 13:00:50 +0000 Subject: [PATCH 2/5] fix lighweight delete with heavy delete --- src/Interpreters/MutationsInterpreter.cpp | 4 +-- src/Storages/MergeTree/MergeTask.cpp | 4 +++ .../MergeTree/MergeTreeSequentialSource.cpp | 36 +++++++++++++------ .../MergeTree/MergeTreeSequentialSource.h | 4 ++- .../02352_lightweight_delete.reference | 2 +- ...02521_lightweight_delete_and_ttl.reference | 2 +- 6 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 4cef15f6220..d333477f36e 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -1030,7 +1030,6 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s if (source.hasLightweightDeleteMask()) all_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN); - bool has_filters = false; /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) @@ -1046,6 +1045,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s } has_filters = true; + settings.apply_deleted_mask = true; } else { @@ -1279,7 +1279,7 @@ void MutationsInterpreter::Source::read( VirtualColumns virtual_columns(std::move(required_columns), part); - createMergeTreeSequentialSource( + createReadFromPartStep( plan, *data, storage_snapshot, part, std::move(virtual_columns.columns_to_read), apply_deleted_mask_, filter, context_, diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index e8e307bb148..94bd0f98986 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -556,6 +556,8 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const global_ctx->storage_snapshot, global_ctx->future_part->parts[part_num], column_names, + /*mark_ranges=*/ {}, + /*apply_deleted_mask=*/ true, ctx->read_with_direct_io, /*take_column_types_from_storage=*/ true, /*quiet=*/ false, @@ -909,6 +911,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream() global_ctx->storage_snapshot, part, global_ctx->merging_column_names, + /*mark_ranges=*/ {}, + /*apply_deleted_mask=*/ true, ctx->read_with_direct_io, /*take_column_types_from_storage=*/ true, /*quiet=*/ false, diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp index a586997360a..5075e43448a 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.cpp @@ -131,6 +131,7 @@ MergeTreeSequentialSource::MergeTreeSequentialSource( auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical) .withExtendedObjects() .withSystemColumns(); + if (storage.supportsSubcolumns()) options.withSubcolumns(); columns_for_reader = storage_snapshot->getColumnsByNames(options, columns_to_read); @@ -241,19 +242,24 @@ Pipe createMergeTreeSequentialSource( const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, Names columns_to_read, + std::optional mark_ranges, + bool apply_deleted_mask, bool read_with_direct_io, bool take_column_types_from_storage, bool quiet, std::shared_ptr> filtered_rows_count) { + const auto & filter_column = LightweightDeleteDescription::FILTER_COLUMN; + /// The part might have some rows masked by lightweight deletes - const bool need_to_filter_deleted_rows = data_part->hasLightweightDelete(); - auto columns = columns_to_read; - if (need_to_filter_deleted_rows) - columns.emplace_back(LightweightDeleteDescription::FILTER_COLUMN.name); + const bool need_to_filter_deleted_rows = apply_deleted_mask && data_part->hasLightweightDelete(); + const bool has_filter_column = std::ranges::find(columns_to_read, filter_column.name) != columns_to_read.end(); + + if (need_to_filter_deleted_rows && !has_filter_column) + columns_to_read.emplace_back(filter_column.name); auto column_part_source = std::make_shared( - storage, storage_snapshot, data_part, columns, std::optional{}, + storage, storage_snapshot, data_part, columns_to_read, std::move(mark_ranges), /*apply_deleted_mask=*/ false, read_with_direct_io, take_column_types_from_storage, quiet); Pipe pipe(std::move(column_part_source)); @@ -261,10 +267,10 @@ Pipe createMergeTreeSequentialSource( /// Add filtering step that discards deleted rows if (need_to_filter_deleted_rows) { - pipe.addSimpleTransform([filtered_rows_count](const Block & header) + pipe.addSimpleTransform([filtered_rows_count, has_filter_column](const Block & header) { return std::make_shared( - header, nullptr, LightweightDeleteDescription::FILTER_COLUMN.name, true, false, filtered_rows_count); + header, nullptr, filter_column.name, !has_filter_column, false, filtered_rows_count); }); } @@ -325,9 +331,17 @@ public: } } - auto source = std::make_unique( - storage, storage_snapshot, data_part, columns_to_read, - std::move(mark_ranges), apply_deleted_mask, false, true); + auto source = createMergeTreeSequentialSource( + storage, + storage_snapshot, + data_part, + columns_to_read, + std::move(mark_ranges), + apply_deleted_mask, + /*read_with_direct_io=*/ false, + /*take_column_types_from_storage=*/ true, + /*quiet=*/ false, + /*filtered_rows_count=*/ nullptr); pipeline.init(Pipe(std::move(source))); } @@ -343,7 +357,7 @@ private: Poco::Logger * log; }; -void createMergeTreeSequentialSource( +void createReadFromPartStep( QueryPlan & plan, const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, diff --git a/src/Storages/MergeTree/MergeTreeSequentialSource.h b/src/Storages/MergeTree/MergeTreeSequentialSource.h index fb249568e8f..396d3f76886 100644 --- a/src/Storages/MergeTree/MergeTreeSequentialSource.h +++ b/src/Storages/MergeTree/MergeTreeSequentialSource.h @@ -15,6 +15,8 @@ Pipe createMergeTreeSequentialSource( const StorageSnapshotPtr & storage_snapshot, MergeTreeData::DataPartPtr data_part, Names columns_to_read, + std::optional mark_ranges, + bool apply_deleted_mask, bool read_with_direct_io, bool take_column_types_from_storage, bool quiet, @@ -22,7 +24,7 @@ Pipe createMergeTreeSequentialSource( class QueryPlan; -void createMergeTreeSequentialSource( +void createReadFromPartStep( QueryPlan & plan, const MergeTreeData & storage, const StorageSnapshotPtr & storage_snapshot, diff --git a/tests/queries/0_stateless/02352_lightweight_delete.reference b/tests/queries/0_stateless/02352_lightweight_delete.reference index 3386b3294c3..ce7c6e81ac8 100644 --- a/tests/queries/0_stateless/02352_lightweight_delete.reference +++ b/tests/queries/0_stateless/02352_lightweight_delete.reference @@ -26,7 +26,7 @@ Rows in parts 800000 Count 700000 First row 300000 1 Do ALTER DELETE mutation that does a "heavyweight" delete -Rows in parts 533333 +Rows in parts 466666 Count 466666 First row 300001 10 Delete 100K more rows using lightweight DELETE diff --git a/tests/queries/0_stateless/02521_lightweight_delete_and_ttl.reference b/tests/queries/0_stateless/02521_lightweight_delete_and_ttl.reference index 3b40d9048cd..e60b2a184db 100644 --- a/tests/queries/0_stateless/02521_lightweight_delete_and_ttl.reference +++ b/tests/queries/0_stateless/02521_lightweight_delete_and_ttl.reference @@ -15,7 +15,7 @@ SELECT 'Count', count() FROM lwd_test_02521; Count 25000 ALTER TABLE lwd_test_02521 DELETE WHERE id >= 40000 SETTINGS mutations_sync = 1; SELECT 'Rows in parts', SUM(rows) FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_test_02521' AND active; -Rows in parts 40000 +Rows in parts 15000 SELECT 'Count', count() FROM lwd_test_02521; Count 15000 OPTIMIZE TABLE lwd_test_02521 FINAL SETTINGS mutations_sync = 1; From c0e45c15fbb7ba5ee9f8f22c82fdaa613370bdb5 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 5 Dec 2023 13:38:25 +0000 Subject: [PATCH 3/5] add test for lightweight deletes and mutations --- src/Interpreters/MutationsInterpreter.cpp | 2 +- src/Parsers/ASTAlterQuery.cpp | 6 --- src/Parsers/ParserAlterQuery.cpp | 6 --- .../02932_apply_deleted_mask.reference | 21 ++++++--- .../0_stateless/02932_apply_deleted_mask.sql | 33 +++++++++++--- .../02932_lwd_and_mutations.reference | 14 ++++++ .../0_stateless/02932_lwd_and_mutations.sql | 43 +++++++++++++++++++ 7 files changed, 100 insertions(+), 25 deletions(-) create mode 100644 tests/queries/0_stateless/02932_lwd_and_mutations.reference create mode 100644 tests/queries/0_stateless/02932_lwd_and_mutations.sql diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index d333477f36e..a492ea266cf 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -1027,7 +1027,7 @@ void MutationsInterpreter::prepareMutationStages(std::vector & prepared_s auto all_columns = storage_snapshot->getColumnsByNames(options, available_columns); /// Add _row_exists column if it is present in the part - if (source.hasLightweightDeleteMask()) + if (source.hasLightweightDeleteMask() || deleted_mask_updated) all_columns.push_back(LightweightDeleteDescription::FILTER_COLUMN); bool has_filters = false; diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index ea116e6ccfd..ed9de6a46eb 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -475,12 +475,6 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState & settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : ""); partition->formatImpl(settings, state, frame); } - - if (predicate) - { - settings.ostr << (settings.hilite ? hilite_keyword : "") << " WHERE " << (settings.hilite ? hilite_none : ""); - predicate->formatImpl(settings, state, frame); - } } else throw Exception(ErrorCodes::UNEXPECTED_AST_STRUCTURE, "Unexpected type of ALTER"); diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 3522611ec4c..6c772db0193 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -833,12 +833,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected if (!parser_partition.parse(pos, command->partition, expected)) return false; } - - if (s_where.ignore(pos, expected)) - { - if (!parser_exp_elem.parse(pos, command->predicate, expected)) - return false; - } } else return false; diff --git a/tests/queries/0_stateless/02932_apply_deleted_mask.reference b/tests/queries/0_stateless/02932_apply_deleted_mask.reference index 0d75f7c1b30..22499472f84 100644 --- a/tests/queries/0_stateless/02932_apply_deleted_mask.reference +++ b/tests/queries/0_stateless/02932_apply_deleted_mask.reference @@ -1,6 +1,15 @@ -10 45 -all_1_1_0 10 0 -7 33 -all_1_1_0_2 10 1 -7 33 -all_1_1_0_3 7 0 +Inserted +100 4950 +10 100 0 +Lighweight deleted +86 4271 +10 100 10 +Mask applied +86 4271 +10 86 0 +Lighweight deleted +72 3578 +10 86 10 +Mask applied in partition +72 3578 +10 84 9 diff --git a/tests/queries/0_stateless/02932_apply_deleted_mask.sql b/tests/queries/0_stateless/02932_apply_deleted_mask.sql index 602c67de52e..0ada0640a8f 100644 --- a/tests/queries/0_stateless/02932_apply_deleted_mask.sql +++ b/tests/queries/0_stateless/02932_apply_deleted_mask.sql @@ -1,22 +1,43 @@ DROP TABLE IF EXISTS t_materialize_delete; -CREATE TABLE t_materialize_delete (id UInt64, v UInt64) ENGINE = MergeTree ORDER BY tuple() settings min_bytes_for_wide_part = 0; +CREATE TABLE t_materialize_delete (id UInt64, v UInt64) +ENGINE = MergeTree ORDER BY id PARTITION BY id % 10; SET mutations_sync = 2; -INSERT INTO t_materialize_delete SELECT number, number FROM numbers(10); +INSERT INTO t_materialize_delete SELECT number, number FROM numbers(100); + +SELECT 'Inserted'; SELECT count(), sum(v) FROM t_materialize_delete; -SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; -DELETE FROM t_materialize_delete WHERE id % 3 = 1; +SELECT 'Lighweight deleted'; + +DELETE FROM t_materialize_delete WHERE id % 7 = 3; SELECT count(), sum(v) FROM t_materialize_delete; -SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +SELECT 'Mask applied'; ALTER TABLE t_materialize_delete APPLY DELETED MASK; SELECT count(), sum(v) FROM t_materialize_delete; -SELECT name, rows, has_lightweight_delete FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +SELECT 'Lighweight deleted'; + +DELETE FROM t_materialize_delete WHERE id % 7 = 4; + +SELECT count(), sum(v) FROM t_materialize_delete; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; + +SELECT 'Mask applied in partition'; + +ALTER TABLE t_materialize_delete APPLY DELETED MASK IN PARTITION 5; + +SELECT count(), sum(v) FROM t_materialize_delete; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_materialize_delete' AND active; DROP TABLE t_materialize_delete; diff --git a/tests/queries/0_stateless/02932_lwd_and_mutations.reference b/tests/queries/0_stateless/02932_lwd_and_mutations.reference new file mode 100644 index 00000000000..dc0d3536b8f --- /dev/null +++ b/tests/queries/0_stateless/02932_lwd_and_mutations.reference @@ -0,0 +1,14 @@ +900 0 [1,2,3,4,5,6,7,8,9] +1 1000 1 +800 200 [2,3,4,5,6,7,8,9] +1 800 0 +700 150 [3,4,5,6,7,8,9] +1 800 1 +600 300 [4,5,6,7,8,9] +1 600 0 +400 200 [6,7,8,9] +1 500 1 +200 100 [8,9] +1 300 1 +200 100 [8,9] +1 200 0 diff --git a/tests/queries/0_stateless/02932_lwd_and_mutations.sql b/tests/queries/0_stateless/02932_lwd_and_mutations.sql new file mode 100644 index 00000000000..a68aca91764 --- /dev/null +++ b/tests/queries/0_stateless/02932_lwd_and_mutations.sql @@ -0,0 +1,43 @@ +DROP TABLE IF EXISTS t_lwd_mutations; + +CREATE TABLE t_lwd_mutations(id UInt64, v UInt64) ENGINE = MergeTree ORDER BY id; +INSERT INTO t_lwd_mutations SELECT number, 0 FROM numbers(1000); + +SET mutations_sync = 2; + +DELETE FROM t_lwd_mutations WHERE id % 10 = 0; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +ALTER TABLE t_lwd_mutations UPDATE v = 1 WHERE id % 4 = 0, DELETE WHERE id % 10 = 1; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +DELETE FROM t_lwd_mutations WHERE id % 10 = 2; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +ALTER TABLE t_lwd_mutations UPDATE v = 1 WHERE id % 4 = 1, DELETE WHERE id % 10 = 3; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +ALTER TABLE t_lwd_mutations UPDATE _row_exists = 0 WHERE id % 10 = 4, DELETE WHERE id % 10 = 5; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +ALTER TABLE t_lwd_mutations DELETE WHERE id % 10 = 6, UPDATE _row_exists = 0 WHERE id % 10 = 7; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +ALTER TABLE t_lwd_mutations APPLY DELETED MASK; + +SELECT count(), sum(v), arraySort(groupUniqArray(id % 10)) FROM t_lwd_mutations; +SELECT count(), sum(rows), sum(has_lightweight_delete) FROM system.parts WHERE database = currentDatabase() AND table = 't_lwd_mutations' AND active; + +DROP TABLE IF EXISTS t_lwd_mutations; From eb990d863df624dca621d05d14c9e8285ce7abb0 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 5 Dec 2023 15:15:46 +0000 Subject: [PATCH 4/5] fix tests --- src/Storages/MergeTree/MutateTask.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index d9f4eeb04ef..d5222312c80 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -1536,7 +1536,8 @@ private: for (auto & command_for_interpreter : ctx->for_interpreter) { - if (command_for_interpreter.type == MutationCommand::DELETE) + if (command_for_interpreter.type == MutationCommand::DELETE + || command_for_interpreter.type == MutationCommand::APPLY_DELETED_MASK) { has_delete = true; break; From 1960713176777cc75a082170bbb75ea3b4accdbc Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 13 Dec 2023 17:27:23 +0000 Subject: [PATCH 5/5] add docs --- .../statements/alter/apply-deleted-mask.md | 22 +++++++++++++++++++ .../sql-reference/statements/alter/index.md | 5 +++-- src/Interpreters/MutationsInterpreter.cpp | 6 ++--- 3 files changed, 28 insertions(+), 5 deletions(-) create mode 100644 docs/en/sql-reference/statements/alter/apply-deleted-mask.md diff --git a/docs/en/sql-reference/statements/alter/apply-deleted-mask.md b/docs/en/sql-reference/statements/alter/apply-deleted-mask.md new file mode 100644 index 00000000000..7a11d66e739 --- /dev/null +++ b/docs/en/sql-reference/statements/alter/apply-deleted-mask.md @@ -0,0 +1,22 @@ +--- +slug: /en/sql-reference/statements/alter/apply-deleted-mask +sidebar_position: 46 +sidebar_label: APPLY DELETED MASK +--- + +# Apply mask of deleted rows + +``` sql +ALTER TABLE [db].name [ON CLUSTER cluster] APPLY DELETED MASK [IN PARTITION partition_id] +``` + +The command applies mask created by [lightweight delete](/docs/en/sql-reference/statements/delete) and forcefully removes rows marked as deleted from disk. This command is a heavyweight mutation and it semantically equals to query ```ALTER TABLE [db].name DELETE WHERE _row_exists = 0```. + +:::note +It only works for tables in the [`MergeTree`](../../../engines/table-engines/mergetree-family/mergetree.md) family (including [replicated](../../../engines/table-engines/mergetree-family/replication.md) tables). +::: + +**See also** + +- [Lightweight deletes](/docs/en/sql-reference/statements/delete) +- [Heavyweight deletes](/docs/en/sql-reference/statements/alter/delete.md) diff --git a/docs/en/sql-reference/statements/alter/index.md b/docs/en/sql-reference/statements/alter/index.md index d28542e0a43..dc6668c7983 100644 --- a/docs/en/sql-reference/statements/alter/index.md +++ b/docs/en/sql-reference/statements/alter/index.md @@ -17,8 +17,9 @@ Most `ALTER TABLE` queries modify table settings or data: - [CONSTRAINT](/docs/en/sql-reference/statements/alter/constraint.md) - [TTL](/docs/en/sql-reference/statements/alter/ttl.md) - [STATISTIC](/docs/en/sql-reference/statements/alter/statistic.md) +- [APPLY DELETED MASK](/docs/en/sql-reference/statements/alter/apply-deleted-mask.md) -:::note +:::note Most `ALTER TABLE` queries are supported only for [\*MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables, as well as [Merge](/docs/en/engines/table-engines/special/merge.md) and [Distributed](/docs/en/engines/table-engines/special/distributed.md). ::: @@ -59,7 +60,7 @@ For all `ALTER` queries, you can use the [alter_sync](/docs/en/operations/settin You can specify how long (in seconds) to wait for inactive replicas to execute all `ALTER` queries with the [replication_wait_for_inactive_replica_timeout](/docs/en/operations/settings/settings.md/#replication-wait-for-inactive-replica-timeout) setting. -:::note +:::note For all `ALTER` queries, if `alter_sync = 2` and some replicas are not active for more than the time, specified in the `replication_wait_for_inactive_replica_timeout` setting, then an exception `UNFINISHED` is thrown. ::: diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index a492ea266cf..bf50766c165 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -269,9 +269,9 @@ MutationCommand createCommandToApplyDeletedMask(const MutationCommand & command) std::make_shared(Field(0))); if (command.predicate) - alter_command->predicate = makeASTFunction("and", row_exists_predicate, command.predicate); - else - alter_command->predicate = row_exists_predicate; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Mutation command APPLY DELETED MASK does not support WHERE clause"); + + alter_command->predicate = row_exists_predicate; auto mutation_command = MutationCommand::parse(alter_command.get()); if (!mutation_command)