diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 04d95dd151a..50afbecc5d1 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -633,8 +633,10 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new if (new_ttl_table_ast) { + std::unique_lock move_ttl_entries_lock; if (!only_check) { + move_ttl_entries_lock = std::unique_lock(move_ttl_entries_mutex); move_ttl_entries.clear(); ttl_table_ast = nullptr; } @@ -677,9 +679,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new } if (!only_check) - { move_ttl_entries.emplace_back(std::move(new_ttl_entry)); - } } } } @@ -3299,7 +3299,7 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_ ReservationPtr reservation; auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move); - if (ttl_entry != nullptr) + if (ttl_entry) { SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy); if (!destination_ptr) @@ -3358,27 +3358,28 @@ bool MergeTreeData::TTLEntry::isPartInDestination(const StoragePolicyPtr & polic return false; } -const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos( +std::optional MergeTreeData::selectTTLEntryForTTLInfos( const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const { - const MergeTreeData::TTLEntry * result = nullptr; - /// Prefer TTL rule which went into action last. time_t max_max_ttl = 0; + std::vector::const_iterator best_entry_it; - for (const auto & ttl_entry : move_ttl_entries) + auto lock = std::lock_guard(move_ttl_entries_mutex); + for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it) { - auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column); + auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column); + /// Prefer TTL rule which went into action last. if (ttl_info_it != ttl_infos.moves_ttl.end() && ttl_info_it->second.max <= time_of_move && max_max_ttl <= ttl_info_it->second.max) { - result = &ttl_entry; + best_entry_it = ttl_entry_it; max_max_ttl = ttl_info_it->second.max; } } - return result; + return max_max_ttl ? *best_entry_it : std::optional(); } MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index ca2730a8aef..628420cf692 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -737,12 +737,16 @@ public: bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const; }; - const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const; + std::optional selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const; using TTLEntriesByName = std::unordered_map; TTLEntriesByName column_ttl_entries_by_name; TTLEntry ttl_table_entry; + + /// This mutex is required for background move operations which do not obtain global locks. + mutable std::mutex move_ttl_entries_mutex; + std::vector move_ttl_entries; String sampling_expr_column_name; diff --git a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp index 551a0de1338..0e3538409e9 100644 --- a/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreePartsMover.cpp @@ -127,14 +127,14 @@ bool MergeTreePartsMover::selectPartsForMove( if (!can_move(part, &reason)) continue; - const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move); + auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move); auto to_insert = need_to_move.find(part->disk); ReservationPtr reservation; - if (ttl_entry_ptr) + if (ttl_entry) { - auto destination = ttl_entry_ptr->getDestination(policy); - if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part)) - reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy)); + auto destination = ttl_entry->getDestination(policy); + if (destination && !ttl_entry->isPartInDestination(policy, *part)) + reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry->getDestination(policy)); } if (reservation) /// Found reservation by TTL rule.