diff --git a/dbms/src/DataStreams/TTLBlockInputStream.cpp b/dbms/src/DataStreams/TTLBlockInputStream.cpp index 11999b894aa..33c111cb5bf 100644 --- a/dbms/src/DataStreams/TTLBlockInputStream.cpp +++ b/dbms/src/DataStreams/TTLBlockInputStream.cpp @@ -17,10 +17,12 @@ TTLBlockInputStream::TTLBlockInputStream( const BlockInputStreamPtr & input_, const MergeTreeData & storage_, const MergeTreeData::MutableDataPartPtr & data_part_, - time_t current_time_) + time_t current_time_, + bool force_) : storage(storage_) , data_part(data_part_) , current_time(current_time_) + , force(force_) , old_ttl_infos(data_part->ttl_infos) , log(&Logger::get(storage.getLogName() + " (TTLBlockInputStream)")) , date_lut(DateLUT::instance()) @@ -60,6 +62,10 @@ TTLBlockInputStream::TTLBlockInputStream( } } +bool TTLBlockInputStream::isTTLExpired(time_t ttl) +{ + return (ttl && (ttl <= current_time)); +} Block TTLBlockInputStream::readImpl() { @@ -70,13 +76,13 @@ Block TTLBlockInputStream::readImpl() if (storage.hasTableTTL()) { /// Skip all data if table ttl is expired for part - if (old_ttl_infos.table_ttl.max <= current_time) + if (isTTLExpired(old_ttl_infos.table_ttl.max)) { rows_removed = data_part->rows_count; return {}; } - if (old_ttl_infos.table_ttl.min <= current_time) + if (force || isTTLExpired(old_ttl_infos.table_ttl.min)) removeRowsWithExpiredTableTTL(block); } @@ -96,15 +102,15 @@ void TTLBlockInputStream::readSuffixImpl() data_part->empty_columns = std::move(empty_columns); if (rows_removed) - LOG_INFO(log, "Removed " << rows_removed << " rows with expired ttl from part " << data_part->name); + LOG_INFO(log, "Removed " << rows_removed << " rows with expired TTL from part " << data_part->name); } void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) { storage.ttl_table_entry.expression->execute(block); - const auto & current = block.getByName(storage.ttl_table_entry.result_column); - const IColumn * ttl_column = current.column.get(); + const IColumn * ttl_column = + block.getByName(storage.ttl_table_entry.result_column).column.get(); const auto & column_names = header.getNames(); MutableColumns result_columns; @@ -112,15 +118,14 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block) for (auto it = column_names.begin(); it != column_names.end(); ++it) { - auto & column_with_type = block.getByName(*it); - const IColumn * values_column = column_with_type.column.get(); + const IColumn * values_column = block.getByName(*it).column.get(); MutableColumnPtr result_column = values_column->cloneEmpty(); result_column->reserve(block.rows()); for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - if (cur_ttl > current_time) + if (!isTTLExpired(cur_ttl)) { new_ttl_infos.table_ttl.update(cur_ttl); result_column->insertFrom(*values_column, i); @@ -148,10 +153,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) const auto & old_ttl_info = old_ttl_infos.columns_ttl[name]; auto & new_ttl_info = new_ttl_infos.columns_ttl[name]; - if (old_ttl_info.min > current_time) + /// Nothing to do + if (!force && !isTTLExpired(old_ttl_info.min)) continue; - if (old_ttl_info.max <= current_time) + /// Later drop full column + if (isTTLExpired(old_ttl_info.max)) continue; if (!block.has(ttl_entry.result_column)) @@ -166,14 +173,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block) MutableColumnPtr result_column = values_column->cloneEmpty(); result_column->reserve(block.rows()); - const auto & current = block.getByName(ttl_entry.result_column); - const IColumn * ttl_column = current.column.get(); + const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get(); for (size_t i = 0; i < block.rows(); ++i) { UInt32 cur_ttl = getTimestampByIndex(ttl_column, i); - - if (cur_ttl <= current_time) + if (isTTLExpired(cur_ttl)) { if (default_column) result_column->insertFrom(*default_column, i); diff --git a/dbms/src/DataStreams/TTLBlockInputStream.h b/dbms/src/DataStreams/TTLBlockInputStream.h index de0d4f9156b..a57c6cf74bf 100644 --- a/dbms/src/DataStreams/TTLBlockInputStream.h +++ b/dbms/src/DataStreams/TTLBlockInputStream.h @@ -16,7 +16,8 @@ public: const BlockInputStreamPtr & input_, const MergeTreeData & storage_, const MergeTreeData::MutableDataPartPtr & data_part_, - time_t current_time + time_t current_time, + bool force_ ); String getName() const override { return "TTLBlockInputStream"; } @@ -36,6 +37,7 @@ private: const MergeTreeData::MutableDataPartPtr & data_part; time_t current_time; + bool force; MergeTreeDataPart::TTLInfos old_ttl_infos; MergeTreeDataPart::TTLInfos new_ttl_infos; @@ -50,13 +52,14 @@ private: Block header; private: - /// Removes values with expired ttl and computes new min_ttl and empty_columns for part + /// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part void removeValuesWithExpiredColumnTTL(Block & block); - /// Remove rows with expired table ttl and computes new min_ttl for part + /// Removes rows with expired table ttl and computes new ttl_infos for part void removeRowsWithExpiredTableTTL(Block & block); UInt32 getTimestampByIndex(const IColumn * column, size_t ind); + bool isTTLExpired(time_t ttl); }; } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 271ad231f41..c6b85e6d98f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -531,6 +531,7 @@ public: bool hasPrimaryKey() const { return !primary_key_columns.empty(); } bool hasSkipIndices() const { return !skip_indices.empty(); } bool hasTableTTL() const { return ttl_table_ast != nullptr; } + bool hasAnyColumnTTL() const { return !ttl_entries_by_name.empty(); } /// Check that the part is not broken and calculate the checksums for it if they are not present. MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path); diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index e30bbdcda0b..2b17475801f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -522,7 +522,7 @@ public: /// parts should be sorted. MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart( const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry, - time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate) + time_t time_of_merge, DiskSpaceMonitor::Reservation * disk_reservation, bool deduplicate, bool force_ttl) { static const String TMP_PREFIX = "tmp_merge_"; @@ -560,7 +560,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor size_t sum_input_rows_upper_bound = merge_entry->total_rows_count; - bool need_remove_expired_values = false; + bool need_remove_expired_values = force_ttl; for (const MergeTreeData::DataPartPtr & part : parts) new_data_part->ttl_infos.update(part->ttl_infos); @@ -568,7 +568,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor if (part_min_ttl && part_min_ttl <= time_of_merge) need_remove_expired_values = true; - MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values); LOG_DEBUG(log, "Selected MergeAlgorithm: " << ((merge_alg == MergeAlgorithm::Vertical) ? "Vertical" : "Horizontal")); @@ -707,7 +706,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor merged_stream = std::make_shared(merged_stream, SizeLimits(), 0 /*limit_hint*/, Names()); if (need_remove_expired_values) - merged_stream = std::make_shared(merged_stream, data, new_data_part, time_of_merge); + merged_stream = std::make_shared(merged_stream, data, new_data_part, time_of_merge, force_ttl); MergedBlockOutputStream to{ data, diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h index 29fd615d39b..a380b5c2bc2 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.h @@ -95,7 +95,7 @@ public: MergeTreeData::MutableDataPartPtr mergePartsToTemporaryPart( const FutureMergedMutatedPart & future_part, MergeListEntry & merge_entry, time_t time_of_merge, - DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication); + DiskSpaceMonitor::Reservation * disk_reservation, bool deduplication, bool force_ttl); /// Mutate a single data part with the specified commands. Will create and return a temporary part. MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart( diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index 602249c86ea..4797922d837 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -77,6 +77,8 @@ struct ReplicatedMergeTreeLogEntryData bool deduplicate = false; /// Do deduplicate on merge String column_name; + bool force_ttl = false; + /// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory. bool detach = false; diff --git a/dbms/src/Storages/StorageMergeTree.cpp b/dbms/src/Storages/StorageMergeTree.cpp index 0536423101d..589a5ad2af2 100644 --- a/dbms/src/Storages/StorageMergeTree.cpp +++ b/dbms/src/Storages/StorageMergeTree.cpp @@ -590,9 +590,10 @@ bool StorageMergeTree::merge( try { + bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL())); new_part = merger_mutator.mergePartsToTemporaryPart( future_part, *merge_entry, time(nullptr), - merging_tagger->reserved_space.get(), deduplicate); + merging_tagger->reserved_space.get(), deduplicate, force_ttl); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); removeEmptyColumnsFromPart(new_part); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 934d651fead..8492dd2c007 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1083,7 +1083,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) try { part = merger_mutator.mergePartsToTemporaryPart( - future_merged_part, *merge_entry, entry.create_time, reserved_space.get(), entry.deduplicate); + future_merged_part, *merge_entry, entry.create_time, reserved_space.get(), entry.deduplicate, entry.force_ttl); merger_mutator.renameMergedTemporaryPart(part, parts, &transaction); removeEmptyColumnsFromPart(part); @@ -2157,6 +2157,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() return; const bool deduplicate = false; /// TODO: read deduplicate option from table config + const bool force_ttl = false; bool success = false; @@ -2190,7 +2191,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask() if (max_source_parts_size_for_merge > 0 && merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred)) { - success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate); + success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, force_ttl); } else if (max_source_part_size_for_mutation > 0 && queue.countMutations() > 0) { @@ -2254,6 +2255,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( const DataPartsVector & parts, const String & merged_name, bool deduplicate, + bool force_ttl, ReplicatedMergeTreeLogEntryData * out_log_entry) { std::vector> exists_futures; @@ -2289,6 +2291,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts( entry.source_replica = replica_name; entry.new_part_name = merged_name; entry.deduplicate = deduplicate; + entry.force_ttl = force_ttl; entry.create_time = time(nullptr); for (const auto & part : parts) @@ -2999,6 +3002,8 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p return false; }; + bool force_ttl = (final && (hasTableTTL() || hasAnyColumnTTL())); + if (!partition && final) { DataPartsVector data_parts = getDataPartsVector(); @@ -3016,7 +3021,7 @@ bool StorageReplicatedMergeTree::optimize(const ASTPtr & query, const ASTPtr & p future_merged_part, disk_space, can_merge, partition_id, true, nullptr); ReplicatedMergeTreeLogEntryData merge_entry; if (selected && - !createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, &merge_entry)) + !createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate, force_ttl, &merge_entry)) return handle_noop("Can't create merge queue node in ZooKeeper"); if (merge_entry.type != ReplicatedMergeTreeLogEntryData::Type::EMPTY) merge_entries.push_back(std::move(merge_entry)); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 59afb96a523..878c5ce0619 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -424,6 +424,7 @@ private: const DataPartsVector & parts, const String & merged_name, bool deduplicate, + bool force_ttl, ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr); bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version); diff --git a/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.reference b/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.reference new file mode 100644 index 00000000000..301c460aba8 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.reference @@ -0,0 +1,2 @@ +2100-10-10 3 +2100-10-10 4 diff --git a/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.sql b/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.sql new file mode 100644 index 00000000000..0ed69850cf6 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00976_ttl_with_old_parts.sql @@ -0,0 +1,16 @@ +drop table if exists ttl; + +create table ttl (d Date, a Int) engine = MergeTree order by a partition by toDayOfMonth(d); +insert into ttl values (toDateTime('2000-10-10 00:00:00'), 1); +insert into ttl values (toDateTime('2000-10-10 00:00:00'), 2); +insert into ttl values (toDateTime('2100-10-10 00:00:00'), 3); +insert into ttl values (toDateTime('2100-10-10 00:00:00'), 4); + +alter table ttl modify ttl d + interval 1 day; + +select sleep(1) format Null; -- wait if very fast merge happen +optimize table ttl partition 10 final; + +select * from ttl order by d; + +drop table if exists ttl;