diff --git a/src/DataStreams/TTLBlockInputStream.cpp b/src/DataStreams/TTLBlockInputStream.cpp
index 8b31da6d2f1..05d4ba0a395 100644
--- a/src/DataStreams/TTLBlockInputStream.cpp
+++ b/src/DataStreams/TTLBlockInputStream.cpp
@@ -76,17 +76,17 @@ TTLBlockInputStream::TTLBlockInputStream(
             algorithms.emplace_back(std::make_unique<TTLColumnAlgorithm>(
                 description, old_ttl_infos.columns_ttl[name], current_time_,
-                force_, name, default_expression, default_column_name));
+                force_, name, default_expression, default_column_name, isCompactPart(data_part)));
         }
     }

     for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
-        algorithms.emplace_back(std::make_unique<TTLMoveAlgorithm>(
-            move_ttl, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));

     for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
-        algorithms.emplace_back(std::make_unique<TTLRecompressionAlgorithm>(
-            recompression_ttl, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
 }

 Block reorderColumns(Block block, const Block & header)
diff --git a/src/DataStreams/TTLCalcInputStream.cpp b/src/DataStreams/TTLCalcInputStream.cpp
new file mode 100644
index 00000000000..2353e9ec259
--- /dev/null
+++ b/src/DataStreams/TTLCalcInputStream.cpp
@@ -0,0 +1,77 @@
+#include <DataStreams/TTLCalcInputStream.h>
+#include <DataStreams/TTLUpdateInfoAlgorithm.h>
+
+namespace DB
+{
+
+TTLCalcInputStream::TTLCalcInputStream(
+    const BlockInputStreamPtr & input_,
+    const MergeTreeData & storage_,
+    const StorageMetadataPtr & metadata_snapshot_,
+    const MergeTreeData::MutableDataPartPtr & data_part_,
+    time_t current_time_,
+    bool force_)
+    : data_part(data_part_)
+    , log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
+{
+    children.push_back(input_);
+    header = children.at(0)->getHeader();
+    auto old_ttl_infos = data_part->ttl_infos;
+
+    if (metadata_snapshot_->hasRowsTTL())
+    {
+        const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            rows_ttl, TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
+    }
+
+    for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            where_ttl, TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
+
+    for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            group_by_ttl, TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
+
+    if (metadata_snapshot_->hasAnyColumnTTL())
+    {
+        for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
+        {
+            algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+                description, TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
+        }
+    }
+
+    for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
+
+    for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
+        algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
+            recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
+}
+
+Block TTLCalcInputStream::readImpl()
+{
+    auto block = children.at(0)->read();
+    for (const auto & algorithm : algorithms)
+        algorithm->execute(block);
+
+    if (!block)
+        return block;
+
+    Block res;
+    for (const auto & col : header)
+        res.insert(block.getByName(col.name));
+
+    return res;
+}
+
+void TTLCalcInputStream::readSuffixImpl()
+{
+    data_part->ttl_infos = {};
+    for (const auto & algorithm : algorithms)
+        algorithm->finalize(data_part);
+}
+
+}
diff --git a/src/DataStreams/TTLCalcInputStream.h b/src/DataStreams/TTLCalcInputStream.h
new file mode 100644
index 00000000000..d1b629c2ad5
--- /dev/null
+++ b/src/DataStreams/TTLCalcInputStream.h
@@ -0,0 +1,44 @@
+#pragma once
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+class TTLCalcInputStream : public IBlockInputStream
+{
+public:
+    TTLCalcInputStream(
+        const BlockInputStreamPtr & input_,
+        const MergeTreeData & storage_,
+        const StorageMetadataPtr & metadata_snapshot_,
+        const MergeTreeData::MutableDataPartPtr & data_part_,
+        time_t current_time,
+        bool force_
+    );
+
+    String getName() const override { return "TTL_CALC"; }
+    Block getHeader() const override { return header; }
+
+protected:
+    Block readImpl() override;
+
+    /// Finalizes ttl infos and updates data part
+    void readSuffixImpl() override;
+
+private:
+    std::vector<TTLAlgorithmPtr> algorithms;
+
+    /// ttl_infos and empty_columns are updating while reading
+    const MergeTreeData::MutableDataPartPtr & data_part;
+    Poco::Logger * log;
+    Block header;
+};
+
+}
diff --git a/src/DataStreams/TTLColumnAlgorithm.cpp b/src/DataStreams/TTLColumnAlgorithm.cpp
index 1318ea382db..71ad2a4e38f 100644
--- a/src/DataStreams/TTLColumnAlgorithm.cpp
+++ b/src/DataStreams/TTLColumnAlgorithm.cpp
@@ -10,11 +10,13 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
     bool force_,
     const String & column_name_,
     const ExpressionActionsPtr & default_expression_,
-    const String & default_column_name_)
+    const String & default_column_name_,
+    bool is_compact_part_)
     : ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
     , column_name(column_name_)
     , default_expression(default_expression_)
     , default_column_name(default_column_name_)
+    , is_compact_part(is_compact_part_)
 {
     if (!isMinTTLExpired())
     {
@@ -40,7 +42,7 @@ void TTLColumnAlgorithm::execute(Block & block)
         return;

     /// Later drop full column
-    if (isMaxTTLExpired())
+    if (isMaxTTLExpired() && !is_compact_part)
         return;

     auto default_column = executeExpressionAndGetColumn(default_expression, block, default_column_name);
diff --git a/src/DataStreams/TTLColumnAlgorithm.h b/src/DataStreams/TTLColumnAlgorithm.h
index e09dd663af0..ddf963eaee2 100644
--- a/src/DataStreams/TTLColumnAlgorithm.h
+++ b/src/DataStreams/TTLColumnAlgorithm.h
@@ -17,7 +17,9 @@ public:
         bool force_,
         const String & column_name_,
         const ExpressionActionsPtr & default_expression_,
-        const String & default_column_name_);
+        const String & default_column_name_,
+        bool is_compact_part_
+    );

     void execute(Block & block) override;
     void finalize(const MutableDataPartPtr & data_part) const override;
@@ -28,6 +30,7 @@ private:
     const String default_column_name;

     bool is_fully_empty = true;
+    bool is_compact_part;
 };

 }
diff --git a/src/DataStreams/TTLUpdateInfoAlgorithm.cpp b/src/DataStreams/TTLUpdateInfoAlgorithm.cpp
index d5feb14658b..6a983d052c1 100644
--- a/src/DataStreams/TTLUpdateInfoAlgorithm.cpp
+++ b/src/DataStreams/TTLUpdateInfoAlgorithm.cpp
@@ -4,8 +4,15 @@ namespace DB
 {

 TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
-    const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
+    const TTLDescription & description_,
+    const TTLUpdateField ttl_update_field_,
+    const String ttl_update_key_,
+    const TTLInfo & old_ttl_info_,
+    time_t current_time_,
+    bool force_)
     : ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
+    , ttl_update_field(ttl_update_field_)
+    , ttl_update_key(ttl_update_key_)
 {
 }

@@ -22,26 +29,37 @@ void TTLUpdateInfoAlgorithm::execute(Block & block)
     }
 }

-TTLMoveAlgorithm::TTLMoveAlgorithm(
-    const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
-    : TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
+void TTLUpdateInfoAlgorithm::finalize(const MutableDataPartPtr & data_part) const
 {
-}
+    if (ttl_update_field == TTLUpdateField::RECOMPRESSION_TTL)
+    {
+        data_part->ttl_infos.recompression_ttl[ttl_update_key] = new_ttl_info;
+    }
+    else if (ttl_update_field == TTLUpdateField::MOVES_TTL)
+    {
+        data_part->ttl_infos.moves_ttl[ttl_update_key] = new_ttl_info;
+    }
+    else if (ttl_update_field == TTLUpdateField::GROUP_BY_TTL)
+    {
+        data_part->ttl_infos.group_by_ttl[ttl_update_key] = new_ttl_info;
+        data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
+    }
+    else if (ttl_update_field == TTLUpdateField::ROWS_WHERE_TTL)
+    {
+        data_part->ttl_infos.rows_where_ttl[ttl_update_key] = new_ttl_info;
+        data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
+    }
+    else if (ttl_update_field == TTLUpdateField::TABLE_TTL)
+    {
+        data_part->ttl_infos.table_ttl = new_ttl_info;
+        data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
+    }
+    else if (ttl_update_field == TTLUpdateField::COLUMNS_TTL)
+    {
+        data_part->ttl_infos.columns_ttl[ttl_update_key] = new_ttl_info;
+        data_part->ttl_infos.updatePartMinMaxTTL(new_ttl_info.min, new_ttl_info.max);
+    }

-void TTLMoveAlgorithm::finalize(const MutableDataPartPtr & data_part) const
-{
-    data_part->ttl_infos.moves_ttl[description.result_column] = new_ttl_info;
-}
-
-TTLRecompressionAlgorithm::TTLRecompressionAlgorithm(
-    const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
-    : TTLUpdateInfoAlgorithm(description_, old_ttl_info_, current_time_, force_)
-{
-}
-
-void TTLRecompressionAlgorithm::finalize(const MutableDataPartPtr & data_part) const
-{
-    data_part->ttl_infos.recompression_ttl[description.result_column] = new_ttl_info;
-}
+
+}

 }
diff --git a/src/DataStreams/TTLUpdateInfoAlgorithm.h b/src/DataStreams/TTLUpdateInfoAlgorithm.h
index c1ef0e1c90d..551211fc47f 100644
--- a/src/DataStreams/TTLUpdateInfoAlgorithm.h
+++ b/src/DataStreams/TTLUpdateInfoAlgorithm.h
@@ -5,28 +5,35 @@
 namespace DB
 {

+enum class TTLUpdateField
+{
+    COLUMNS_TTL,
+    TABLE_TTL,
+    ROWS_WHERE_TTL,
+    MOVES_TTL,
+    RECOMPRESSION_TTL,
+    GROUP_BY_TTL,
+};
+
 /// Calculates new ttl_info and does nothing with data.
 class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
 {
 public:
-    TTLUpdateInfoAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
+    TTLUpdateInfoAlgorithm(
+        const TTLDescription & description_,
+        const TTLUpdateField ttl_update_field_,
+        const String ttl_update_key_,
+        const TTLInfo & old_ttl_info_,
+        time_t current_time_, bool force_
+    );

     void execute(Block & block) override;
-    void finalize(const MutableDataPartPtr & data_part) const override = 0;
+    void finalize(const MutableDataPartPtr & data_part) const override;
+
+private:
+    const TTLUpdateField ttl_update_field;
+    const String ttl_update_key;
 };

-class TTLMoveAlgorithm final : public TTLUpdateInfoAlgorithm
-{
-public:
-    TTLMoveAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
-    void finalize(const MutableDataPartPtr & data_part) const override;
-};
-
-class TTLRecompressionAlgorithm final : public TTLUpdateInfoAlgorithm
-{
-public:
-    TTLRecompressionAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
-    void finalize(const MutableDataPartPtr & data_part) const override;
-};

 }
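
For orientation, the six TTLUpdateField values introduced above correspond to the kinds of TTL a MergeTree table can declare: COLUMNS_TTL to per-column TTLs, TABLE_TTL to the plain row TTL, ROWS_WHERE_TTL to TTL ... DELETE WHERE, GROUP_BY_TTL to TTL ... GROUP BY ... SET, MOVES_TTL to TTL ... TO DISK/VOLUME, and RECOMPRESSION_TTL to TTL ... RECOMPRESS. A sketch of a table declaring several of them (illustrative schema, not taken from the patch; the TO VOLUME clause assumes a storage policy with a 'cold' volume):

create table ttl_kinds
(
    d Date,
    key UInt32,
    value String ttl d + interval 1 month               -- recalculated as COLUMNS_TTL
)
engine = MergeTree
order by key
ttl d + interval 1 year,                                -- TABLE_TTL (plain row TTL)
    d + interval 1 month to volume 'cold',              -- MOVES_TTL
    d + interval 6 month recompress codec(ZSTD(6));     -- RECOMPRESSION_TTL
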
diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp
index 4e5e3b4e86b..83af913c7ab 100644
--- a/src/Interpreters/MutationsInterpreter.cpp
+++ b/src/Interpreters/MutationsInterpreter.cpp
@@ -156,7 +156,7 @@ ColumnDependencies getAllColumnDependencies(const StorageMetadataPtr & metadata_
     ColumnDependencies dependencies;
     while (!new_updated_columns.empty())
     {
-        auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns);
+        auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true);
         new_updated_columns.clear();
         for (const auto & dependency : new_dependencies)
         {
@@ -303,6 +303,15 @@ static NameSet getKeyColumns(const StoragePtr & storage, const StorageMetadataPt
     return key_columns;
 }

+static bool materializeTTLRecalculateOnly(const StoragePtr & storage)
+{
+    auto storage_from_merge_tree_data_part = std::dynamic_pointer_cast<StorageFromMergeTreeDataPart>(storage);
+    if (!storage_from_merge_tree_data_part)
+        return false;
+
+    return storage_from_merge_tree_data_part->materializeTTLRecalculateOnly();
+}
+
 static void validateUpdateColumns(
     const StoragePtr & storage,
     const StorageMetadataPtr & metadata_snapshot, const NameSet & updated_columns,
@@ -394,8 +403,13 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
     NamesAndTypesList all_columns = columns_desc.getAllPhysical();

     NameSet updated_columns;
+    bool materialize_ttl_recalculate_only = materializeTTLRecalculateOnly(storage);
     for (const MutationCommand & command : commands)
     {
+        if (command.type == MutationCommand::Type::UPDATE
+            || command.type == MutationCommand::Type::DELETE)
+            materialize_ttl_recalculate_only = false;
+
         for (const auto & kv : command.column_to_update_expression)
         {
             updated_columns.insert(kv.first);
@@ -569,7 +583,18 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
         else if (command.type == MutationCommand::MATERIALIZE_TTL)
         {
             mutation_kind.set(MutationKind::MUTATE_OTHER);
-            if (metadata_snapshot->hasRowsTTL())
+            if (materialize_ttl_recalculate_only)
+            {
+                // just recalculate ttl_infos without removing expired data
+                auto all_columns_vec = all_columns.getNames();
+                auto new_dependencies = metadata_snapshot->getColumnDependencies(NameSet(all_columns_vec.begin(), all_columns_vec.end()), false);
+                for (const auto & dependency : new_dependencies)
+                {
+                    if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
+                        dependencies.insert(dependency);
+                }
+            }
+            else if (metadata_snapshot->hasRowsTTL())
             {
                 for (const auto & column : all_columns)
                     dependencies.emplace(column.name, ColumnDependency::TTL_TARGET);
@@ -594,19 +619,19 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
             }

             /// Recalc only skip indices and projections of columns which could be updated by TTL.
-            auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns);
+            auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true);
             for (const auto & dependency : new_dependencies)
             {
                 if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION)
                     dependencies.insert(dependency);
             }
+            }

-            if (dependencies.empty())
-            {
-                /// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
-                /// But we still have to read at least one column.
-                dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
-            }
+            if (dependencies.empty())
+            {
+                /// Very rare case. It can happen if we have only one MOVE TTL with constant expression.
+                /// But we still have to read at least one column.
+                dependencies.emplace(all_columns.front().name, ColumnDependency::TTL_EXPRESSION);
             }
         }
         else if (command.type == MutationCommand::READ_COLUMN)
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
index 84be8012509..57606c8ed9a 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp
@@ -10,6 +10,7 @@
 #include
 #include
 #include
 #include
+#include <DataStreams/TTLCalcInputStream.h>
 #include
 #include
 #include
@@ -1288,10 +1289,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
     auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType()) : getNonAdaptiveMrkExtension();

     bool need_sync = needSyncPart(source_part->rows_count, source_part->getBytesOnDisk(), *data_settings);
-    bool need_remove_expired_values = false;
+    auto execute_ttl_type = ExecuteTTLType::NONE;

-    if (in && shouldExecuteTTL(metadata_snapshot, interpreter->getColumnDependencies(), commands_for_part))
-        need_remove_expired_values = true;
+    if (in)
+        execute_ttl_type = shouldExecuteTTL(metadata_snapshot, interpreter->getColumnDependencies());

     /// All columns from part are changed and may be some more that were missing before in part
     /// TODO We can materialize compact part without copying data
@@ -1319,7 +1320,7 @@
             time_of_mutation,
             compression_codec,
             merge_entry,
-            need_remove_expired_values,
+            execute_ttl_type,
             need_sync,
             space_reservation,
             holder,
@@ -1356,7 +1357,7 @@
         return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info, metadata_snapshot);
     }

-    if (need_remove_expired_values)
+    if (execute_ttl_type != ExecuteTTLType::NONE)
         files_to_skip.insert("ttl.txt");

     disk->createDirectories(new_part_tmp_path);
@@ -1416,7 +1417,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
             time_of_mutation,
             compression_codec,
             merge_entry,
-            need_remove_expired_values,
+            execute_ttl_type,
             need_sync,
             space_reservation,
             holder,
@@ -1437,7 +1438,7 @@
             }
         }

-        finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values, compression_codec);
+        finalizeMutatedPart(source_part, new_data_part, execute_ttl_type, compression_codec);
     }

     return new_data_part;
@@ -1984,21 +1985,22 @@ std::set MergeTreeDataMergerMutator::getProjectionsToRec
     return projections_to_recalc;
 }

-bool MergeTreeDataMergerMutator::shouldExecuteTTL(
-    const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies, const MutationCommands & commands)
+ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies)
 {
     if (!metadata_snapshot->hasAnyTTL())
-        return false;
+        return ExecuteTTLType::NONE;

-    for (const auto & command : commands)
-        if (command.type == MutationCommand::MATERIALIZE_TTL)
-            return true;
+    bool has_ttl_expression = false;

     for (const auto & dependency : dependencies)
-        if (dependency.kind == ColumnDependency::TTL_EXPRESSION || dependency.kind == ColumnDependency::TTL_TARGET)
-            return true;
+    {
+        if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
+            has_ttl_expression = true;

-    return false;
+        if (dependency.kind == ColumnDependency::TTL_TARGET)
+            return ExecuteTTLType::NORMAL;
+    }
+    return has_ttl_expression ? ExecuteTTLType::RECALCULATE : ExecuteTTLType::NONE;
 }

 // 1. get projection pipeline and a sink to write parts
@@ -2172,7 +2174,7 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
     time_t time_of_mutation,
     const CompressionCodecPtr & compression_codec,
     MergeListEntry & merge_entry,
-    bool need_remove_expired_values,
+    ExecuteTTLType execute_ttl_type,
     bool need_sync,
     const ReservationPtr & space_reservation,
     TableLockHolder & holder,
@@ -2185,9 +2187,12 @@ void MergeTreeDataMergerMutator::mutateAllPartColumns(
         mutating_stream = std::make_shared<MaterializingBlockInputStream>(
            std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));

-    if (need_remove_expired_values)
+    if (execute_ttl_type == ExecuteTTLType::NORMAL)
         mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);

+    if (execute_ttl_type == ExecuteTTLType::RECALCULATE)
+        mutating_stream = std::make_shared<TTLCalcInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
+
     IMergeTreeDataPart::MinMaxIndex minmax_idx;

     MergedBlockOutputStream out{
@@ -2229,7 +2234,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
     time_t time_of_mutation,
     const CompressionCodecPtr & compression_codec,
     MergeListEntry & merge_entry,
-    bool need_remove_expired_values,
+    ExecuteTTLType execute_ttl_type,
     bool need_sync,
     const ReservationPtr & space_reservation,
     TableLockHolder & holder,
@@ -2238,9 +2243,12 @@
     if (mutating_stream == nullptr)
         throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);

-    if (need_remove_expired_values)
+    if (execute_ttl_type == ExecuteTTLType::NORMAL)
         mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);

+    if (execute_ttl_type == ExecuteTTLType::RECALCULATE)
+        mutating_stream = std::make_shared<TTLCalcInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);
+
     IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
     MergedColumnOnlyOutputStream out(
         new_data_part,
@@ -2279,7 +2287,7 @@ void MergeTreeDataMergerMutator::mutateSomePartColumns(
 void MergeTreeDataMergerMutator::finalizeMutatedPart(
     const MergeTreeDataPartPtr & source_part,
     MergeTreeData::MutableDataPartPtr new_data_part,
-    bool need_remove_expired_values,
+    ExecuteTTLType execute_ttl_type,
     const CompressionCodecPtr & codec)
 {
     auto disk = new_data_part->volume->getDisk();
@@ -2293,7 +2301,7 @@ void MergeTreeDataMergerMutator::finalizeMutatedPart(
         new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_hash = out_hashing.getHash();
     }

-    if (need_remove_expired_values)
+    if (execute_ttl_type != ExecuteTTLType::NONE)
     {
         /// Write a file with ttl infos in json format.
         auto out_ttl = disk->writeFile(fs::path(new_data_part->getFullRelativePath()) / "ttl.txt", 4096);
diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
index ca7376d8f3e..3a0041e4a37 100644
--- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
+++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.h
@@ -23,6 +23,13 @@ enum class SelectPartsDecision
     NOTHING_TO_MERGE = 2,
 };

+enum class ExecuteTTLType
+{
+    NONE = 0,
+    NORMAL = 1,
+    RECALCULATE = 2,
+};
+
 /// Auxiliary struct holding metainformation for the future merged or mutated part.
 struct FutureMergedMutatedPart
 {
@@ -200,8 +207,7 @@ private:
         const ProjectionsDescription & all_projections,
         const MutationCommands & commands_for_removes);

-    static bool shouldExecuteTTL(
-        const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies, const MutationCommands & commands);
+    static ExecuteTTLType shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies);

     /// Return set of indices which should be recalculated during mutation also
     /// wraps input stream into additional expression stream
@@ -242,7 +248,7 @@ private:
         time_t time_of_mutation,
         const CompressionCodecPtr & compression_codec,
         MergeListEntry & merge_entry,
-        bool need_remove_expired_values,
+        ExecuteTTLType execute_ttl_type,
         bool need_sync,
         const ReservationPtr & space_reservation,
         TableLockHolder & holder,
@@ -260,7 +266,7 @@ private:
         time_t time_of_mutation,
         const CompressionCodecPtr & compression_codec,
         MergeListEntry & merge_entry,
-        bool need_remove_expired_values,
+        ExecuteTTLType execute_ttl_type,
         bool need_sync,
         const ReservationPtr & space_reservation,
         TableLockHolder & holder,
@@ -271,7 +277,7 @@ private:
     static void finalizeMutatedPart(
         const MergeTreeDataPartPtr & source_part,
         MergeTreeData::MutableDataPartPtr new_data_part,
-        bool need_remove_expired_values,
+        ExecuteTTLType execute_ttl_type,
         const CompressionCodecPtr & codec);

 public :
diff --git a/src/Storages/MergeTree/MergeTreeSettings.h b/src/Storages/MergeTree/MergeTreeSettings.h
index 9a198500447..890cfca8d71 100644
--- a/src/Storages/MergeTree/MergeTreeSettings.h
+++ b/src/Storages/MergeTree/MergeTreeSettings.h
@@ -117,6 +117,7 @@ struct Settings;
     M(Int64, merge_with_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with delete TTL can be repeated.", 0) \
     M(Int64, merge_with_recompression_ttl_timeout, 3600 * 4, "Minimal time in seconds, when merge with recompression TTL can be repeated.", 0) \
     M(Bool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.", 0) \
+    M(Bool, materialize_ttl_recalculate_only, false, "Only recalculate ttl info when MATERIALIZE TTL", 0) \
     M(Bool, write_final_mark, true, "Write final mark after end of column (0 - disabled, do nothing if index_granularity_bytes=0)", 0) \
     M(Bool, enable_mixed_granularity_parts, true, "Enable parts with adaptive and non adaptive granularity", 0) \
     M(MaxThreads, max_part_loading_threads, 0, "The number of threads to load data parts at startup.", 0) \
diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
index bcce2d990ca..997e6e8bb74 100644
--- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h
@@ -73,6 +73,11 @@ public:
         return storage.getPartitionIDFromQuery(ast, context);
     }

+    bool materializeTTLRecalculateOnly() const
+    {
+        return parts.front()->storage.getSettings()->materialize_ttl_recalculate_only;
+    }
+
 protected:
     /// Used in part mutation.
     StorageFromMergeTreeDataPart(const MergeTreeData::DataPartPtr & part_)
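
A minimal usage sketch of the new materialize_ttl_recalculate_only setting, modelled on the test added below (the table name t and the TTL expression are illustrative): with the setting enabled, the MATERIALIZE TTL mutation triggered by ALTER ... MODIFY TTL only recalculates the per-part TTL info (ttl.txt), and expired rows are removed later, for example by a TTL merge or OPTIMIZE ... FINAL.

set mutations_sync = 2;

create table t (d Date, x Int) engine = MergeTree order by x
settings materialize_ttl_recalculate_only = true;

insert into t values (toDate('2000-01-01'), 1), (toDate('2100-01-01'), 2);

alter table t modify ttl d + interval 1 day;   -- mutation rewrites only ttl.txt; both rows are still readable
select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 't' and active;

optimize table t final;                        -- expired row is dropped here (or by a background TTL merge)
select * from t;
drop table t;
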
diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp
index 5183b925141..fc1c0571e86 100644
--- a/src/Storages/StorageInMemoryMetadata.cpp
+++ b/src/Storages/StorageInMemoryMetadata.cpp
@@ -214,7 +214,7 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const
     return !table_ttl.group_by_ttl.empty();
 }

-ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet & updated_columns) const
+ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet & updated_columns, bool include_ttl_target) const
 {
     if (updated_columns.empty())
         return {};
@@ -250,7 +250,7 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
     if (hasRowsTTL())
     {
         auto rows_expression = getRowsTTL().expression;
-        if (add_dependent_columns(rows_expression, required_ttl_columns))
+        if (add_dependent_columns(rows_expression, required_ttl_columns) && include_ttl_target)
         {
             /// Filter all columns, if rows TTL expression have to be recalculated.
             for (const auto & column : getColumns().getAllPhysical())
@@ -263,13 +263,15 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(const NameSet
     for (const auto & [name, entry] : getColumnTTLs())
     {
-        if (add_dependent_columns(entry.expression, required_ttl_columns))
+        if (add_dependent_columns(entry.expression, required_ttl_columns) && include_ttl_target)
             updated_ttl_columns.insert(name);
     }

     for (const auto & entry : getMoveTTLs())
         add_dependent_columns(entry.expression, required_ttl_columns);

+    //TODO what about rows_where_ttl and group_by_ttl ??
+
     for (const auto & column : indices_columns)
         res.emplace(column, ColumnDependency::SKIP_INDEX);
     for (const auto & column : projections_columns)
diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h
index d0d60f608d7..9accdb9b3b6 100644
--- a/src/Storages/StorageInMemoryMetadata.h
+++ b/src/Storages/StorageInMemoryMetadata.h
@@ -143,7 +143,7 @@ struct StorageInMemoryMetadata

     /// Returns columns, which will be needed to calculate dependencies (skip
     /// indices, TTL expressions) if we update @updated_columns set of columns.
-    ColumnDependencies getColumnDependencies(const NameSet & updated_columns) const;
+    ColumnDependencies getColumnDependencies(const NameSet & updated_columns, bool include_ttl_target) const;

     /// Block with ordinary + materialized columns.
     Block getSampleBlock() const;
diff --git a/tests/queries/0_stateless/01070_modify_ttl_recalc_only.reference b/tests/queries/0_stateless/01070_modify_ttl_recalc_only.reference
new file mode 100644
index 00000000000..fe9cba71c4c
--- /dev/null
+++ b/tests/queries/0_stateless/01070_modify_ttl_recalc_only.reference
@@ -0,0 +1,68 @@
+2000-10-10 1
+2000-10-10 2
+2100-10-10 3
+2100-10-10 4
+2000-10-11 00:00:00 2000-10-11 00:00:00
+2000-10-11 00:00:00 2000-10-11 00:00:00
+2100-10-11 00:00:00 2100-10-11 00:00:00
+2100-10-11 00:00:00 2100-10-11 00:00:00
+2100-10-10 3
+2100-10-10 4
+=============
+1 a
+2 b
+3 c
+4 d
+2000-01-01 00:00:00 2100-01-01 00:00:00
+1 a
+3 c
+=============
+1 a
+3 c
+2000-01-01 00:00:00 2000-01-01 00:00:00
+=============
+1 a
+2 b
+3 c
+4 d
+1 a
+2
+3 c
+4
+=============
+1 a
+2
+3 c
+4
+1
+2
+3
+4
+=============
+1 a
+2 b
+3 c
+4 d
+2000-01-01 00:00:00 2100-01-01 00:00:00
+1 a
+2 b
+4 d
+=============
+1 a
+2 b
+4 d
+1
+2
+4 d
+=============
+1 a aa
+2 b bb
+3 c cc
+4 d dd
+1 a
+2 b bb
+3 cc
+4 d
+1
+=============
+0
diff --git a/tests/queries/0_stateless/01070_modify_ttl_recalc_only.sql b/tests/queries/0_stateless/01070_modify_ttl_recalc_only.sql
new file mode 100644
index 00000000000..aafed1a7bce
--- /dev/null
+++ b/tests/queries/0_stateless/01070_modify_ttl_recalc_only.sql
@@ -0,0 +1,107 @@
+set mutations_sync = 2;
+
+drop table if exists ttl;
+
+create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d)
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1);
+insert into ttl values (toDateTime('2000-10-10 00:00:00'), 2);
+insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3);
+insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4);
+
+
+alter table ttl modify ttl d + interval 1 day;
+select * from ttl order by a;
+select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0 order by name asc;
+optimize table ttl final;
+select * from ttl order by a;
+select '=============';
+
+drop table if exists ttl;
+
+create table ttl (i Int, s String) engine = MergeTree order by i
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
+
+alter table ttl modify ttl i % 2 = 0 ? toDate('2000-01-01') : toDate('2100-01-01');
+select * from ttl order by i;
+select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
+optimize table ttl final;
+select * from ttl order by i;
+select '=============';
+
+alter table ttl modify ttl toDate('2000-01-01');
+select * from ttl order by i;
+select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
+optimize table ttl final;
+select * from ttl order by i;
+select '=============';
+
+drop table if exists ttl;
+
+create table ttl (i Int, s String) engine = MergeTree order by i
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+insert into ttl values (1, 'a') (2, 'b') (3, 'c') (4, 'd');
+
+alter table ttl modify column s String ttl i % 2 = 0 ? today() - 10 : toDate('2100-01-01');
+select * from ttl order by i;
+optimize table ttl final;
+select * from ttl order by i;
+select '=============';
+
+alter table ttl modify column s String ttl toDate('2000-01-01');
+select * from ttl order by i;
+optimize table ttl final;
+select * from ttl order by i;
+select '=============';
+
+drop table if exists ttl;
+
+create table ttl (d Date, i Int, s String) engine = MergeTree order by i
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+insert into ttl values (toDate('2000-01-02'), 1, 'a') (toDate('2000-01-03'), 2, 'b') (toDate('2080-01-01'), 3, 'c') (toDate('2080-01-03'), 4, 'd');
+
+alter table ttl modify ttl i % 3 = 0 ? toDate('2000-01-01') : toDate('2100-01-01');
+select i, s from ttl order by i;
+select delete_ttl_info_min, delete_ttl_info_max from system.parts where database = currentDatabase() and table = 'ttl' and active > 0;
+optimize table ttl final;
+select i, s from ttl order by i;
+select '=============';
+
+alter table ttl modify column s String ttl d + interval 1 month;
+select i, s from ttl order by i;
+optimize table ttl final;
+select i, s from ttl order by i;
+select '=============';
+
+drop table if exists ttl;
+
+create table ttl (i Int, s String, t String) engine = MergeTree order by i
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+insert into ttl values (1, 'a', 'aa') (2, 'b', 'bb') (3, 'c', 'cc') (4, 'd', 'dd');
+
+alter table ttl modify column s String ttl i % 3 = 0 ? today() - 10 : toDate('2100-01-01'),
+    modify column t String ttl i % 3 = 1 ? today() - 10 : toDate('2100-01-01');
+
+select i, s, t from ttl order by i;
+optimize table ttl final;
+select i, s, t from ttl order by i;
+-- MATERIALIZE TTL ran only once
+select count() from system.mutations where database = currentDatabase() and table = 'ttl' and is_done;
+select '=============';
+
+drop table if exists ttl;
+
+-- Nothing changed, don't run mutation
+create table ttl (i Int, s String ttl toDate('2000-01-02')) engine = MergeTree order by i
+SETTINGS max_number_of_merges_with_ttl_in_pool=0,materialize_ttl_recalculate_only=true;
+
+alter table ttl modify column s String ttl toDate('2000-01-02');
+select count() from system.mutations where database = currentDatabase() and table = 'ttl' and is_done;
+
+drop table if exists ttl;
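
For contrast, a sketch of the default behaviour (materialize_ttl_recalculate_only = false), which this patch leaves unchanged: the same ALTER removes expired rows as part of the MATERIALIZE TTL mutation itself, so no extra merge is needed (illustrative table name, not part of the test above):

set mutations_sync = 2;

create table t_default (d Date, x Int) engine = MergeTree order by x;

insert into t_default values (toDate('2000-01-01'), 1), (toDate('2100-01-01'), 2);

alter table t_default modify ttl d + interval 1 day;   -- the mutation itself drops the expired row
select * from t_default;                               -- only the 2100-01-01 row is expected to remain

drop table t_default;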