relock block when session expired

Sema Checherinda 2023-11-30 18:02:57 +01:00
parent e4becc01ba
commit 42f648f0d2
2 changed files with 76 additions and 46 deletions
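
In short: commitPart's COMMIT stage becomes CHECK_LOCK_AND_COMMIT, which verifies, before building the ZooKeeper transaction, that the ephemeral block-number lock survived the previous retry attempt; if the session expired, the lock is silently gone, so the state machine falls back to LOCK_BLOCK and relocks instead of committing. A minimal sketch of that check, with a hypothetical EphemeralLock and Stage standing in for ClickHouse's real types:

#include <cassert>
#include <optional>

// Illustration only: EphemeralLock and Stage are hypothetical stand-ins,
// not ClickHouse's real types.
enum class Stage { LOCK_BLOCK, CHECK_LOCK_AND_COMMIT, SUCCESS };

struct EphemeralLock
{
    bool session_alive = true;                 // a session expiry silently drops the znode
    bool isLocked() const { return session_alive; }
};

Stage checkLockAndCommit(std::optional<EphemeralLock> & lock)
{
    assert(lock.has_value());
    // The lock might be lost between retries due to connection loss:
    // redo the locking rather than commit with an unprotected block number.
    if (!lock->isLocked())
        return Stage::LOCK_BLOCK;
    // ... otherwise build and execute the ZooKeeper multi transaction ...
    return Stage::SUCCESS;
}

int main()
{
    std::optional<EphemeralLock> lock{EphemeralLock{false}};        // simulate an expired session
    return checkLockAndCommit(lock) == Stage::LOCK_BLOCK ? 0 : 1;   // exits 0: relock requested
}

The diff below implements the same idea in commit_new_part_stage via block_number_lock->isLocked().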


@@ -593,7 +593,7 @@ struct CommitRetryContext
enum Stages
{
LOCK_BLOCK,
-COMMIT,
+CHECK_LOCK_AND_COMMIT,
DUPLICATED_PART,
UNCERTAIN_COMMIT,
SUCCESS,
@@ -601,12 +601,16 @@ struct CommitRetryContext
};
/// Possible ways:
-/// LOCK_BLOCK -> COMMIT
/// LOCK_BLOCK -> DUPLICATED_PART
+/// LOCK_BLOCK -> CHECK_LOCK_AND_COMMIT
+/// CHECK_LOCK_AND_COMMIT -> LOCK_BLOCK
+/// CHECK_LOCK_AND_COMMIT -> DUPLICATED_PART
+/// CHECK_LOCK_AND_COMMIT -> UNCERTAIN_COMMIT
+/// CHECK_LOCK_AND_COMMIT -> SUCCESS
/// DUPLICATED_PART -> SUCCESS
-/// COMMIT -> SUCCESS
-/// COMMIT -> DUPLICATED_PART
-/// COMMIT -> UNCERTAIN_COMMIT
/// UNCERTAIN_COMMIT -> SUCCESS
/// * -> ERROR
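
Read as code, the transition list above amounts to the validity check sketched below (isAllowedTransition is a hypothetical helper for illustration; the real code drives transitions through the stage lambdas in the following hunks). Note that a stage may also return itself, as UNCERTAIN_COMMIT does on a ZooKeeper error, which means "retry the same stage" rather than a transition:

// Hypothetical helper encoding the documented transitions; not in the real code.
enum class Stage { LOCK_BLOCK, CHECK_LOCK_AND_COMMIT, DUPLICATED_PART, UNCERTAIN_COMMIT, SUCCESS, ERROR };

bool isAllowedTransition(Stage from, Stage to)
{
    if (to == Stage::ERROR)
        return true;                                        // * -> ERROR
    switch (from)
    {
        case Stage::LOCK_BLOCK:
            return to == Stage::CHECK_LOCK_AND_COMMIT || to == Stage::DUPLICATED_PART;
        case Stage::CHECK_LOCK_AND_COMMIT:
            return to == Stage::LOCK_BLOCK || to == Stage::DUPLICATED_PART
                || to == Stage::UNCERTAIN_COMMIT || to == Stage::SUCCESS;
        case Stage::DUPLICATED_PART:
        case Stage::UNCERTAIN_COMMIT:
            return to == Stage::SUCCESS;
        default:
            return false;                                   // SUCCESS and ERROR are terminal
    }
}

int main()
{
    // The new edge introduced by this commit: relock after a lost session.
    return isAllowedTransition(Stage::CHECK_LOCK_AND_COMMIT, Stage::LOCK_BLOCK) ? 0 : 1;
}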
@@ -640,7 +644,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
-auto resolve_duplicate_stage = [&] () -> bool
+auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
{
if constexpr (async_insert)
{
@@ -666,19 +670,17 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retry_context.part_was_deduplicated = true;
-retry_context.stage = CommitRetryContext::SUCCESS;
-return false;
+return CommitRetryContext::SUCCESS;
}
};
-auto resolve_uncertain_commit_stage = [&] ()
+auto resolve_uncertain_commit_stage = [&] () -> CommitRetryContext::Stages
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / retry_context.actual_part_name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
-retry_context.stage = CommitRetryContext::SUCCESS;
-return false;
+return CommitRetryContext::SUCCESS;
}
else
{
@@ -695,11 +697,12 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
"Insert failed due to zookeeper error. Please retry. Reason: {}",
retry_context.unsertain_keeper_error);
-return false;
+/// trigger next keeper retry
+return CommitRetryContext::UNCERTAIN_COMMIT;
}
};
-auto lock_part_number_stage = [&] ()
+auto lock_part_number_stage = [&] () -> CommitRetryContext::Stages
{
if constexpr (async_insert)
{
@@ -707,8 +710,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retry_context.conflict_block_ids = detectConflictsInAsyncBlockIDs(block_id);
if (!retry_context.conflict_block_ids.empty())
{
-retry_context.stage = CommitRetryContext::ERROR;
-return false;
+return CommitRetryContext::ERROR;
}
}
@@ -722,8 +724,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (!retry_context.block_number_lock)
{
-retry_context.stage = CommitRetryContext::DUPLICATED_PART;
-return true;
+return CommitRetryContext::DUPLICATED_PART;
}
if constexpr (async_insert)
@@ -738,8 +739,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
LOG_TRACE(log, "Cannot get lock, the conflict path is {}", conflict_path);
retry_context.conflict_block_ids.push_back(conflict_path);
-retry_context.stage = CommitRetryContext::ERROR;
-return false;
+return CommitRetryContext::ERROR;
}
}
@@ -754,8 +754,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
part->setName(part->getNewName(part->info));
retry_context.actual_part_name = part->name;
-retry_context.stage = CommitRetryContext::COMMIT;
-return true;
+return CommitRetryContext::CHECK_LOCK_AND_COMMIT;
};
auto get_quorum_ops = [&] (Coordination::Requests & ops)
@@ -839,19 +838,27 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
zkutil::CreateMode::PersistentSequential));
};
-auto commit_new_part_stage = [&] ()
+auto commit_new_part_stage = [&] () -> CommitRetryContext::Stages
{
+chassert(retry_context.block_number_lock.has_value());
+/// Lock might be easily lost at some retry attempt due to connection loss
+/// Redo the locking
+if (!retry_context.block_number_lock->isLocked())
+{
+return CommitRetryContext::LOCK_BLOCK;
+}
/// Prepare transaction to ZooKeeper
/// It will simultaneously add information about the part to all the necessary places in ZooKeeper and remove block_number_lock.
Coordination::Requests ops;
-get_logs_ops(ops);
/// Deletes the information that the block number is used for writing.
size_t block_unlock_op_idx = ops.size();
retry_context.block_number_lock->getUnlockOp(ops);
+get_logs_ops(ops);
get_quorum_ops(ops);
size_t shared_lock_ops_id_begin = ops.size();
@@ -912,8 +919,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
/// Lock nodes have been already deleted, do not delete them in destructor
retry_context.block_number_lock->assumeUnlocked();
-retry_context.stage = CommitRetryContext::SUCCESS;
-return false;
+return CommitRetryContext::SUCCESS;
}
if (Coordination::isHardwareError(multi_code))
@@ -925,8 +931,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
*/
transaction.commit();
-retry_context.stage = CommitRetryContext::UNCERTAIN_COMMIT;
-return true;
+return CommitRetryContext::UNCERTAIN_COMMIT;
}
if (!Coordination::isUserError(multi_code))
@@ -956,10 +961,10 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retry_context.conflict_block_ids = std::vector<String>({failed_op_path});
LOG_TRACE(log, "conflict when committing, the conflict block ids are {}",
toString(retry_context.conflict_block_ids));
+return CommitRetryContext::ERROR;
}
-retry_context.stage = CommitRetryContext::DUPLICATED_PART;
-return true;
+return CommitRetryContext::DUPLICATED_PART;
}
else if (multi_code == Coordination::Error::ZNONODE && failed_op_idx == block_unlock_op_idx)
{
@@ -972,7 +977,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
{
transaction.rollback();
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Creating shared lock for part {} has failed with error: {}. It's a bug.",
"Creating shared lock for part {} has failed with error: {}. It's a bug. "
"No race is possible since it is a new part.",
part->name, multi_code);
}
else if (multi_code == Coordination::Error::ZNODEEXISTS && failed_op_path == quorum_info.status_path)
@@ -1000,17 +1006,17 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
switch (retry_context.stage)
{
case CommitRetryContext::LOCK_BLOCK:
-return lock_part_number_stage();
-case CommitRetryContext::COMMIT:
-return commit_new_part_stage();
+retry_context.stage = lock_part_number_stage();
+break;
+case CommitRetryContext::CHECK_LOCK_AND_COMMIT:
+retry_context.stage = commit_new_part_stage();
+break;
case CommitRetryContext::DUPLICATED_PART:
-return resolve_duplicate_stage();
+retry_context.stage = resolve_duplicate_stage();
+break;
case CommitRetryContext::UNCERTAIN_COMMIT:
-return resolve_uncertain_commit_stage();
+retry_context.stage = resolve_uncertain_commit_stage();
+break;
case CommitRetryContext::SUCCESS:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Operation is already succeed.");
@@ -1029,6 +1035,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
retry_context.stage = CommitRetryContext::ERROR;
throw;
}
+return retry_context.stage;
};
retries_ctl.retryLoop([&]()
@@ -1051,9 +1059,24 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
}
-while (stage_switcher())
+while (true)
{
-// no op;
+const auto prev_stage = retry_context.stage;
+stage_switcher();
+if (prev_stage == retry_context.stage)
+{
+/// trigger next retry in retries_ctl.retryLoop when stage has not changed
+return;
+}
+if (retry_context.stage == CommitRetryContext::SUCCESS
+|| retry_context.stage == CommitRetryContext::ERROR)
+{
+/// operation is done
+return;
+}
}
},
[&zookeeper]() { zookeeper->cleanupEphemeralNodes(); });
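
The rewritten driver loop makes the retry semantics explicit: a stage that returns itself yields back to retries_ctl.retryLoop so another ZooKeeper retry can run, while SUCCESS and ERROR end the loop. A compact, self-contained sketch of that contract, with a toy stageSwitcher in place of the real lambda:

#include <cstdio>

// Toy model of the loop contract; stageSwitcher here just walks
// LOCK_BLOCK -> CHECK_LOCK_AND_COMMIT -> SUCCESS.
enum class Stage { LOCK_BLOCK, CHECK_LOCK_AND_COMMIT, SUCCESS, ERROR };

static Stage stage = Stage::LOCK_BLOCK;

static void stageSwitcher()
{
    switch (stage)
    {
        case Stage::LOCK_BLOCK:            stage = Stage::CHECK_LOCK_AND_COMMIT; break;
        case Stage::CHECK_LOCK_AND_COMMIT: stage = Stage::SUCCESS; break;
        default:                           break;   // unchanged stage = "please retry"
    }
}

static void runOneRetryAttempt()
{
    while (true)
    {
        const Stage prev = stage;
        stageSwitcher();
        if (prev == stage)
            return;   // no progress: let the outer retry loop schedule the next keeper retry
        if (stage == Stage::SUCCESS || stage == Stage::ERROR)
            return;   // terminal: the commit either finished or failed for good
    }
}

int main()
{
    runOneRetryAttempt();
    std::printf("%s\n", stage == Stage::SUCCESS ? "success" : "needs another retry");
}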


@@ -109,15 +109,22 @@ def test_parallel_quorum_actually_quorum(started_cluster):
def insert_value_to_node(node, settings):
node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
+def insert_fail_quorum_timeout(node, settings):
+    if "insert_quorum_timeout" not in settings:
+        settings["insert_quorum_timeout"] = "1000"
+    error = node.query_and_get_error("INSERT INTO q VALUES(3, 'Hi')", settings=settings)
+    assert "DB::Exception: Unknown status, client must retry." in error, error
+    assert "DB::Exception: Timeout while waiting for quorum. (TIMEOUT_EXCEEDED)" in error, error
p = Pool(2)
res = p.apply_async(
-insert_value_to_node,
+insert_fail_quorum_timeout,
(
node1,
{
"insert_quorum": "3",
"insert_quorum_parallel": "1",
"insert_quorum_timeout": "60000",
"insert_quorum_timeout": "1000",
},
),
)
@@ -139,14 +146,14 @@ def test_parallel_quorum_actually_quorum(started_cluster):
)
# Insert to the second to satisfy quorum
-insert_value_to_node(
-node2, {"insert_quorum": "3", "insert_quorum_parallel": "1"}
+insert_fail_quorum_timeout(
+node2, {"insert_quorum": "3", "insert_quorum_parallel": "1", "insert_quorum_timeout": "1000"}
)
res.get()
assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
-assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "1")
+assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")
p.close()