diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp index 4aa8b12bd96..2b2570e0187 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.cpp @@ -218,6 +218,11 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( String temporary_part_name = part->name; + /// There is one case when we need to retry transaction in a loop. + /// But don't do it too many times - just as defensive measure. + size_t loop_counter = 0; + constexpr size_t max_iterations = 10; + while (true) { /// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem. @@ -229,6 +234,10 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( String block_id_path = deduplicate_block ? storage.zookeeper_path + "/blocks/" + block_id : ""; auto block_number_lock = storage.allocateBlockNumber(part->info.partition_id, zookeeper, block_id_path); + /// Prepare transaction to ZooKeeper + /// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock. + Coordination::Requests ops; + Int64 block_number = 0; String existing_part_name; if (block_number_lock) @@ -242,6 +251,25 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( part->info.level = 0; part->name = part->getNewName(part->info); + + /// Will add log entry about new part. + + StorageReplicatedMergeTree::LogEntry log_entry; + log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; + log_entry.create_time = time(nullptr); + log_entry.source_replica = storage.replica_name; + log_entry.new_part_name = part->name; + log_entry.quorum = quorum; + log_entry.block_id = block_id; + log_entry.new_part_type = part->getType(); + + ops.emplace_back(zkutil::makeCreateRequest( + storage.zookeeper_path + "/log/log-", + log_entry.toString(), + zkutil::CreateMode::PersistentSequential)); + + /// Deletes the information that the block number is used for writing. + block_number_lock->getUnlockOps(ops); } else { @@ -269,43 +297,21 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( part->name = existing_part_name; part->info = MergeTreePartInfo::fromPartName(existing_part_name, storage.format_version); + /// Used only for exception messages. block_number = part->info.min_block; /// Don't do subsequent duplicate check. block_id_path.clear(); } - StorageReplicatedMergeTree::LogEntry log_entry; - log_entry.type = StorageReplicatedMergeTree::LogEntry::GET_PART; - log_entry.create_time = time(nullptr); - log_entry.source_replica = storage.replica_name; - log_entry.new_part_name = part->name; - log_entry.quorum = quorum; - log_entry.block_id = block_id; - log_entry.new_part_type = part->getType(); - - /// Simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock. - /// Information about the part. - Coordination::Requests ops; - storage.getCommitPartOps(ops, part, block_id_path); - /// Replication log. - ops.emplace_back(zkutil::makeCreateRequest( - storage.zookeeper_path + "/log/log-", - log_entry.toString(), - zkutil::CreateMode::PersistentSequential)); - - /// Deletes the information that the block number is used for writing. - if (block_number_lock) - block_number_lock->getUnlockOps(ops); - - /** If you need a quorum - create a node in which the quorum is monitored. - * (If such a node already exists, then someone has managed to make another quorum record at the same time, - * but for it the quorum has not yet been reached. - * You can not do the next quorum record at this time.) - */ + /** If we need a quorum - create a node in which the quorum is monitored. + * (If such a node already exists, then someone has managed to make another quorum record at the same time, + * but for it the quorum has not yet been reached. + * You can not do the next quorum record at this time.) + */ if (quorum) /// TODO Duplicate blocks. { ReplicatedMergeTreeQuorumEntry quorum_entry; @@ -405,6 +411,9 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart( part->state = MergeTreeDataPartState::Temporary; part->renameTo(temporary_part_name, false); + ++loop_counter; + if (loop_counter == max_iterations) + throw Exception("Too many transaction retires - it may indicate an error", ErrorCodes::DUPLICATE_DATA_PART); continue; } else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)