diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 69e49fccb7c..30182697b21 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -77,7 +77,6 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe { quorum_info.status_path = storage.zookeeper_path + "/quorum/status"; - std::future quorum_status_future = zookeeper->asyncTryGet(quorum_info.status_path); std::future is_active_future = zookeeper->asyncTryGet(storage.replica_path + "/is_active"); std::future host_future = zookeeper->asyncTryGet(storage.replica_path + "/host"); @@ -99,9 +98,9 @@ void ReplicatedMergeTreeBlockOutputStream::checkQuorumPrecondition(zkutil::ZooKe * If the quorum is reached, then the node is deleted. */ - auto quorum_status = quorum_status_future.get(); - if (quorum_status.error != Coordination::Error::ZNONODE && !quorum_parallel) - throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status.data, + String quorum_status; + if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status)) + throw Exception("Quorum for previous write has not been satisfied yet. Status: " + quorum_status, ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE); /// Both checks are implicitly made also later (otherwise there would be a race condition). @@ -470,7 +469,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( if (is_already_existing_part) { /// We get duplicate part without fetch - storage.updateQuorum(part->name); + /// Check if this quorum insert is parallel or not + if (zookeeper->exists(storage.zookeeper_path + "/quorum/status")) + storage.updateQuorum(part->name, false); + else if (zookeeper->exists(storage.zookeeper_path + "/quorum/parallel/" + part->name)) + storage.updateQuorum(part->name, true); } /// We are waiting for quorum to be satisfied. diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp index 9d5aa84b3ab..29e8f55ecd0 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeRestartingThread.cpp @@ -232,7 +232,7 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart() && zookeeper->exists(storage.replica_path + "/parts/" + quorum_entry.part_name)) { LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", quorum_entry.part_name); - storage.updateQuorum(quorum_entry.part_name); + storage.updateQuorum(quorum_entry.part_name, false); } } @@ -249,7 +249,7 @@ void ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart() && zookeeper->exists(storage.replica_path + "/parts/" + partition)) { LOG_WARNING(log, "We have part {} but we is not in quorum. Updating quorum. This shouldn't happen often.", partition); - storage.updateQuorum(partition); + storage.updateQuorum(partition, true); } } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 55213bf5f40..75cdfc5607d 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -3038,35 +3038,22 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart( /** If a quorum is tracked for a part, update information about it in ZK. */ -void StorageReplicatedMergeTree::updateQuorum(const String & part_name) +void StorageReplicatedMergeTree::updateQuorum(const String & part_name, bool is_parallel) { auto zookeeper = getZooKeeper(); /// Information on which replicas a part has been added, if the quorum has not yet been reached. - const String quorum_unparallel_status_path = zookeeper_path + "/quorum/status"; + const String quorum_status_path = is_parallel ? zookeeper_path + "/quorum/parallel/" + part_name : zookeeper_path + "/quorum/status"; /// The name of the previous part for which the quorum was reached. const String quorum_last_part_path = zookeeper_path + "/quorum/last_part"; - const String quorum_parallel_status_path = zookeeper_path + "/quorum/parallel/" + part_name; String value; Coordination::Stat stat; /// If there is no node, then all quorum INSERTs have already reached the quorum, and nothing is needed. - while (true) + while (zookeeper->tryGet(quorum_status_path, value, &stat)) { - bool quorum_parallel(false); - ReplicatedMergeTreeQuorumEntry quorum_entry; - - if (zookeeper->tryGet(quorum_unparallel_status_path, value, &stat)) - {} - else if (zookeeper->tryGet(quorum_parallel_status_path, value, &stat)) - quorum_parallel = true; - else - break; - - const String quorum_status_path = quorum_parallel ? quorum_parallel_status_path : quorum_unparallel_status_path; - quorum_entry.fromString(value); - + ReplicatedMergeTreeQuorumEntry quorum_entry(value); if (quorum_entry.part_name != part_name) { /// The quorum has already been achieved. Moreover, another INSERT with a quorum has already started. @@ -3082,7 +3069,7 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name) Coordination::Requests ops; Coordination::Responses responses; - if (!quorum_parallel) + if (!is_parallel) { Coordination::Stat added_parts_stat; String old_added_parts = zookeeper->get(quorum_last_part_path, &added_parts_stat); @@ -3332,18 +3319,23 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora * If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method. */ if (quorum) - updateQuorum(part_name); + { + /// Check if this quorum insert is parallel or not + if (zookeeper->exists(zookeeper_path + "/quorum/status")) + updateQuorum(part_name, false); + else if (zookeeper->exists(zookeeper_path + "/quorum/parallel/" + part_name)) + updateQuorum(part_name, true); + } - /// alexelex: i'm not sure in witch order should that and merge_selecting_task->schedule() be /// merged parts that are still inserted with quorum. if it only contains one block, it hasn't been merged before - if (part_info.max_block != part_info.min_block) + if (part_info.level != 0 || part_info.mutation != 0) { Strings quorum_parts = zookeeper->getChildren(zookeeper_path + "/quorum/parallel"); for (const String & quorum_part : quorum_parts) { auto quorum_part_info = MergeTreePartInfo::fromPartName(quorum_part, format_version); if (part_info.contains(quorum_part_info)) - updateQuorum(quorum_part); + updateQuorum(quorum_part, true); } } diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 2bc9265331d..8dbeaabce2c 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -492,7 +492,7 @@ private: /// With the quorum being tracked, add a replica to the quorum for the part. - void updateQuorum(const String & part_name); + void updateQuorum(const String & part_name, bool is_parallel); /// Deletes info from quorum/last_part node for particular partition_id. void cleanLastPartNode(const String & partition_id);