WIP: Remove UNCERTAIN_COMMIT in INSERT

This commit is contained in:
Raúl Marín 2023-12-11 17:27:56 +01:00
parent 462cd0e4be
commit e1965bb6b5
2 changed files with 64 additions and 54 deletions

View File

@ -48,15 +48,15 @@ void ZooKeeperWithFaultInjection::resetKeeper()
{
keeper->remove(path_created);
}
catch (const Coordination::Exception &)
catch (const Coordination::Exception & e)
{
if (logger)
LOG_TRACE(logger, "Failed to delete ephemeral node ({}) during fault cleanup", path_created);
LOG_TRACE(logger, "Failed to delete ephemeral node ({}) during fault cleanup: {}", path_created, e.what());
}
}
}
session_ephemeral_nodes.clear();
keeper.reset();
}
@ -94,7 +94,6 @@ void ZooKeeperWithFaultInjection::injectFailureBeforeOperationThrow(const char *
if (unlikely(fault_policy) && fault_policy->beforeOperation())
{
resetKeeper();
if (logger)
LOG_TRACE(
logger,
@ -104,6 +103,7 @@ void ZooKeeperWithFaultInjection::injectFailureBeforeOperationThrow(const char *
path,
RandomFaultInjection::error_before_op,
RandomFaultInjection::msg_before_op);
resetKeeper();
throw zkutil::KeeperException::fromMessage(RandomFaultInjection::error_before_op, RandomFaultInjection::msg_before_op);
}
}
@ -112,7 +112,6 @@ void ZooKeeperWithFaultInjection::injectFailureAfterOperationThrow(const char *
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
resetKeeper();
if (logger)
LOG_TRACE(
logger,
@ -122,6 +121,7 @@ void ZooKeeperWithFaultInjection::injectFailureAfterOperationThrow(const char *
path,
RandomFaultInjection::error_after_op,
RandomFaultInjection::msg_after_op);
resetKeeper();
throw zkutil::KeeperException::fromMessage(RandomFaultInjection::error_after_op, RandomFaultInjection::msg_after_op);
}
}
@ -159,10 +159,10 @@ bool ZooKeeperWithFaultInjection::injectFailureBeforeOperationPromise(const char
if (unlikely(fault_policy) && fault_policy->beforeOperation())
{
resetKeeper();
if (logger)
LOG_TRACE(
logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, func_name, path);
resetKeeper();
promise->set_exception(std::make_exception_ptr(
zkutil::KeeperException::fromMessage(RandomFaultInjection::error_before_op, RandomFaultInjection::msg_before_op)));
return true;
@ -175,11 +175,11 @@ bool ZooKeeperWithFaultInjection::injectFailureAfterOperationPromise(const char
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
resetKeeper();
promise->set_exception(std::make_exception_ptr(
zkutil::KeeperException::fromMessage(RandomFaultInjection::error_after_op, RandomFaultInjection::msg_after_op)));
if (logger)
LOG_TRACE(logger, "ZooKeeperWithFaultInjection injected fault after operation: seed={} func={} path={}", seed, func_name, path);
resetKeeper();
return true;
}
return false;
@ -513,9 +513,9 @@ zkutil::ZooKeeper::FutureMulti ZooKeeperWithFaultInjection::asyncTryMultiNoThrow
if (!keeper || (unlikely(fault_policy) && fault_policy->beforeOperation()))
{
resetKeeper();
if (logger)
LOG_TRACE(logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, __func__, path);
resetKeeper();
Coordination::MultiResponse errors;
for (size_t i = 0; i < request_size; i++)
{
@ -532,7 +532,6 @@ zkutil::ZooKeeper::FutureMulti ZooKeeperWithFaultInjection::asyncTryMultiNoThrow
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
resetKeeper();
if (logger)
LOG_TRACE(
logger,
@ -540,6 +539,7 @@ zkutil::ZooKeeper::FutureMulti ZooKeeperWithFaultInjection::asyncTryMultiNoThrow
seed,
function_name,
path);
resetKeeper();
Coordination::MultiResponse errors;
for (size_t i = 0; i < request_size; i++)
{
@ -593,9 +593,9 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
if (!keeper || (unlikely(fault_policy) && fault_policy->beforeOperation()))
{
resetKeeper();
if (logger)
LOG_TRACE(logger, "ZooKeeperWithFaultInjection injected fault before operation: seed={} func={} path={}", seed, __func__, path);
resetKeeper();
Coordination::RemoveResponse r;
r.error = RandomFaultInjection::error_before_op;
promise->set_value(r);
@ -607,7 +607,6 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
{
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
resetKeeper();
if (logger)
LOG_TRACE(
logger,
@ -615,6 +614,7 @@ zkutil::ZooKeeper::FutureRemove ZooKeeperWithFaultInjection::asyncTryRemoveNoThr
seed,
function_name,
path);
resetKeeper();
Coordination::RemoveResponse r;
r.error = RandomFaultInjection::error_after_op;
promise->set_value(r);

View File

@ -29,6 +29,7 @@ namespace FailPoints
{
extern const char replicated_merge_tree_commit_zk_fail_after_op[];
extern const char replicated_merge_tree_insert_quorum_fail_0[];
extern const char replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault[];
}
namespace ErrorCodes
@ -594,7 +595,6 @@ struct CommitRetryContext
{
LOCK_AND_COMMIT,
DUPLICATED_PART,
UNCERTAIN_COMMIT,
SUCCESS,
ERROR
};
@ -602,19 +602,17 @@ struct CommitRetryContext
/// Possible ways:
/// LOCK_AND_COMMIT -> DUPLICATED_PART
/// LOCK_AND_COMMIT -> UNCERTAIN_COMMIT
/// LOCK_AND_COMMIT -> SUCCESS
/// LOCK_AND_COMMIT -> ERROR
/// DUPLICATED_PART -> SUCCESS
/// UNCERTAIN_COMMIT -> SUCCESS
/// * -> ERROR
/// DUPLICATED_PART -> ERROR
Stages stage = LOCK_AND_COMMIT;
String actual_part_name;
std::vector<String> conflict_block_ids;
bool part_was_deduplicated = false;
Coordination::Error uncertain_keeper_error = Coordination::Error::ZOK;
};
template<bool async_insert>
@ -667,34 +665,6 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
};
auto resolve_uncertain_commit_stage = [&] () -> CommitRetryContext::Stages
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / retry_context.actual_part_name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
return CommitRetryContext::SUCCESS;
}
else
{
/// if all retries will be exhausted by accessing zookeeper on fresh retry -> we'll add committed part to queue in the action
/// here lambda capture part name, it's ok since we'll not generate new one for this insert,
retries_ctl.actionAfterLastFailedRetry(
[&] ()
{
storage.enqueuePartForCheck(retry_context.actual_part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert failed due to zookeeper error. Please retry. Reason: {}",
retry_context.uncertain_keeper_error);
/// trigger next keeper retry
return CommitRetryContext::UNCERTAIN_COMMIT;
}
};
auto get_quorum_ops = [&] (Coordination::Requests & ops)
{
/** If we need a quorum - create a node in which the quorum is monitored.
@ -916,14 +886,57 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (Coordination::isHardwareError(multi_code))
{
retry_context.uncertain_keeper_error = multi_code;
LOG_TRACE(
log, "Insert of part {} failed when committing to keeper (Reason: {}). Attempting to recover it", part->name, multi_code);
ZooKeeperRetriesControl new_retry_controller = retries_ctl;
/** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
/// We are going to try to verify if the transaction was written into keeper
/// If we fail to do so (keeper unavailable) then we don't know if the changes were applied or not so
/// we can't delete the local part, as if the changes were applied then inserted block appeared in
/// `/blocks/`, and it can not be inserted again.
new_retry_controller.actionAfterLastFailedRetry([&]
{
transaction.commit();
storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status of part {} (Reason: {}). Data was written locally but we don't know the status in keeper. "
"The status will be verified automatically in ~{} seconds (the part will be kept if present in keeper or dropped if not)",
part->name, multi_code, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
/// Independently of how many retries we had left we want to do at least one check of this inner retry
/// at least once so a) we try to verify at least once if metadata was written and b) we set the proper
/// final error (UNKNOWN_STATUS_OF_INSERT) if we fail to reconnect to keeper
new_retry_controller.requestUnconditionalRetry();
bool node_exists = false;
new_retry_controller.retryLoop([&]
{
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
zookeeper->setKeeper(storage.getZooKeeper());
node_exists = zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name);
});
if (node_exists)
{
LOG_TRACE(log, "Insert of part {} recovered from keeper successfully. It will be committed", part->name);
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
sleep_before_commit_for_tests();
transaction.commit();
return CommitRetryContext::UNCERTAIN_COMMIT;
block_number_lock->assumeUnlocked();
return CommitRetryContext::SUCCESS;
}
else
{
// LOG_TRACE(log, "Insert of part {} was not committed to keeper. Will try again with a new block", part->name);
// rename_part_to_temporary();
// retries_ctl.setUserError(
// ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
// "Insert of part {} failed when committing to keeper (Reason: {})",
// part->name,
// multi_code);
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_INSERT, "Need to implement full retry");
}
}
transaction.rollback();
@ -992,9 +1005,6 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
case CommitRetryContext::DUPLICATED_PART:
retry_context.stage = resolve_duplicate_stage();
break;
case CommitRetryContext::UNCERTAIN_COMMIT:
retry_context.stage = resolve_uncertain_commit_stage();
break;
case CommitRetryContext::SUCCESS:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Operation is already succeed.");