remove outdated parts immediately on drop partition

This commit is contained in:
Alexander Tokmakov 2022-04-13 18:01:22 +02:00
parent 457a9e9691
commit 66fdf35dfd
4 changed files with 26 additions and 11 deletions

View File

@ -2878,8 +2878,7 @@ void MergeTreeData::removePartsFromWorkingSet(
} }
MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet( MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
bool clear_without_timeout, DataPartsLock & lock)
{ {
DataPartsVector parts_to_remove; DataPartsVector parts_to_remove;
@ -2947,6 +2946,13 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(
parts_to_remove.emplace_back(part); parts_to_remove.emplace_back(part);
} }
bool clear_without_timeout = true;
/// We a going to remove active parts covered by drop_range without timeout.
/// Let's also reset timeout for inactive parts.
auto inactive_parts_to_remove_immediately = getDataPartsVectorInPartitionForInternalUsage(DataPartState::Outdated, drop_range.partition_id, &lock);
for (auto & part : inactive_parts_to_remove_immediately)
part->remove_time.store(0, std::memory_order_relaxed);
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice /// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock); removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
@ -3382,7 +3388,8 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartiti
return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id); return getVisibleDataPartsVectorInPartition(local_context->getCurrentTransaction().get(), partition_id);
} }
MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock) const MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(
MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock) const
{ {
if (txn) if (txn)
{ {
@ -3398,7 +3405,13 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartiti
return res; return res;
} }
DataPartStateAndPartitionID state_with_partition{MergeTreeDataPartState::Active, partition_id}; return getDataPartsVectorInPartitionForInternalUsage(MergeTreeDataPartState::Active, partition_id, acquired_lock);
}
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitionForInternalUsage(
const MergeTreeData::DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock) const
{
DataPartStateAndPartitionID state_with_partition{state, partition_id};
auto lock = (acquired_lock) ? DataPartsLock() : lockParts(); auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
return DataPartsVector( return DataPartsVector(

View File

@ -494,6 +494,8 @@ public:
DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const; DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const;
DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const; DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
/// Returns the part with the given name and state or nullptr if no such part. /// Returns the part with the given name and state or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states);
@ -577,7 +579,7 @@ public:
/// Removes all parts from the working set parts /// Removes all parts from the working set parts
/// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block). /// for which (partition_id = drop_range.partition_id && min_block >= drop_range.min_block && max_block <= drop_range.max_block).
/// Used in REPLACE PARTITION command; /// Used in REPLACE PARTITION command;
DataPartsVector removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, bool clear_without_timeout, DataPartsVector removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range,
DataPartsLock & lock); DataPartsLock & lock);
/// Restores Outdated part and adds it to working set /// Restores Outdated part and adds it to working set

View File

@ -1618,7 +1618,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
/// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block /// If it is REPLACE (not ATTACH), remove all parts which max_block_number less then min_block_number of the first new block
if (replace) if (replace)
removePartsInRangeFromWorkingSet(local_context->getCurrentTransaction().get(), drop_range, true, data_parts_lock); removePartsInRangeFromWorkingSet(local_context->getCurrentTransaction().get(), drop_range, data_parts_lock);
} }
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());

View File

@ -1886,7 +1886,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
DataPartsVector parts_to_remove; DataPartsVector parts_to_remove;
{ {
auto data_parts_lock = lockParts(); auto data_parts_lock = lockParts();
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range_info, true, data_parts_lock); parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
if (parts_to_remove.empty()) if (parts_to_remove.empty())
{ {
if (!drop_range_info.isFakeDropRangePart()) if (!drop_range_info.isFakeDropRangePart())
@ -2019,7 +2019,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (parts_to_add.empty() && replace) if (parts_to_add.empty() && replace)
{ {
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, true, data_parts_lock); parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
String parts_to_remove_str; String parts_to_remove_str;
for (const auto & part : parts_to_remove) for (const auto & part : parts_to_remove)
{ {
@ -2257,7 +2257,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
transaction.commit(&data_parts_lock); transaction.commit(&data_parts_lock);
if (replace) if (replace)
{ {
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, true, data_parts_lock); parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
String parts_to_remove_str; String parts_to_remove_str;
for (const auto & part : parts_to_remove) for (const auto & part : parts_to_remove)
{ {
@ -6487,7 +6487,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
auto data_parts_lock = lockParts(); auto data_parts_lock = lockParts();
transaction.commit(&data_parts_lock); transaction.commit(&data_parts_lock);
if (replace) if (replace)
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, true, data_parts_lock); parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, data_parts_lock);
} }
PartLog::addNewParts(getContext(), dst_parts, watch.elapsed()); PartLog::addNewParts(getContext(), dst_parts, watch.elapsed());
@ -6694,7 +6694,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
else else
zkutil::KeeperMultiException::check(code, ops, op_results); zkutil::KeeperMultiException::check(code, ops, op_results);
parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, true, lock); parts_to_remove = removePartsInRangeFromWorkingSet(NO_TRANSACTION_RAW, drop_range, lock);
transaction.commit(&lock); transaction.commit(&lock);
} }