diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 916dee01431..7ec879e49f9 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -857,7 +857,10 @@ void MutationsInterpreter::prepare(bool dry_run) else if (command.type == MutationCommand::MATERIALIZE_TTL) { mutation_kind.set(MutationKind::MUTATE_OTHER); - if (materialize_ttl_recalculate_only) + bool suitable_for_ttl_optimization = source.getMergeTreeData()->getSettings()->ttl_only_drop_parts + && metadata_snapshot->hasOnlyRowsTTL(); + + if (materialize_ttl_recalculate_only || suitable_for_ttl_optimization) { // just recalculate ttl_infos without remove expired data auto all_columns_vec = all_columns.getNames(); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 03c7c1b3024..3ba145256c6 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -119,6 +119,7 @@ static void splitAndModifyMutationCommands( const MutationCommands & commands, MutationCommands & for_interpreter, MutationCommands & for_file_renames, + bool suitable_for_ttl_optimization, LoggerPtr log) { auto part_columns = part->getColumnsDescription(); @@ -128,6 +129,7 @@ static void splitAndModifyMutationCommands( { NameSet mutated_columns; NameSet dropped_columns; + NameSet ignored_columns; for (const auto & command : commands) { @@ -153,6 +155,15 @@ static void splitAndModifyMutationCommands( for_interpreter.push_back(command); for (const auto & [column_name, expr] : command.column_to_update_expression) mutated_columns.emplace(column_name); + + if (command.type == MutationCommand::Type::MATERIALIZE_TTL && suitable_for_ttl_optimization) + { + for (const auto & col : part_columns) + { + if (!mutated_columns.contains(col.name)) + ignored_columns.emplace(col.name); + } + } } else if (command.type == MutationCommand::Type::DROP_INDEX || command.type == 
MutationCommand::Type::DROP_PROJECTION @@ -213,7 +224,7 @@ static void splitAndModifyMutationCommands( /// from disk we just don't read dropped columns for (const auto & column : part_columns) { - if (!mutated_columns.contains(column.name)) + if (!mutated_columns.contains(column.name) && !ignored_columns.contains(column.name)) { if (!metadata_snapshot->getColumns().has(column.name) && !part->storage.getVirtualsPtr()->has(column.name)) { @@ -1884,6 +1895,82 @@ private: std::unique_ptr part_merger_writer_task{nullptr}; }; +/* + * Decorator that'll drop expired parts by replacing them with empty ones. + * Main use case (only use case for now) is to decorate `MutateSomePartColumnsTask`, + * which is used to recalculate TTL. If the part is expired, this class will replace it with + * an empty one. + * + * Triggered when `ttl_only_drop_parts` is set and the only TTL is rows TTL. + * */ +class ExecutableTaskDropTTLExpiredPartsDecorator : public IExecutableTask +{ +public: + explicit ExecutableTaskDropTTLExpiredPartsDecorator( + std::unique_ptr executable_task_, + MutationContextPtr ctx_ + ) + : executable_task(std::move(executable_task_)), ctx(ctx_) {} + + void onCompleted() override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + StorageID getStorageID() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + Priority getPriority() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + String getQueryId() const override { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); } + + bool executeStep() override + { + switch (state) + { + case State::NEED_EXECUTE: + { + if (executable_task->executeStep()) + return true; + + if (isRowsMaxTTLExpired()) + replacePartWithEmpty(); + + state = State::SUCCESS; + return true; + } + case State::SUCCESS: + { + return false; + } + } + return false; + } + +private: + enum class State + { + NEED_EXECUTE, + + SUCCESS + }; + + State 
state{State::NEED_EXECUTE}; + + std::unique_ptr executable_task; + MutationContextPtr ctx; + + bool isRowsMaxTTLExpired() const + { + const auto ttl = ctx->new_data_part->ttl_infos.table_ttl; + return ttl.max && ttl.max <= ctx->time_of_mutation; + } + + void replacePartWithEmpty() + { + MergeTreePartInfo part_info = ctx->new_data_part->info; + part_info.level += 1; + + MergeTreePartition partition = ctx->new_data_part->partition; + std::string part_name = ctx->new_data_part->getNewName(part_info); + + auto [mutable_empty_part, _] = ctx->data->createEmptyPart(part_info, partition, part_name, ctx->txn); + ctx->new_data_part = std::move(mutable_empty_part); + } +}; MutateTask::MutateTask( FutureMergedMutatedPartPtr future_part_, @@ -2122,6 +2209,7 @@ bool MutateTask::prepare() context_for_reading->setSetting("max_streams_for_merge_tree_reading", Field(0)); context_for_reading->setSetting("read_from_filesystem_cache_if_exists_otherwise_bypass_cache", 1); + bool suitable_for_ttl_optimization = ctx->metadata_snapshot->hasOnlyRowsTTL() && ctx->data->getSettings()->ttl_only_drop_parts; MutationHelpers::splitAndModifyMutationCommands( ctx->source_part, ctx->metadata_snapshot, @@ -2129,6 +2217,7 @@ bool MutateTask::prepare() ctx->commands_for_part, ctx->for_interpreter, ctx->for_file_renames, + suitable_for_ttl_optimization, ctx->log); ctx->stage_progress = std::make_unique(1.0); @@ -2235,7 +2324,12 @@ bool MutateTask::prepare() /// The blobs have to be removed along with the part, this temporary part owns them and does not share them yet. 
ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::REMOVE_BLOBS; - task = std::make_unique(ctx); + bool drop_expired_parts = suitable_for_ttl_optimization && !ctx->data->getSettings()->materialize_ttl_recalculate_only; + if (drop_expired_parts) + task = std::make_unique(std::make_unique(ctx), ctx); + else + task = std::make_unique(ctx); + ProfileEvents::increment(ProfileEvents::MutationAllPartColumns); } else /// TODO: check that we modify only non-key columns in this case. @@ -2295,7 +2389,12 @@ bool MutateTask::prepare() /// Keeper has to be asked with unlock request to release the references to the blobs ctx->new_data_part->remove_tmp_policy = IMergeTreeDataPart::BlobsRemovalPolicyForTemporaryParts::ASK_KEEPER; - task = std::make_unique(ctx); + bool drop_expired_parts = suitable_for_ttl_optimization && !ctx->data->getSettings()->materialize_ttl_recalculate_only; + if (drop_expired_parts) + task = std::make_unique(std::make_unique(ctx), ctx); + else + task = std::make_unique(ctx); + ProfileEvents::increment(ProfileEvents::MutationSomePartColumns); } diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index 4a655cac566..4c74c8f56d1 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -260,6 +260,12 @@ bool StorageInMemoryMetadata::hasAnyTableTTL() const return hasAnyMoveTTL() || hasRowsTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL(); } +bool StorageInMemoryMetadata::hasOnlyRowsTTL() const +{ + bool has_any_other_ttl = hasAnyMoveTTL() || hasAnyRecompressionTTL() || hasAnyGroupByTTL() || hasAnyRowsWhereTTL() || hasAnyColumnTTL(); + return hasRowsTTL() && !has_any_other_ttl; +} + TTLColumnsDescription StorageInMemoryMetadata::getColumnTTLs() const { return column_ttls_by_name; diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 
afc3bc81ef6..64ae499ec6e 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -144,6 +144,9 @@ struct StorageInMemoryMetadata /// Returns true if there is set table TTL, any column TTL or any move TTL. bool hasAnyTTL() const { return hasAnyColumnTTL() || hasAnyTableTTL(); } + /// Returns true if a rows TTL is set and it is the only TTL: no rows-WHERE, move, recompression, GROUP BY or column TTL. + bool hasOnlyRowsTTL() const; + /// Common tables TTLs (for rows and moves). TTLTableDescription getTableTTLs() const; bool hasAnyTableTTL() const;