fix accessing part in Deleting state

This commit is contained in:
Alexander Tokmakov 2022-10-30 17:30:51 +01:00
parent 88f3db9115
commit c68ab231f9
4 changed files with 46 additions and 15 deletions

View File

@ -3142,7 +3142,7 @@ void MergeTreeData::removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn,
removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(txn, drop_range, lock);
}
MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeData::PartsToRemoveFromZooKeeper MergeTreeData::removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock)
{
DataPartsVector parts_to_remove;
@ -3220,15 +3220,20 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSetAn
/// FIXME refactor removePartsFromWorkingSet(...), do not remove parts twice
removePartsFromWorkingSet(txn, parts_to_remove, clear_without_timeout, lock);
/// Since we can return parts in Deleting state, we have to use a wrapper that restricts access to such parts.
PartsToRemoveFromZooKeeper parts_to_remove_from_zookeeper;
for (auto & part : parts_to_remove)
parts_to_remove_from_zookeeper.emplace_back(std::move(part));
for (auto & part : inactive_parts_to_remove_immediately)
{
if (!drop_range.contains(part->info))
continue;
part->remove_time.store(0, std::memory_order_relaxed);
parts_to_remove.push_back(std::move(part));
parts_to_remove_from_zookeeper.emplace_back(std::move(part), /* was_active */ false);
}
return parts_to_remove;
return parts_to_remove_from_zookeeper;
}
void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock)

View File

@ -584,10 +584,33 @@ public:
/// Used in REPLACE PARTITION command.
void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// This wrapper is required to restrict access to parts in Deleting state
class PartToRemoveFromZooKeeper
{
DataPartPtr part;
bool was_active;
public:
PartToRemoveFromZooKeeper(DataPartPtr && part_, bool was_active_ = true)
: part(std::move(part_)), was_active(was_active_)
{
}
/// It's s to get name of any part
const String & getPartName() const { return part->name; }
DataPartPtr getPartIfItWasActive() const
{
return was_active ? part : nullptr;
}
};
using PartsToRemoveFromZooKeeper = std::vector<PartToRemoveFromZooKeeper>;
/// Same as above, but also returns list of parts to remove from ZooKeeper.
/// It includes parts that have been just removed by these method
/// and Outdated parts covered by drop_range that were removed earlier for any reason.
DataPartsVector removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
PartsToRemoveFromZooKeeper removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// Restores Outdated part and adds it to working set

View File

@ -1827,7 +1827,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// Therefore, we use all data parts.
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts_to_remove;
PartsToRemoveFromZooKeeper parts_to_remove;
{
auto data_parts_lock = lockParts();
parts_to_remove = removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(NO_TRANSACTION_RAW, drop_range_info, data_parts_lock);
@ -1849,8 +1849,11 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
/// If DETACH clone parts to detached/ directory
for (const auto & part : parts_to_remove)
{
LOG_INFO(log, "Detaching {}", part->getDataPartStorage().getPartDirectory());
part->makeCloneInDetached("", metadata_snapshot);
if (auto part_to_detach = part.getPartIfItWasActive())
{
LOG_INFO(log, "Detaching {}", part_to_detach->getDataPartStorage().getPartDirectory());
part_to_detach->makeCloneInDetached("", metadata_snapshot);
}
}
}
@ -1941,7 +1944,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
PartDescriptions all_parts;
PartDescriptions parts_to_add;
DataPartsVector parts_to_remove;
PartsToRemoveFromZooKeeper parts_to_remove;
auto table_lock_holder_dst_table = lockForShare(
RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -1972,7 +1975,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String parts_to_remove_str;
for (const auto & part : parts_to_remove)
{
parts_to_remove_str += part->name;
parts_to_remove_str += part.getPartName();
parts_to_remove_str += " ";
}
LOG_TRACE(log, "Replacing {} parts {}with empty set", parts_to_remove.size(), parts_to_remove_str);
@ -2248,7 +2251,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
String parts_to_remove_str;
for (const auto & part : parts_to_remove)
{
parts_to_remove_str += part->name;
parts_to_remove_str += part.getPartName();
parts_to_remove_str += " ";
}
LOG_TRACE(log, "Replacing {} parts {}with {} parts {}", parts_to_remove.size(), parts_to_remove_str,
@ -6230,11 +6233,11 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
}
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries)
void StorageReplicatedMergeTree::removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries)
{
Strings part_names_to_remove;
for (const auto & part : parts)
part_names_to_remove.emplace_back(part->name);
part_names_to_remove.emplace_back(part.getPartName());
return removePartsFromZooKeeperWithRetries(part_names_to_remove, max_retries);
}
@ -6561,7 +6564,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
if (replace)
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
PartsToRemoveFromZooKeeper parts_to_remove;
Coordination::Responses op_results;
try
@ -6797,7 +6800,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
clearBlocksInPartition(*zookeeper, drop_range.partition_id, drop_range.max_block, drop_range.max_block);
DataPartsVector parts_to_remove;
PartsToRemoveFromZooKeeper parts_to_remove;
Coordination::Responses op_results;
try

View File

@ -549,7 +549,7 @@ private:
/// Remove parts from ZooKeeper, throw exception if unable to do so after max_retries.
void removePartsFromZooKeeperWithRetries(const Strings & part_names, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(DataPartsVector & parts, size_t max_retries = 5);
void removePartsFromZooKeeperWithRetries(PartsToRemoveFromZooKeeper & parts, size_t max_retries = 5);
/// Removes a part from ZooKeeper and adds a task to the queue to download it. It is supposed to do this with broken parts.
void removePartAndEnqueueFetch(const String & part_name);