mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Merge pull request #23301 from ClickHouse/fix_race_in_deduplication
Deduplication: check block version on removal
This commit is contained in:
commit
20e897bc5a
@ -307,8 +307,9 @@ struct ReplicatedMergeTreeCleanupThread::NodeWithStat
|
||||
{
|
||||
String node;
|
||||
Int64 ctime = 0;
|
||||
Int32 version = 0;
|
||||
|
||||
NodeWithStat(String node_, Int64 ctime_) : node(std::move(node_)), ctime(ctime_) {}
|
||||
NodeWithStat(String node_, Int64 ctime_, Int32 version_) : node(std::move(node_)), ctime(ctime_), version(version_) {}
|
||||
|
||||
static bool greaterByTime(const NodeWithStat & lhs, const NodeWithStat & rhs)
|
||||
{
|
||||
@ -334,7 +335,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
current_time - static_cast<Int64>(1000 * storage_settings->replicated_deduplication_window_seconds));
|
||||
|
||||
/// Virtual node, all nodes that are "greater" than this one will be deleted
|
||||
NodeWithStat block_threshold{{}, time_threshold};
|
||||
NodeWithStat block_threshold{{}, time_threshold, 0};
|
||||
|
||||
size_t current_deduplication_window = std::min<size_t>(timed_blocks.size(), storage_settings->replicated_deduplication_window);
|
||||
auto first_outdated_block_fixed_threshold = timed_blocks.begin() + current_deduplication_window;
|
||||
@ -355,7 +356,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
|
||||
{
|
||||
String path = storage.zookeeper_path + "/blocks/" + it->node;
|
||||
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path));
|
||||
try_remove_futures.emplace_back(path, zookeeper->asyncTryRemove(path, it->version));
|
||||
}
|
||||
|
||||
for (auto & pair : try_remove_futures)
|
||||
@ -368,7 +369,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
|
||||
zookeeper->removeRecursive(path);
|
||||
cached_block_stats.erase(first_outdated_block->node);
|
||||
}
|
||||
else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE)
|
||||
else if (rc == Coordination::Error::ZOK || rc == Coordination::Error::ZNONODE || rc == Coordination::Error::ZBADVERSION)
|
||||
{
|
||||
/// No node is Ok. Another replica is removing nodes concurrently.
|
||||
/// Successfully removed blocks have to be removed from cache
|
||||
@ -426,7 +427,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
|
||||
else
|
||||
{
|
||||
/// Cached block
|
||||
timed_blocks.emplace_back(block, it->second);
|
||||
const auto & ctime_and_version = it->second;
|
||||
timed_blocks.emplace_back(block, ctime_and_version.first, ctime_and_version.second);
|
||||
}
|
||||
}
|
||||
|
||||
@ -436,8 +438,8 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
|
||||
auto status = elem.second.get();
|
||||
if (status.error != Coordination::Error::ZNONODE)
|
||||
{
|
||||
cached_block_stats.emplace(elem.first, status.stat.ctime);
|
||||
timed_blocks.emplace_back(elem.first, status.stat.ctime);
|
||||
cached_block_stats.emplace(elem.first, std::make_pair(status.stat.ctime, status.stat.version));
|
||||
timed_blocks.emplace_back(elem.first, status.stat.ctime, status.stat.version);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -58,8 +58,8 @@ private:
|
||||
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
|
||||
void clearOldMutations();
|
||||
|
||||
using NodeCTimeCache = std::map<String, Int64>;
|
||||
NodeCTimeCache cached_block_stats;
|
||||
using NodeCTimeAndVersionCache = std::map<String, std::pair<Int64, Int32>>;
|
||||
NodeCTimeAndVersionCache cached_block_stats;
|
||||
|
||||
struct NodeWithStat;
|
||||
/// Returns list of blocks (with their stat) sorted by ctime in descending order.
|
||||
|
Loading…
Reference in New Issue
Block a user