diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 502c6215a9a..d3496d99cef 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -307,8 +307,9 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat { String node; Int64 ctime = 0; + Int32 version = 0; - NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {} + NodeWithStat(String node_, Int64 ctime_, Int32 version_) : node(std::move(node_)), ctime(ctime_), version(version_) {} static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs) { @@ -334,7 +335,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() current_time - static_cast(1000 * storage_settings->replicated_deduplication_window_seconds)); /// Virtual node, all nodes that are "greater" than this one will be deleted - NodeWithStat block_threshold{{}, time_threshold}; + NodeWithStat block_threshold{{}, time_threshold, 0}; size_t current_deduplication_window = std::min(timed_blocks.size(), storage_settings->replicated_deduplication_window); auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window; @@ -355,7 +356,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() for (auto it = first_outdated_block; it != timed_blocks.end(); ++it) { String path = storage.zookeeper_path + "/blocks/" + it->node; - try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path)); + try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path, it->version)); } for (auto & pair : try_remove_futures) @@ -368,7 +369,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks() zookeeper->removeRecursive(path); cached_block_stats.erase(first_outdated_block->node); } - else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE) + else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE || rc == Coordination::Error::ZBADVERSION) { /// No node is Ok. Another replica is removing nodes concurrently. /// Successfully removed blocks have to be removed from cache @@ -426,7 +427,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & else { /// Cached block - timed_blocks.emplace_back(block, it->second); + const auto & ctime_and_version = it->second; + timed_blocks.emplace_back(block, ctime_and_version.first, ctime_and_version.second); } } @@ -436,8 +438,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper & auto status = elem.second.get(); if (status.error != Coordination::Error::ZNONODE) { - cached_block_stats.emplace(elem.first, status.stat.ctime); - timed_blocks.emplace_back(elem.first, status.stat.ctime); + cached_block_stats.emplace(elem.first, std::make_pair(status.stat.ctime, status.stat.version)); + timed_blocks.emplace_back(elem.first, status.stat.ctime, status.stat.version); } } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index 520af888621..939a40db8c8 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -58,8 +58,8 @@ private: /// Remove old mutations that are done from ZooKeeper. This is done by the leader replica. void clearOldMutations(); - using NodeCTimeCache = std::map; - NodeCTimeCache cached_block_stats; + using NodeCTimeAndVersionCache = std::map>; + NodeCTimeAndVersionCache cached_block_stats; struct NodeWithStat; /// Returns list of blocks (with their stat) sorted by ctime in descending order.