From 76c4ac9f607cbb5f7f30e62bfdf91674787c2c99 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Tue, 29 Oct 2019 02:01:07 +0300 Subject: [PATCH] Fixed ttl move logic in background move task and added ttl enforcement on inserts. --- .../MergeTree/MergeTreeDataWriter.cpp | 55 +++++++++++++++++-- .../MergeTree/MergeTreePartsMover.cpp | 10 ++-- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp index cb48e13ffc1..3e89be98d17 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -211,10 +211,56 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa else part_name = new_part_info.getPartName(); - /// Size of part would not be grater than block.bytes() + epsilon - size_t expected_size = block.bytes(); - auto reservation = data.reserveSpace(expected_size); + auto current_time = time(nullptr); + const MergeTreeData::MoveTTLEntry * best_ttl_entry = nullptr; + time_t max_min_ttl = 0; + DB::MergeTreeDataPart::TTLInfos move_ttl_infos; + for (const auto & [expression, ttl_entry] : data.move_ttl_entries_by_name) + { + auto & ttl_info = move_ttl_infos.moves_ttl[expression]; + updateTTL(ttl_entry, move_ttl_infos, ttl_info, block); + if (ttl_info.min > current_time && max_min_ttl < ttl_info.min) + { + best_ttl_entry = &ttl_entry; + max_min_ttl = ttl_info.min; + } + } + + DiskSpace::ReservationPtr reservation; + /// Size of part would not be greater than block.bytes() + epsilon + size_t expected_size = block.bytes(); + if (best_ttl_entry != nullptr) + { + if (best_ttl_entry->destination_type == ASTTTLElement::DestinationType::VOLUME) + { + auto volume_ptr = data.getStoragePolicy()->getVolumeByName(best_ttl_entry->destination_name); + if (volume_ptr) + { + reservation = volume_ptr->reserve(expected_size); + } + else + { + /// FIXME: log warning + } + } + else if (best_ttl_entry->destination_type == ASTTTLElement::DestinationType::DISK) + { + auto disk_ptr = data.getStoragePolicy()->getDiskByName(best_ttl_entry->destination_name); + if (disk_ptr) + { + reservation = disk_ptr->reserve(expected_size); + } + else + { + /// FIXME: log warning + } + } + } + if (!reservation) + { + reservation = data.reserveSpace(expected_size); + } MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared(data, reservation->getDisk(), part_name, new_part_info); @@ -270,8 +316,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa for (const auto & [name, ttl_entry] : data.ttl_entries_by_name) updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block); - for (const auto & [expression, ttl_entry] : data.move_ttl_entries_by_name) - updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.moves_ttl[expression], block); + new_data_part->ttl_infos.update(move_ttl_infos); /// This effectively chooses minimal compression method: /// either default lz4 or compression method with zero thresholds on absolute and relative part size. diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp index 405a3b0c3ce..69ef7d88602 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -113,16 +113,16 @@ bool MergeTreePartsMover::selectPartsForMove( const auto ttl_entries_end = part->storage.move_ttl_entries_by_name.end(); auto best_ttl_entry_it = ttl_entries_end; - time_t max_max_ttl = 0; + time_t max_min_ttl = 0; for (auto & [name, ttl_info] : part->ttl_infos.moves_ttl) { auto move_ttl_entry_it = part->storage.move_ttl_entries_by_name.find(name); if (move_ttl_entry_it != part->storage.move_ttl_entries_by_name.end()) { - if (ttl_info.max < current_time && max_max_ttl < ttl_info.max) + if (ttl_info.min > current_time && max_min_ttl < ttl_info.min) { best_ttl_entry_it = move_ttl_entry_it; - max_max_ttl = ttl_info.max; + max_min_ttl = ttl_info.min; } } } @@ -143,7 +143,7 @@ bool MergeTreePartsMover::selectPartsForMove( } else { - /// FIXME: log error + /// FIXME: log warning? } } else if (move_ttl_entry.destination_type == ASTTTLElement::DestinationType::DISK) @@ -160,7 +160,7 @@ bool MergeTreePartsMover::selectPartsForMove( } else { - /// FIXME: log error + /// FIXME: log warning? } } }