Replace part_committed_locally_but_zookeeper with retries

This commit is contained in:
Raúl Marín 2023-11-20 12:34:07 +01:00
parent 20eb5d3251
commit 8217915064
6 changed files with 67 additions and 58 deletions

View File

@ -28,13 +28,14 @@ static struct InitFiu
/// We should define different types of failpoints here. There are four types of them:
/// - ONCE: the failpoint will only be triggered once.
/// - REGULAR: the failpoint will always be triggered util disableFailPoint is called.
/// - REGULAR: the failpoint will always be triggered until disableFailPoint is called.
/// - PAUSEABLE_ONCE: the failpoint will be blocked one time when pauseFailPoint is called, util disableFailPoint is called.
/// - PAUSEABLE: the failpoint will be blocked every time when pauseFailPoint is called, util disableFailPoint is called.
#define APPLY_FOR_FAILPOINTS(ONCE, REGULAR, PAUSEABLE_ONCE, PAUSEABLE) \
ONCE(replicated_merge_tree_commit_zk_fail_after_op) \
ONCE(replicated_merge_tree_insert_quorum_fail_0) \
REGULAR(replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault) \
REGULAR(use_delayed_remote_source) \
REGULAR(cluster_discovery_faults) \
REGULAR(check_table_query_delay_for_part) \

View File

@ -47,7 +47,7 @@ void ZooKeeperWithFaultInjection::injectFailureBeforeOperationThrow(const char *
void ZooKeeperWithFaultInjection::injectFailureAfterOperationThrow(const char * func_name, const String & path)
{
if (unlikely(fault_policy) && fault_policy->beforeOperation())
if (unlikely(fault_policy) && fault_policy->afterOperation())
{
keeper.reset();
if (logger)

View File

@ -30,7 +30,7 @@ public:
bool beforeOperation()
{
if (distribution(rndgen) || must_fail_before_op)
if (must_fail_before_op || distribution(rndgen))
{
must_fail_before_op = false;
return true;
@ -40,7 +40,7 @@ public:
bool afterOperation()
{
if (distribution(rndgen) || must_fail_after_op)
if (must_fail_after_op || distribution(rndgen))
{
must_fail_after_op = false;
return true;

View File

@ -29,6 +29,7 @@ namespace FailPoints
{
extern const char replicated_merge_tree_commit_zk_fail_after_op[];
extern const char replicated_merge_tree_insert_quorum_fail_0[];
extern const char replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault[];
}
namespace ErrorCodes
@ -568,9 +569,7 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
bool is_already_existing_part = false;
/// for retries due to keeper error
bool part_committed_locally_but_zookeeper = false;
bool part_was_deduplicated = false;
Coordination::Error write_part_info_keeper_error = Coordination::Error::ZOK;
std::vector<String> conflict_block_ids;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
@ -588,40 +587,12 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
/// For example during RESTORE REPLICA.
if (!writing_existing_part)
{
/// We have lost connection to all keepers but it might be recovered, so we use setUserError to keep retrying
retries_ctl.setUserError(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode: replica_path={}", storage.replica_path);
return;
}
}
if (retries_ctl.isRetry())
{
/// If we are retrying, check if last iteration was actually successful,
/// we could get network error on committing part to zk
/// but the operation could be completed by zk server
/// If this flag is true, then part is in Active state, and we'll not retry anymore
/// we only check if part was committed to zk and return success or failure correspondingly
/// Note: if commit to zk failed then cleanup thread will mark the part as Outdated later
if (part_committed_locally_but_zookeeper)
{
/// check that info about the part was actually written in zk
if (zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name))
{
LOG_DEBUG(log, "Part was successfully committed on previous iteration: part_id={}", part->name);
}
else
{
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert failed due to zookeeper error. Please retry. Reason: {}",
write_part_info_keeper_error);
}
retries_ctl.stopRetries();
return;
}
}
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
/// We remove the lock just after renaming the part. In case of exception, block number will be marked as abandoned.
/// Also, make deduplication check. If a duplicate is detected, no nodes are created.
@ -944,32 +915,49 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
}
else if (Coordination::isHardwareError(multi_code))
{
write_part_info_keeper_error = multi_code;
/** If the connection is lost, and we do not know if the changes were applied, we can not delete the local part
* if the changes were applied, the inserted block appeared in `/blocks/`, and it can not be inserted again.
*/
transaction.commit();
LOG_TRACE(
log, "Insert of part {} failed when committing to keeper (Reason: {}). Attempting to recover it", part->name, multi_code);
ZooKeeperRetriesControl new_retry_controller = retries_ctl;
/// Setting this flag is point of no return
/// On next retry, we'll just check if actually operation succeed or failed
/// and return ok or error correspondingly
part_committed_locally_but_zookeeper = true;
/// We are going to try to verify if the transaction was written into keeper
/// If we fail to do so (keeper unavailable) then we don't know if the changes were applied or not so
/// we can't delete the local part, as if the changes were applied then inserted block appeared in
/// `/blocks/`, and it can not be inserted again.
new_retry_controller.actionAfterLastFailedRetry([&]
{
transaction.commit();
storage.enqueuePartForCheck(part->name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status of part {} (Reason: {}). Data was written locally but we don't know the status in keeper. It will be verified in ~{} seconds.",
part->name, multi_code, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
new_retry_controller.requestUnconditionalRetry();
/// if all retries will be exhausted by accessing zookeeper on fresh retry -> we'll add committed part to queue in the action
/// here lambda capture part name, it's ok since we'll not generate new one for this insert,
/// see comments around 'part_committed_locally_but_zookeeper' flag
retries_ctl.actionAfterLastFailedRetry(
[&my_storage = storage, part_name = part->name]
{
my_storage.enqueuePartForCheck(part_name, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
bool node_exists = false;
new_retry_controller.retryLoop([&]
{
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
zookeeper->setKeeper(storage.getZooKeeper());
node_exists = zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name);
});
/// We do not know whether or not data has been inserted.
retries_ctl.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown status, client must retry. Reason: {}",
multi_code);
return;
if (node_exists)
{
LOG_TRACE(log, "Insert of part {} recovered from keeper successfully. It will be committed", part->name);
part->new_part_was_committed_to_zookeeper_after_rename_on_disk = true;
transaction.commit();
storage.merge_selecting_task->schedule();
}
else
{
LOG_TRACE(log, "Insert of part {} was not committed to keeper. Will try again with a new block", part->name);
rename_part_to_temporary();
retries_ctl.setUserError(
ErrorCodes::UNEXPECTED_ZOOKEEPER_ERROR,
"Insert of part {} failed when committing to keeper (Reason: {}",
part->name,
multi_code);
}
}
else if (Coordination::isUserError(multi_code))
{

View File

@ -0,0 +1,20 @@
-- Tags: zookeeper, no-parallel
DROP TABLE IF EXISTS t_hardware_error NO DELAY;
CREATE TABLE t_hardware_error (
KeyID UInt32
) Engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/{database}/t_async_insert_dedup', '{replica}')
ORDER BY (KeyID);
insert into t_hardware_error values (1), (2), (3), (4), (5);
system enable failpoint replicated_merge_tree_commit_zk_fail_after_op;
system enable failpoint replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault;
insert into t_hardware_error values (6), (7), (8), (9), (10); -- {serverError UNKNOWN_STATUS_OF_INSERT}
system disable failpoint replicated_commit_zk_fail_after_op;
system disable failpoint replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault;
DROP TABLE t_hardware_error NO DELAY;