diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 8adcdee7e23..5b235322394 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -34,7 +34,6 @@ namespace ErrorCodes
     extern const int TOO_FEW_LIVE_REPLICAS;
     extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
     extern const int UNEXPECTED_ZOOKEEPER_ERROR;
-    extern const int NO_ZOOKEEPER;
     extern const int READONLY;
     extern const int UNKNOWN_STATUS_OF_INSERT;
     extern const int INSERT_WAS_DEDUPLICATED;
@@ -150,81 +149,86 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
 template<bool async_insert>
 ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl() = default;
 
-/// Allow to verify that the session in ZooKeeper is still alive.
-static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
-{
-    if (!zookeeper)
-        throw Exception(ErrorCodes::NO_ZOOKEEPER, "No ZooKeeper session.");
-
-    if (zookeeper->expired())
-        throw Exception(ErrorCodes::NO_ZOOKEEPER, "ZooKeeper session has been expired.");
-}
-
 template<bool async_insert>
 size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
 {
     if (!isQuorumEnabled())
         return 0;
 
-    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
+    size_t replicas_number = 0;
 
-    Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
+    ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
+    quorum_retries_ctl.retryLoop(
+        [&]()
+        {
+            zookeeper->setKeeper(storage.getZooKeeper());
 
-    Strings exists_paths;
-    exists_paths.reserve(replicas.size());
-    for (const auto & replica : replicas)
-        if (replica != storage.replica_name)
-            exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
+            quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
 
-    auto exists_result = zookeeper->exists(exists_paths);
-    auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
+            Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
 
-    Coordination::Error keeper_error = Coordination::Error::ZOK;
-    size_t active_replicas = 1; /// Assume current replica is active (will check below)
-    for (size_t i = 0; i < exists_paths.size(); ++i)
-    {
-        auto error = exists_result[i].error;
-        if (error == Coordination::Error::ZOK)
-            ++active_replicas;
-        else if (Coordination::isHardwareError(error))
-            keeper_error = error;
-    }
+            Strings exists_paths;
+            exists_paths.reserve(replicas.size());
+            for (const auto & replica : replicas)
+                if (replica != storage.replica_name)
+                    exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
 
-    size_t replicas_number = replicas.size();
-    size_t quorum_size = getQuorumSize(replicas_number);
+            auto exists_result = zookeeper->exists(exists_paths);
+            auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
 
-    if (active_replicas < quorum_size)
-    {
-        if (Coordination::isHardwareError(keeper_error))
-            throw Coordination::Exception::fromMessage(keeper_error, "Failed to check number of alive replicas");
+            Coordination::Error keeper_error = Coordination::Error::ZOK;
+            size_t active_replicas = 1; /// Assume current replica is active (will check below)
+            for (size_t i = 0; i < exists_paths.size(); ++i)
+            {
+                auto error = exists_result[i].error;
+                if (error == Coordination::Error::ZOK)
+                    ++active_replicas;
+                else if (Coordination::isHardwareError(error))
+                    keeper_error = error;
+            }
 
-        throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
-                        active_replicas, quorum_size, replicas_number);
-    }
+            replicas_number = replicas.size();
+            size_t quorum_size = getQuorumSize(replicas_number);
 
-    /** Is there a quorum for the last part for which a quorum is needed?
-      * Write of all the parts with the included quorum is linearly ordered.
-      * This means that at any time there can be only one part,
-      * for which you need, but not yet reach the quorum.
-      * Information about this part will be located in `/quorum/status` node.
-      * If the quorum is reached, then the node is deleted.
-      */
+            if (active_replicas < quorum_size)
+            {
+                if (Coordination::isHardwareError(keeper_error))
+                    throw Coordination::Exception::fromMessage(keeper_error, "Failed to check number of alive replicas");
 
-    String quorum_status;
-    if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
-        throw Exception(ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE,
-                        "Quorum for previous write has not been satisfied yet. Status: {}", quorum_status);
+                throw Exception(
+                    ErrorCodes::TOO_FEW_LIVE_REPLICAS,
+                    "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
+                    active_replicas,
+                    quorum_size,
+                    replicas_number);
+            }
 
-    /// Both checks are implicitly made also later (otherwise there would be a race condition).
+            /** Is there a quorum for the last part for which a quorum is needed?
+              * Write of all the parts with the included quorum is linearly ordered.
+              * This means that at any time there can be only one part,
+              * for which you need, but not yet reach the quorum.
+              * Information about this part will be located in `/quorum/status` node.
+              * If the quorum is reached, then the node is deleted.
+              */
 
-    auto is_active = get_results[0];
-    auto host = get_results[1];
+            String quorum_status;
+            if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
+                throw Exception(
+                    ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE,
+                    "Quorum for previous write has not been satisfied yet. Status: {}",
+                    quorum_status);
 
-    if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
-        throw Exception(ErrorCodes::READONLY, "Replica is not active right now");
+            /// Both checks are implicitly made also later (otherwise there would be a race condition).
 
-    quorum_info.is_active_node_version = is_active.stat.version;
-    quorum_info.host_node_version = host.stat.version;
+            auto is_active = get_results[0];
+            auto host = get_results[1];
+
+            if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
+                throw Exception(ErrorCodes::READONLY, "Replica is not active right now");
+
+            quorum_info.is_active_node_version = is_active.stat.version;
+            quorum_info.host_node_version = host.stat.version;
+        });
 
     return replicas_number;
 }
@@ -257,14 +261,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
      * And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node).
      * TODO Too complex logic, you can do better.
      */
-    size_t replicas_num = 0;
-    ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
-    quorum_retries_ctl.retryLoop(
-        [&]()
-        {
-            zookeeper->setKeeper(storage.getZooKeeper());
-            replicas_num = checkQuorumPrecondition(zookeeper);
-        });
+    size_t replicas_num = checkQuorumPrecondition(zookeeper);
 
     if (!storage_snapshot->object_columns.empty())
         convertDynamicColumnsToTuples(block, storage_snapshot);
@@ -483,7 +480,6 @@ bool ReplicatedMergeTreeSinkImpl<async_insert>::writeExistingPart(MergeTreeData::Mutabl
 {
     /// NOTE: No delay in this case. That's Ok.
     auto origin_zookeeper = storage.getZooKeeper();
-    assertSessionIsNotExpired(origin_zookeeper);
     auto zookeeper = std::make_shared<ZooKeeperWithFaultInjection>(origin_zookeeper);
 
     size_t replicas_num = checkQuorumPrecondition(zookeeper);
@@ -1058,7 +1054,6 @@ template<bool async_insert>
 void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
 {
     auto zookeeper = storage.getZooKeeper();
-    assertSessionIsNotExpired(zookeeper);
     finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
 }
 
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 36815badd53..b1191af60b7 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -2948,7 +2948,7 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
         # while materialized view is working to inject zookeeper failure
         pm.drop_instance_zk_connections(instance)
         instance.wait_for_log_line(
-            "Error.*(session has been expired|Connection loss).*while pushing to view"
+            "Error.*(Connection loss|Coordination::Exception).*while pushing to view"
        )
         pm.heal_all()
         instance.wait_for_log_line("Committed offset 22")
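
Note on the shape of the change: the patch stops asserting ZooKeeper session liveness up front (`assertSessionIsNotExpired` is removed) and instead runs the whole quorum precondition check inside a `ZooKeeperRetriesControl::retryLoop`, which re-acquires the Keeper session on every attempt. Below is a minimal, self-contained sketch of that retry-and-reacquire pattern under stated assumptions: `retryLoop` and `TransientKeeperError` here are illustrative stand-ins, not ClickHouse's actual retries API or error taxonomy.

```cpp
// Hedged sketch only: illustrates the retry-loop shape the patch relies on,
// where session (re)acquisition happens inside the retried body, so an expired
// session on one attempt is simply replaced on the next attempt.
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>

struct TransientKeeperError : std::runtime_error   /// hypothetical stand-in for a retryable Keeper error
{
    using std::runtime_error::runtime_error;
};

/// Runs `body` up to `max_retries + 1` times, backing off between attempts.
/// Anything other than TransientKeeperError propagates immediately.
void retryLoop(const std::string & name, size_t max_retries, const std::function<void()> & body)
{
    for (size_t attempt = 0;; ++attempt)
    {
        try
        {
            body();
            return;
        }
        catch (const TransientKeeperError & e)
        {
            if (attempt >= max_retries)
                throw;
            std::cerr << name << ": transient error '" << e.what() << "', retrying\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100 * (attempt + 1)));
        }
    }
}

int main()
{
    size_t replicas_number = 0;
    int failures_to_simulate = 2;

    /// Mirrors the structure introduced by the patch: the whole precondition check,
    /// including session acquisition, lives inside the retried lambda.
    retryLoop("checkQuorumPrecondition", 5, [&]
    {
        if (failures_to_simulate-- > 0)
            throw TransientKeeperError("session expired");   /// e.g. an expired ZooKeeper session
        replicas_number = 3;                                 /// pretend listing /replicas succeeded
    });

    std::cout << "replicas seen after retries: " << replicas_number << "\n";
    return 0;
}
```

This also explains the Kafka test update at the end of the diff: with the up-front assertion gone, a dropped ZooKeeper connection surfaces through the Keeper exception raised inside the retried/failed insert rather than through the old "session has been expired" message, so the expected log pattern changes accordingly.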