diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index e626e9c9d4b..9c1d84380c7 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -95,11 +95,13 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() : 0]; std::unordered_map host_versions_inactive_replicas; - std::unordered_map log_pointers_inactive_replicas; + std::unordered_map log_pointers_lost_replicas; + + size_t replicas_were_marked_is_lost = 0; for (const String & replica : replicas) { - zkutil::Stat host_stat + zkutil::Stat host_stat; zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/host", &host_stat); String pointer = zookeeper->get(storage.zookeeper_path + "/replicas/" + replica + "/log_pointer"); if (pointer.empty()) @@ -125,10 +127,12 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() else { host_versions_inactive_replicas[replica] = host_stat.version; - log_pointers_inactive_replicas[replica] = log_pointer_str; + log_pointers_lost_replicas[replica] = log_pointer_str; } } + /// Only to support old versions CH. else + ++replicas_were_marked_is_lost; host_versions_inactive_replicas[replica] = host_stat.version; } } @@ -141,19 +145,19 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() if (entries.empty()) return; - markLostReplicas(host_versions_inactive_replicas, log_pointers_inactive_replicas, replicas.size(), entries.back(), zookeeper); + markLostReplicas(host_versions_inactive_replicas, log_pointers_lost_replicas, replicas.size() - replicas_were_marked_is_lost, zookeeper); zkutil::Requests ops; for (size_t i = 0; i < entries.size(); ++i) { ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/log/" + entries[i], -1)); - /// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer. - for (auto host_version: host_versions_inactive_replicas) - ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second)); - if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size()) { + /// we need to check this because the replica that was restored from one of the marked replicas does not copy a non-valid log_pointer. + for (auto host_version: host_versions_inactive_replicas) + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas" + host_version.first + "/host", host_version.second)); + /// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list. ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", stat.version)); zookeeper->multi(ops); @@ -166,38 +170,41 @@ void ReplicatedMergeTreeCleanupThread::clearOldLogs() void ReplicatedMergeTreeCleanupThread::markLostReplicas(const std::unordered_map & host_versions_inactive_replicas, - const std::unordered_map & log_pointers_inactive_replicas, - size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper) + const std::unordered_map & log_pointers_lost_replicas, + size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper) { - std::vector requests; - std::vector futures; + struct LostReplicaInfo { + String name; + zkutil::Requests requests; + }; - for (auto pair : log_pointers_inactive_replicas) + std::vector requests; + std::vector lost_replicas_info; + std::vector> info_and_future; + + for (auto pair : log_pointers_lost_replicas) { String replica = pair.first; - if (pair.second <= remove_border) - { - zkutil::Requests ops; - /// If host changed version we can not mark replicas, because replica started to be active. - ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica))); - ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1)); - requests.push_back(ops); - } + zkutil::Requests ops; + /// If host changed version we can not mark replicas, because replica started to be active. + ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas/" + replica + "/host", host_versions_inactive_replicas.at(replica))); + ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/replicas/" + replica + "/is_lost", "1", -1)); + lost_replicas_info.push_back(LostReplicaInfo{replica, ops}); } - if (requests.size() == replicas_count) + if (lost_replicas_info.size() == replicas_count) throw Exception("All replicas wiil be lost", ErrorCodes::ALL_REPLICAS_LOST); - for (auto & req : requests) - futures.push_back(zookeeper->tryAsyncMulti(req)); + for (auto & replica_info : lost_replicas_info) + info_and_future.emplace_back(replica_info, zookeeper->tryAsyncMulti(replica_info.requests)); - for (size_t i = 0; i < futures.size(); ++i) + for (auto & pair : info_and_future) { - auto multi_responses = futures[i].get(); + auto multi_responses = pair.second.get(); if (multi_responses.responses[0]->error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) - throw Exception("One of the replicas became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED); + throw Exception(pair.first.name + " became active, when we clear log", DB::ErrorCodes::REPLICA_STATUS_CHANGED); else if (multi_responses.error != ZooKeeperImpl::ZooKeeper::ZOK) - zkutil::KeeperMultiException::check(multi_responses.error, requests[i], multi_responses.responses); + zkutil::KeeperMultiException::check(multi_responses.error, pair.first.requests, multi_responses.responses); } } diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h index fab93e84b4d..cd7e59c3326 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.h @@ -48,8 +48,8 @@ private: /// Mark lost replicas. void markLostReplicas(const std::unordered_map & host_versions_inactive_replicas, - const std::unordered_map & log_pointers_inactive_replicas, - size_t replicas_count, const String & remove_border, const zkutil::ZooKeeperPtr & zookeeper); + const std::unordered_map & log_pointers_lost_replicas, + size_t replicas_count, const zkutil::ZooKeeperPtr & zookeeper); /// Remove old block hashes from ZooKeeper. This is done by the leader replica. void clearOldBlocks(); diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index b5a7fd46b75..51881bd2f82 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -26,8 +26,6 @@ namespace DB namespace ErrorCodes { extern const int REPLICA_IS_ALREADY_ACTIVE; - extern const int ALL_REPLICAS_LOST; - extern const int CAN_NOT_CLONE_REPLICA; } namespace diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 70eb53c8ce1..33dafccf696 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -617,7 +617,9 @@ void StorageReplicatedMergeTree::createReplica() code = zookeeper->tryMulti(ops, resps); if (code == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) throw Exception("Replica " + replica_path + " already exists.", ErrorCodes::REPLICA_IS_ALREADY_EXIST); - else if (code != ZooKeeperImpl::ZooKeeper::ZBADVERSION) + else if (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION) + LOG_ERROR(log, "Retry createReplica(), because some replicas were created"); + else zkutil::KeeperMultiException::check(code, ops, resps); } while (code == ZooKeeperImpl::ZooKeeper::ZBADVERSION); } @@ -1982,6 +1984,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku zkutil::Requests ops; ops.push_back(zkutil::makeSetRequest(replica_path + "/log_pointer", raw_log_pointer, -1)); + /// For support old versions CH. if (source_is_lost_stat.version == -1) { ops.push_back(zkutil::makeCreateRequest(replica_path + "/is_lost", "0", zkutil::CreateMode::PersistentSequential)); @@ -1994,9 +1997,9 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, zku auto error = zookeeper->tryMulti(ops, resp); if (error == ZooKeeperImpl::ZooKeeper::ZBADVERSION) - throw Exception("Can not clone replica, because a source replica is lost", ErrorCodes::REPLICA_STATUS_CHANGED); + throw Exception("Can not clone replica, because a " + source_path + " became lost", ErrorCodes::REPLICA_STATUS_CHANGED); else if (error == ZooKeeperImpl::ZooKeeper::ZNODEEXISTS) - throw Exception("ClickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED); + throw Exception("Can not clone replica, because the clickHouse server updated to new version", ErrorCodes::REPLICA_STATUS_CHANGED); else zkutil::KeeperMultiException::check(error, ops, resp); @@ -2069,6 +2072,7 @@ void StorageReplicatedMergeTree::cloneReplicaIfNeeded(zkutil::ZooKeeperPtr zooke String resp; if (!zookeeper->tryGet(source_replica_path + "/is_lost", resp, &source_is_lost_stat) || resp == "0") source_replica = replica_name; + break; } } diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.h b/dbms/src/Storages/StorageReplicatedMergeTree.h index 519859062bd..05ea48bdc30 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.h +++ b/dbms/src/Storages/StorageReplicatedMergeTree.h @@ -401,8 +401,7 @@ private: void mutationsUpdatingTask(); /** Clone data from another replica. - * return true, if replica wil be cloned - * else return false (when source_replica was lost or log was empty). + * If replica can not be cloned throw Exception. */ void cloneReplica(const String & source_replica, zkutil::Stat source_is_lost_stat, zkutil::ZooKeeperPtr & zookeeper);