diff --git a/dbms/src/DataStreams/TTLBlockInputStream.cpp b/dbms/src/DataStreams/TTLBlockInputStream.cpp index 33c111cb5bf..e2a3a7ca03b 100644 --- a/dbms/src/DataStreams/TTLBlockInputStream.cpp +++ b/dbms/src/DataStreams/TTLBlockInputStream.cpp @@ -69,22 +69,19 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) Block TTLBlockInputStream::readImpl() { + /// Skip all data if table ttl is expired for part + if (storage.hasTableTTL() && isTTLExpired(old_ttl_infos.table_ttl.max)) + { + rows_removed = data_part->rows_count; + return {}; + } + Block block = children.at(0)->read(); if (!block) return block; - if (storage.hasTableTTL()) - { - /// Skip all data if table ttl is expired for part - if (isTTLExpired(old_ttl_infos.table_ttl.max)) - { - rows_removed = data_part->rows_count; - return {}; - } - - if (force || isTTLExpired(old_ttl_infos.table_ttl.min)) - removeRowsWithExpiredTableTTL(block); - } + if (storage.hasTableTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min))) + removeRowsWithExpiredTableTTL(block); removeValuesWithExpiredColumnTTL(block); @@ -94,9 +91,9 @@ Block TTLBlockInputStream::readImpl() void TTLBlockInputStream::readSuffixImpl() { for (const auto & elem : new_ttl_infos.columns_ttl) - new_ttl_infos.updatePartMinTTL(elem.second.min); + new_ttl_infos.updatePartMinMaxTTL(elem.second.min, elem.second.max); - new_ttl_infos.updatePartMinTTL(new_ttl_infos.table_ttl.min); + new_ttl_infos.updatePartMinMaxTTL(new_ttl_infos.table_ttl.min, new_ttl_infos.table_ttl.max); data_part->ttl_infos = std::move(new_ttl_infos); data_part->empty_columns = std::move(empty_columns); diff --git a/dbms/src/Storages/MergeTree/MergeSelector.h b/dbms/src/Storages/MergeTree/MergeSelector.h index b01ef1d1d43..3c3cd8190ac 100644 --- a/dbms/src/Storages/MergeTree/MergeSelector.h +++ b/dbms/src/Storages/MergeTree/MergeSelector.h @@ -40,8 +40,11 @@ public: /// Opaque pointer to avoid dependencies (it is not possible to do forward declaration of typedef). 
const void * data; - /// Minimal time, when we need to delete some data from this part + /// Minimal time, when we need to delete some data from this part. time_t min_ttl; + + /// Maximum time, when we will need to drop this part altogether because all rows in it are expired. + time_t max_ttl; }; /// Parts are belong to partitions. Only parts within same partition could be merged. diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index ad489a91603..258d235b777 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -211,8 +211,11 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( part_info.level = part->info.level; part_info.data = &part; part_info.min_ttl = part->ttl_infos.part_min_ttl; + part_info.max_ttl = part->ttl_infos.part_max_ttl; - if (part_info.min_ttl && part_info.min_ttl <= current_time) + time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl; + + if (ttl && ttl <= current_time) has_part_with_expired_ttl = true; partitions.back().emplace_back(part_info); @@ -239,7 +242,7 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge( /// NOTE Could allow selection of different merge strategy. 
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled()) { - merge_selector = std::make_unique<TTLMergeSelector>(current_time); + merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts); last_merge_with_ttl = current_time; } else diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp index d3fe3231e05..39665f03c84 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.cpp @@ -12,11 +12,11 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i for (const auto & [name, ttl_info] : other_infos.columns_ttl) { columns_ttl[name].update(ttl_info); - updatePartMinTTL(ttl_info.min); + updatePartMinMaxTTL(ttl_info.min, ttl_info.max); } table_ttl.update(other_infos.table_ttl); - updatePartMinTTL(table_ttl.min); + updatePartMinMaxTTL(table_ttl.min, table_ttl.max); } void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) @@ -37,7 +37,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) String name = col["name"].getString(); columns_ttl.emplace(name, ttl_info); - updatePartMinTTL(ttl_info.min); + updatePartMinMaxTTL(ttl_info.min, ttl_info.max); } } if (json.has("table")) @@ -46,7 +46,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in) table_ttl.min = table["min"].getUInt(); table_ttl.max = table["max"].getUInt(); - updatePartMinTTL(table_ttl.min); + updatePartMinMaxTTL(table_ttl.min, table_ttl.max); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h index ff181aa29a4..71a7c9f602f 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartTTLInfo.h @@ -36,15 +36,19 @@ struct MergeTreeDataPartTTLInfos std::unordered_map<String, MergeTreeDataPartTTLInfo> columns_ttl; MergeTreeDataPartTTLInfo table_ttl; time_t part_min_ttl = 0; + time_t part_max_ttl = 
0; void read(ReadBuffer & in); void write(WriteBuffer & out) const; void update(const MergeTreeDataPartTTLInfos & other_infos); - void updatePartMinTTL(time_t time) + void updatePartMinMaxTTL(time_t time_min, time_t time_max) { - if (time && (!part_min_ttl || time < part_min_ttl)) - part_min_ttl = time; + if (time_min && (!part_min_ttl || time_min < part_min_ttl)) + part_min_ttl = time_min; + + if (time_max && (!part_max_ttl || time_max > part_max_ttl)) + part_max_ttl = time_max; } }; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index d7a48171499..8515348a4b5 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -99,7 +99,7 @@ void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTL else throw Exception("Unexpected type of result ttl column", ErrorCodes::LOGICAL_ERROR); - ttl_infos.updatePartMinTTL(ttl_info.min); + ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max); } } diff --git a/dbms/src/Storages/MergeTree/MergeTreeSettings.h b/dbms/src/Storages/MergeTree/MergeTreeSettings.h index 6ba08fed5da..bdb062a5981 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeSettings.h +++ b/dbms/src/Storages/MergeTree/MergeTreeSettings.h @@ -82,6 +82,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings> M(SettingUInt64, min_merge_bytes_to_use_direct_io, 10ULL * 1024 * 1024 * 1024, "Minimal amount of bytes to enable O_DIRECT in merge (0 - disabled).") \ IM(SettingUInt64, index_granularity_bytes, 10 * 1024 * 1024, "Approximate amount of bytes in single granule (0 - disabled).") \ M(SettingInt64, merge_with_ttl_timeout, 3600 * 24, "Minimal time in seconds, when merge with TTL can be repeated.") \ + M(SettingBool, ttl_only_drop_parts, false, "Only drop altogether the expired parts and not partially prune them.") \ M(SettingBool, write_final_mark, 1, "Write final mark after end of column (0 - disabled, do 
nothing if index_granularity_bytes=0)") \ M(SettingBool, enable_mixed_granularity_parts, 0, "Enable parts with adaptive and non adaptive granularity") \ M(SettingMaxThreads, max_part_loading_threads, 0, "The number of theads to load data parts at startup.") \ diff --git a/dbms/src/Storages/MergeTree/TTLMergeSelector.cpp b/dbms/src/Storages/MergeTree/TTLMergeSelector.cpp index 3669bd1e2cc..0ba341fca64 100644 --- a/dbms/src/Storages/MergeTree/TTLMergeSelector.cpp +++ b/dbms/src/Storages/MergeTree/TTLMergeSelector.cpp @@ -20,9 +20,11 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select( { for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it) { - if (it->min_ttl && (partition_to_merge_index == -1 || it->min_ttl < partition_to_merge_min_ttl)) + time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl; + + if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl)) { - partition_to_merge_min_ttl = it->min_ttl; + partition_to_merge_min_ttl = ttl; partition_to_merge_index = i; best_begin = it; } @@ -38,7 +40,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select( while (true) { - if (!best_begin->min_ttl || best_begin->min_ttl > current_time + time_t ttl = only_drop_parts ? best_begin->max_ttl : best_begin->min_ttl; + + if (!ttl || ttl > current_time || (max_total_size_to_merge && total_size > max_total_size_to_merge)) { ++best_begin; @@ -54,7 +58,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select( while (best_end != best_partition.end()) { - if (!best_end->min_ttl || best_end->min_ttl > current_time + time_t ttl = only_drop_parts ? 
best_end->max_ttl : best_end->min_ttl; + + if (!ttl || ttl > current_time || (max_total_size_to_merge && total_size > max_total_size_to_merge)) break; diff --git a/dbms/src/Storages/MergeTree/TTLMergeSelector.h b/dbms/src/Storages/MergeTree/TTLMergeSelector.h index 3c035d03a99..2f03d5b9feb 100644 --- a/dbms/src/Storages/MergeTree/TTLMergeSelector.h +++ b/dbms/src/Storages/MergeTree/TTLMergeSelector.h @@ -14,13 +14,14 @@ namespace DB class TTLMergeSelector : public IMergeSelector { public: - explicit TTLMergeSelector(time_t current_time_) : current_time(current_time_) {} + explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {} PartsInPartition select( const Partitions & partitions, const size_t max_total_size_to_merge) override; private: time_t current_time; + bool only_drop_parts; }; }