diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 5026836f902..9a5cd062f1d 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -593,7 +593,7 @@ struct CommitRetryContext
     enum Stages
     {
         LOCK_BLOCK,
-        COMMIT,
+        CHECK_LOCK_AND_COMMIT,
         DUPLICATED_PART,
         UNCERTAIN_COMMIT,
         SUCCESS,
@@ -601,12 +601,16 @@ struct CommitRetryContext
     };

     /// Possible ways:
-    /// LOCK_BLOCK -> COMMIT
+    /// LOCK_BLOCK -> DUPLICATED_PART
+    /// LOCK_BLOCK -> CHECK_LOCK_AND_COMMIT
+
+    /// CHECK_LOCK_AND_COMMIT -> LOCK_BLOCK
+    /// CHECK_LOCK_AND_COMMIT -> DUPLICATED_PART
+    /// CHECK_LOCK_AND_COMMIT -> UNCERTAIN_COMMIT
+    /// CHECK_LOCK_AND_COMMIT -> SUCCESS
+
     /// DUPLICATED_PART -> SUCCESS
-    /// COMMIT -> SUCCESS
-    /// COMMIT -> DUPLICATED_PART
-    /// COMMIT -> UNCERTAIN_COMMIT
     /// UNCERTAIN_COMMIT -> SUCCESS
     /// * -> ERROR
@@ -640,7 +644,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
     ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());

-    auto resolve_duplicate_stage = [&] () -> bool
+    auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
     {
         if constexpr (async_insert)
         {
@@ -666,19 +670,17 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             retry_context.part_was_deduplicated = true;

-            retry_context.stage = CommitRetryContext::SUCCESS;
-            return false;
+            return CommitRetryContext::SUCCESS;
         }
     };

-    auto resolve_uncertain_commit_stage = [&] ()
+    auto resolve_uncertain_commit_stage = [&] () -> CommitRetryContext::Stages
     {
         /// check that info about the part was actually written in zk
         if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / retry_context.actual_part_name))
         {
             LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);

-            retry_context.stage = CommitRetryContext::SUCCESS;
-            return false;
+            return CommitRetryContext::SUCCESS;
         }
         else
         {
@@ -695,11 +697,12 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
                 "Insert failed due to zookeeper error. Please retry. "
                 "Reason: {}", retry_context.unsertain_keeper_error);

-            return false;
+            /// trigger next keeper retry
+            return CommitRetryContext::UNCERTAIN_COMMIT;
         }
     };

-    auto lock_part_number_stage = [&] ()
+    auto lock_part_number_stage = [&] () -> CommitRetryContext::Stages
     {
         if constexpr (async_insert)
         {
@@ -707,8 +710,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             retry_context.conflict_block_ids = detectConflictsInAsyncBlockIDs(block_id);
             if (!retry_context.conflict_block_ids.empty())
             {
-                retry_context.stage = CommitRetryContext::ERROR;
-                return false;
+                return CommitRetryContext::ERROR;
             }
         }
@@ -722,8 +724,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
         if (!retry_context.block_number_lock)
         {
-            retry_context.stage = CommitRetryContext::DUPLICATED_PART;
-            return true;
+            return CommitRetryContext::DUPLICATED_PART;
         }

         if constexpr (async_insert)
@@ -738,8 +739,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
                 LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path);
                 retry_context.conflict_block_ids.push_back(conflict_path);

-                retry_context.stage = CommitRetryContext::ERROR;
-                return false;
+                return CommitRetryContext::ERROR;
             }
         }
@@ -754,8 +754,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
         part->setName(part->getNewName(part->info));
         retry_context.actual_part_name = part->name;

-        retry_context.stage = CommitRetryContext::COMMIT;
-        return true;
+        return CommitRetryContext::CHECK_LOCK_AND_COMMIT;
     };

     auto get_quorum_ops = [&] (Coordination::Requests & ops)
@@ -839,19 +838,27 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             zkutil::CreateMode::PersistentSequential));
     };

-    auto commit_new_part_stage = [&] ()
+    auto commit_new_part_stage = [&] () -> CommitRetryContext::Stages
     {
         chassert(retry_context.block_number_lock.has_value());

+        /// Lock might be easily lost at some retry attempt due to connection loss
+        /// Redo the locking
+        if (!retry_context.block_number_lock->isLocked())
+        {
+            return CommitRetryContext::LOCK_BLOCK;
+        }
+
         /// Prepare transaction to ZooKeeper
         /// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
         Coordination::Requests ops;

+        get_logs_ops(ops);
+
+        /// Deletes the information that the block number is used for writing.
         size_t block_unlock_op_idx = ops.size();
         retry_context.block_number_lock->getUnlockOp(ops);

-        get_logs_ops(ops);
-
         get_quorum_ops(ops);

         size_t shared_lock_ops_id_begin = ops.size();
@@ -912,8 +919,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             /// Lock nodes have been already deleted, do not delete them in destructor
             retry_context.block_number_lock->assumeUnlocked();

-            retry_context.stage = CommitRetryContext::SUCCESS;
-            return false;
+            return CommitRetryContext::SUCCESS;
         }

         if (Coordination::isHardwareError(multi_code))
@@ -925,8 +931,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
              */
             transaction.commit();

-            retry_context.stage = CommitRetryContext::UNCERTAIN_COMMIT;
-            return true;
+            return CommitRetryContext::UNCERTAIN_COMMIT;
         }

         if (!Coordination::isUserError(multi_code))
@@ -956,10 +961,10 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
                 retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
                 LOG_TRACE(log, "conflict when committing, the conflict block ids are {}", toString(retry_context.conflict_block_ids));
+                return CommitRetryContext::ERROR;
             }

-            retry_context.stage = CommitRetryContext::DUPLICATED_PART;
-            return true;
+            return CommitRetryContext::DUPLICATED_PART;
         }
         else if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
         {
@@ -972,7 +977,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
         {
             transaction.rollback();
             throw Exception(ErrorCodes::LOGICAL_ERROR,
-                "Creating shared lock for part {} has failed with error: {}. It's a bug.",
+                "Creating shared lock for part {} has failed with error: {}. It's a bug. "
+                "No race is possible since it is a new part.",
                 part->name, multi_code);
         }
         else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
@@ -1000,17 +1006,17 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
         switch (retry_context.stage)
         {
             case CommitRetryContext::LOCK_BLOCK:
-                return lock_part_number_stage();
-
-            case CommitRetryContext::COMMIT:
-                return commit_new_part_stage();
-
+                retry_context.stage = lock_part_number_stage();
+                break;
+            case CommitRetryContext::CHECK_LOCK_AND_COMMIT:
+                retry_context.stage = commit_new_part_stage();
+                break;
             case CommitRetryContext::DUPLICATED_PART:
-                return resolve_duplicate_stage();
-
+                retry_context.stage = resolve_duplicate_stage();
+                break;
             case CommitRetryContext::UNCERTAIN_COMMIT:
-                return resolve_uncertain_commit_stage();
-
+                retry_context.stage = resolve_uncertain_commit_stage();
+                break;
             case CommitRetryContext::SUCCESS:
                 throw Exception(ErrorCodes::LOGICAL_ERROR,
                     "Operation is already succeed.");
@@ -1029,6 +1035,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             retry_context.stage = CommitRetryContext::ERROR;
             throw;
         }
+
+        return retry_context.stage;
     };

     retries_ctl.retryLoop([&]()
@@ -1051,9 +1059,24 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
             }
         }

-        while (stage_switcher())
+        while (true)
         {
-            // no op;
+            const auto prev_stage = retry_context.stage;
+
+            stage_switcher();
+
+            if (prev_stage == retry_context.stage)
+            {
+                /// trigger next retry in retries_ctl.retryLoop when stage has not changed
+                return;
+            }
+
+            if (retry_context.stage == CommitRetryContext::SUCCESS
+                || retry_context.stage == CommitRetryContext::ERROR)
+            {
+                /// operation is done
+                return;
+            }
         }
     }, [&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });
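For readers following the control flow: after this change the commit path is an explicit state machine. Each stage handler returns the next stage, the switch writes it back into retry_context, and the new while (true) driver treats an unchanged stage as a request for another attempt from retries_ctl.retryLoop, while SUCCESS and ERROR end the loop. Below is a minimal, self-contained sketch of that driver pattern; Stage, RetryContext, runOneAttempt and the two stage functions are illustrative stand-ins, not the actual ClickHouse types.

#include <cstdio>

/// Hypothetical stand-ins; the real code uses CommitRetryContext::Stages
/// and lambdas such as lock_part_number_stage / commit_new_part_stage.
enum class Stage { LockBlock, CheckLockAndCommit, Success, Error };

struct RetryContext
{
    Stage stage = Stage::LockBlock;
    int commit_calls = 0;
};

Stage lockBlockStage(RetryContext &)
{
    /// Locking succeeded: move on to the commit stage.
    return Stage::CheckLockAndCommit;
}

Stage checkLockAndCommitStage(RetryContext & ctx)
{
    /// Simulate a transient Keeper error on the first call: returning the
    /// current stage means "no progress, retry this stage on the next attempt".
    if (ctx.commit_calls++ == 0)
        return Stage::CheckLockAndCommit;
    return Stage::Success;
}

/// One body of retries_ctl.retryLoop: advance stages until either a terminal
/// stage is reached (true) or the stage stops changing (false -> retry).
bool runOneAttempt(RetryContext & ctx)
{
    while (true)
    {
        const Stage prev_stage = ctx.stage;

        switch (ctx.stage)
        {
            case Stage::LockBlock:
                ctx.stage = lockBlockStage(ctx);
                break;
            case Stage::CheckLockAndCommit:
                ctx.stage = checkLockAndCommitStage(ctx);
                break;
            default:
                return true;
        }

        if (ctx.stage == prev_stage)
            return false; /// trigger the next retry

        if (ctx.stage == Stage::Success || ctx.stage == Stage::Error)
            return true; /// operation is done
    }
}

int main()
{
    RetryContext ctx;
    int attempt = 1;
    /// Stand-in for ZooKeeperRetriesControl::retryLoop.
    while (!runOneAttempt(ctx))
        std::printf("attempt %d made no progress, retrying\n", attempt++);
    std::printf("finished in terminal stage %d\n", static_cast<int>(ctx.stage));
}

The design choice mirrored here is that stage handlers no longer mix "what happens next" (a stage assignment) with "keep looping" (a bool); the single Stage return value carries both, and the driver alone decides when to retry or stop.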
diff --git a/tests/integration/test_quorum_inserts_parallel/test.py b/tests/integration/test_quorum_inserts_parallel/test.py
index 7f8784d822c..6cfc130e359 100644
--- a/tests/integration/test_quorum_inserts_parallel/test.py
+++ b/tests/integration/test_quorum_inserts_parallel/test.py
@@ -109,15 +109,22 @@ def test_parallel_quorum_actually_quorum(started_cluster):
     def insert_value_to_node(node, settings):
         node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)

+    def insert_fail_quorum_timeout(node, settings):
+        if "insert_quorum_timeout" not in settings:
+            settings["insert_quorum_timeout"] = "1000"
+        error = node.query_and_get_error("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
+        assert "DB::Exception: Unknown status, client must retry." in error, error
+        assert "DB::Exception: Timeout while waiting for quorum. (TIMEOUT_EXCEEDED)" in error, error
+
     p = Pool(2)
     res = p.apply_async(
-        insert_value_to_node,
+        insert_fail_quorum_timeout,
         (
             node1,
             {
                 "insert_quorum": "3",
                 "insert_quorum_parallel": "1",
-                "insert_quorum_timeout": "60000",
+                "insert_quorum_timeout": "1000",
             },
         ),
     )
@@ -139,14 +146,14 @@ def test_parallel_quorum_actually_quorum(started_cluster):
     )

     # Insert to the second to satisfy quorum
-    insert_value_to_node(
-        node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"}
+    insert_fail_quorum_timeout(
+        node2, {"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "1000"}
     )
     res.get()

     assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
-    assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
+    assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
     assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")

     p.close()
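The updated test pins down the user-visible behaviour of the uncertain-commit path: an INSERT with insert_quorum=3 that cannot gather the quorum now fails with both "Timeout while waiting for quorum." and "Unknown status, client must retry.", and node2 is expected to end up with 0 rows instead of 1. On the server side that uncertainty is resolved by resolve_uncertain_commit_stage above, which checks whether the part node was actually created under the replica's parts/ path. A rough self-contained sketch of that disambiguation, again with hypothetical stand-in names:

#include <cstdio>
#include <string>

/// Hypothetical stand-in for the replica state in (Zoo)Keeper.
struct FakeKeeper
{
    bool part_node_present = false;
    bool exists(const std::string &) const { return part_node_present; }
};

enum class Outcome { Success, RetryUncertain };

/// Sketch of the resolve_uncertain_commit_stage idea: after a commit whose
/// response was lost, look for the part node the transaction would have
/// created. Present -> the commit actually went through. Absent -> the status
/// is still unknown, so stay in the uncertain stage and let the retry loop
/// run this check again ("Unknown status, client must retry.").
Outcome resolveUncertainCommit(const FakeKeeper & keeper,
                               const std::string & replica_path,
                               const std::string & part_name)
{
    if (keeper.exists(replica_path + "/parts/" + part_name))
        return Outcome::Success;
    return Outcome::RetryUncertain;
}

int main()
{
    FakeKeeper keeper;
    const std::string replica = "/clickhouse/tables/q/replicas/r1"; /// illustrative path
    const std::string part = "all_0_0_0";

    auto report = [&](Outcome o) { std::printf("%s\n", o == Outcome::Success ? "success" : "retry"); };

    report(resolveUncertainCommit(keeper, replica, part)); /// prints "retry"
    keeper.part_node_present = true; /// the "lost" commit had in fact been applied
    report(resolveUncertainCommit(keeper, replica, part)); /// prints "success"
}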