mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Fixed concurrency problems in ALTER MODIFY TTL
.
This commit is contained in:
parent
89d74a599b
commit
4f1f9e7e42
@ -633,8 +633,10 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
|
||||
|
||||
if (new_ttl_table_ast)
|
||||
{
|
||||
std::unique_lock<std::mutex> move_ttl_entries_lock;
|
||||
if (!only_check)
|
||||
{
|
||||
move_ttl_entries_lock = std::unique_lock<std::mutex>(move_ttl_entries_mutex);
|
||||
move_ttl_entries.clear();
|
||||
ttl_table_ast = nullptr;
|
||||
}
|
||||
@ -677,9 +679,7 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
|
||||
}
|
||||
|
||||
if (!only_check)
|
||||
{
|
||||
move_ttl_entries.emplace_back(std::move(new_ttl_entry));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -3299,7 +3299,7 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_
|
||||
ReservationPtr reservation;
|
||||
|
||||
auto ttl_entry = selectTTLEntryForTTLInfos(ttl_infos, time_of_move);
|
||||
if (ttl_entry != nullptr)
|
||||
if (ttl_entry)
|
||||
{
|
||||
SpacePtr destination_ptr = ttl_entry->getDestination(storage_policy);
|
||||
if (!destination_ptr)
|
||||
@ -3358,27 +3358,28 @@ bool MergeTreeData::TTLEntry::isPartInDestination(const StoragePolicyPtr & polic
|
||||
return false;
|
||||
}
|
||||
|
||||
const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
std::optional<MergeTreeData::TTLEntry> MergeTreeData::selectTTLEntryForTTLInfos(
|
||||
const MergeTreeDataPart::TTLInfos & ttl_infos,
|
||||
time_t time_of_move) const
|
||||
{
|
||||
const MergeTreeData::TTLEntry * result = nullptr;
|
||||
/// Prefer TTL rule which went into action last.
|
||||
time_t max_max_ttl = 0;
|
||||
std::vector<DB::MergeTreeData::TTLEntry>::const_iterator best_entry_it;
|
||||
|
||||
for (const auto & ttl_entry : move_ttl_entries)
|
||||
auto lock = std::lock_guard(move_ttl_entries_mutex);
|
||||
for (auto ttl_entry_it = move_ttl_entries.begin(); ttl_entry_it != move_ttl_entries.end(); ++ttl_entry_it)
|
||||
{
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry.result_column);
|
||||
auto ttl_info_it = ttl_infos.moves_ttl.find(ttl_entry_it->result_column);
|
||||
/// Prefer TTL rule which went into action last.
|
||||
if (ttl_info_it != ttl_infos.moves_ttl.end()
|
||||
&& ttl_info_it->second.max <= time_of_move
|
||||
&& max_max_ttl <= ttl_info_it->second.max)
|
||||
{
|
||||
result = &ttl_entry;
|
||||
best_entry_it = ttl_entry_it;
|
||||
max_max_ttl = ttl_info_it->second.max;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
return max_max_ttl ? *best_entry_it : std::optional<MergeTreeData::TTLEntry>();
|
||||
}
|
||||
|
||||
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
|
||||
|
@ -737,12 +737,16 @@ public:
|
||||
bool isPartInDestination(const StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
|
||||
};
|
||||
|
||||
const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
std::optional<TTLEntry> selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
|
||||
|
||||
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
|
||||
TTLEntriesByName column_ttl_entries_by_name;
|
||||
|
||||
TTLEntry ttl_table_entry;
|
||||
|
||||
/// This mutex is required for background move operations which do not obtain global locks.
|
||||
mutable std::mutex move_ttl_entries_mutex;
|
||||
|
||||
std::vector<TTLEntry> move_ttl_entries;
|
||||
|
||||
String sampling_expr_column_name;
|
||||
|
@ -127,14 +127,14 @@ bool MergeTreePartsMover::selectPartsForMove(
|
||||
if (!can_move(part, &reason))
|
||||
continue;
|
||||
|
||||
const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto ttl_entry = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
|
||||
auto to_insert = need_to_move.find(part->disk);
|
||||
ReservationPtr reservation;
|
||||
if (ttl_entry_ptr)
|
||||
if (ttl_entry)
|
||||
{
|
||||
auto destination = ttl_entry_ptr->getDestination(policy);
|
||||
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
|
||||
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
|
||||
auto destination = ttl_entry->getDestination(policy);
|
||||
if (destination && !ttl_entry->isPartInDestination(policy, *part))
|
||||
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry->getDestination(policy));
|
||||
}
|
||||
|
||||
if (reservation) /// Found reservation by TTL rule.
|
||||
|
Loading…
Reference in New Issue
Block a user