diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index 23d7b6cb87e..7e43d8e64d0 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -82,6 +82,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &stat); UInt64 min_saved_log_pointer = std::numeric_limits::max(); + UInt64 min_inactive_log_pointer = std::numeric_limits::max(); Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/log"); @@ -94,6 +95,8 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() ? entries.size() - storage.data.settings.max_replicated_logs_to_keep.value : 0]; + std::unordered_set recovering_replicas; + std::unordered_map host_versions_inactive_replicas; std::unordered_map log_pointers_lost_replicas; @@ -111,13 +114,25 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() /// Check status of replica (active or not). /// If replica was not active, we could check when it's log_pointer locates. + + String res; + + bool new_version_of_replica = zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res); + if (zookeeper->exists(storage.zookeeper_path + "/replicas/" + replica + "/is_active")) - min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); + if (new_version_of_replica && res == "1") + recovering_replicas.insert(replica); + else + min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); else { - String res; - if (!zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", res)) + if (!new_version_of_replica) + { + /// Only to support old versions CH. + /// If replica did not have "/is_lost" we must save it's log_pointer. + /// Because old version CH can not work with recovering. min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); + } else if (res == "0") { @@ -128,15 +143,27 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() { host_versions_inactive_replicas[replica] = host_stat.version; log_pointers_lost_replicas[replica] = log_pointer_str; + min_inactive_log_pointer = std::min(min_saved_log_pointer, log_pointer); } } - /// Only to support old versions CH. else ++replicas_were_marked_is_lost; host_versions_inactive_replicas[replica] = host_stat.version; } } + /// We must check log_pointer recovering replicas at the end. + /// Because log pointer recovering replicas can move backward. + for (const String & replica : recovering_replicas) + { + String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); + UInt32 log_pointer = parse(pointer); + min_saved_log_pointer = std::min(min_saved_log_pointer, log_pointer); + } + + if (recovering_replicas.size() != 0) + min_saved_log_pointer = std::min(min_saved_log_pointer, min_inactive_log_pointer); + /// We will not touch the last `min_replicated_logs_to_keep` records. entries.erase(entries.end() - std::min(entries.size(), storage.data.settings.min_replicated_logs_to_keep.value), entries.end()); /// We will not touch records that are no less than `min_pointer_active_replica`. @@ -154,9 +181,9 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size()) { - /// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer. - for (auto host_version: host_versions_inactive_replicas) - ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second)); + /// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer. + for (auto host_version: host_versions_inactive_replicas) + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second)); /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list. ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version)); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 33dafccf696..77825b8a0a1 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -1999,7 +1999,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) throw Exception("Can not clone replica, because a " + source_path + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED); else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) - throw Exception("Can not clone replica, because the clickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED); + throw Exception("Can not clone replica, because the ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED); else zkutil::KeeperMultiException::check(error, ops, resp);