Merge pull request #57986 from Algunenano/zk_retries_quorum

Support keeper failures in quorum check
Raúl Marín 2023-12-22 11:10:04 +01:00 committed by GitHub
commit 24f952b2b8
27 changed files with 192 additions and 206 deletions

View File

@ -88,18 +88,19 @@ BackupEntriesCollector::BackupEntriesCollector(
, read_settings(read_settings_)
, context(context_)
, on_cluster_first_sync_timeout(context->getConfigRef().getUInt64("backups.on_cluster_first_sync_timeout", 180000))
, collect_metadata_timeout(context->getConfigRef().getUInt64("backups.collect_metadata_timeout", context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)))
, collect_metadata_timeout(context->getConfigRef().getUInt64(
"backups.collect_metadata_timeout", context->getConfigRef().getUInt64("backups.consistent_metadata_snapshot_timeout", 600000)))
, attempts_to_collect_metadata_before_sleep(context->getConfigRef().getUInt("backups.attempts_to_collect_metadata_before_sleep", 2))
, min_sleep_before_next_attempt_to_collect_metadata(context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
, max_sleep_before_next_attempt_to_collect_metadata(context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, min_sleep_before_next_attempt_to_collect_metadata(
context->getConfigRef().getUInt64("backups.min_sleep_before_next_attempt_to_collect_metadata", 100))
, max_sleep_before_next_attempt_to_collect_metadata(
context->getConfigRef().getUInt64("backups.max_sleep_before_next_attempt_to_collect_metadata", 5000))
, compare_collected_metadata(context->getConfigRef().getBool("backups.compare_collected_metadata", true))
, log(&Poco::Logger::get("BackupEntriesCollector"))
, global_zookeeper_retries_info(
"BackupEntriesCollector",
log,
context->getSettingsRef().backup_restore_keeper_max_retries,
context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms)
context->getSettingsRef().backup_restore_keeper_max_retries,
context->getSettingsRef().backup_restore_keeper_retry_initial_backoff_ms,
context->getSettingsRef().backup_restore_keeper_retry_max_backoff_ms)
, threadpool(threadpool_)
{
}
@ -572,7 +573,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> BackupEntriesCollector::findTablesInD
{
/// Database or table could be replicated - so may use ZooKeeper. We need to retry.
auto zookeeper_retries_info = global_zookeeper_retries_info;
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getTablesForBackup", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&](){ db_tables = database->getTablesForBackup(filter_by_table_name, context); });
}
catch (Exception & e)

View File

@ -20,22 +20,19 @@ WithRetries::KeeperSettings WithRetries::KeeperSettings::fromContext(ContextPtr
};
}
WithRetries::WithRetries(Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper_, const KeeperSettings & settings_, RenewerCallback callback_)
WithRetries::WithRetries(
Poco::Logger * log_, zkutil::GetZooKeeper get_zookeeper_, const KeeperSettings & settings_, RenewerCallback callback_)
: log(log_)
, get_zookeeper(get_zookeeper_)
, settings(settings_)
, callback(callback_)
, global_zookeeper_retries_info(
log->name(),
log,
settings.keeper_max_retries,
settings.keeper_retry_initial_backoff_ms,
settings.keeper_retry_max_backoff_ms)
settings.keeper_max_retries, settings.keeper_retry_initial_backoff_ms, settings.keeper_retry_max_backoff_ms)
{}
WithRetries::RetriesControlHolder::RetriesControlHolder(const WithRetries * parent, const String & name)
: info(parent->global_zookeeper_retries_info)
, retries_ctl(name, info, nullptr)
, retries_ctl(name, parent->log, info, nullptr)
, faulty_zookeeper(parent->getFaultyZooKeeper())
{}

View File

@ -41,12 +41,9 @@ static ZooKeeperRetriesInfo getRetriesInfo()
{
const auto & config_ref = Context::getGlobalContextInstance()->getConfigRef();
return ZooKeeperRetriesInfo(
"DistributedDDL",
&Poco::Logger::get("DDLQueryStatusSource"),
config_ref.getInt("distributed_ddl_keeper_max_retries", 5),
config_ref.getInt("distributed_ddl_keeper_initial_backoff_ms", 100),
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000)
);
config_ref.getInt("distributed_ddl_keeper_max_backoff_ms", 5000));
}
bool isSupportedAlterTypeForOnClusterDDLQuery(int type)
@ -438,8 +435,8 @@ Chunk DDLQueryStatusSource::generate()
Strings tmp_active_hosts;
{
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster", &Poco::Logger::get("DDLQueryStatusSource"), getRetriesInfo(), context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
auto zookeeper = context->getZooKeeper();
@ -478,8 +475,11 @@ Chunk DDLQueryStatusSource::generate()
String status_data;
bool finished_exists = false;
auto retries_info = getRetriesInfo();
auto retries_ctl = ZooKeeperRetriesControl("executeDDLQueryOnCluster", retries_info, context->getProcessListElement());
auto retries_ctl = ZooKeeperRetriesControl(
"executeDDLQueryOnCluster",
&Poco::Logger::get("DDLQueryStatusSource"),
getRetriesInfo(),
context->getProcessListElement());
retries_ctl.retryLoop([&]()
{
finished_exists = context->getZooKeeper()->tryGet(fs::path(node_path) / "finished" / host_id, status_data);

View File

@ -40,8 +40,6 @@ namespace ErrorCodes
extern const int READONLY;
extern const int UNKNOWN_STATUS_OF_INSERT;
extern const int INSERT_WAS_DEDUPLICATED;
extern const int TIMEOUT_EXCEEDED;
extern const int NO_ACTIVE_REPLICAS;
extern const int DUPLICATE_DATA_PART;
extern const int PART_IS_TEMPORARILY_LOCKED;
extern const int LOGICAL_ERROR;
@ -160,7 +158,12 @@ size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const
size_t replicas_number = 0;
ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
const auto & settings = context->getSettingsRef();
ZooKeeperRetriesControl quorum_retries_ctl(
"checkQuorumPrecondition",
log,
{settings.insert_keeper_max_retries, settings.insert_keeper_retry_initial_backoff_ms, settings.insert_keeper_retry_max_backoff_ms},
context->getProcessListElement());
quorum_retries_ctl.retryLoop(
[&]()
{
@ -255,12 +258,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
auto block = getHeader().cloneWithColumns(chunk.detachColumns());
const auto & settings = context->getSettingsRef();
zookeeper_retries_info = ZooKeeperRetriesInfo(
"ReplicatedMergeTreeSink::consume",
settings.insert_keeper_max_retries ? log : nullptr,
settings.insert_keeper_max_retries,
settings.insert_keeper_retry_initial_backoff_ms,
settings.insert_keeper_retry_max_backoff_ms);
ZooKeeperWithFaultInjectionPtr zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
@ -636,7 +633,12 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
CommitRetryContext retry_context;
ZooKeeperRetriesControl retries_ctl("commitPart", zookeeper_retries_info, context->getProcessListElement());
const auto & settings = context->getSettingsRef();
ZooKeeperRetriesControl retries_ctl(
"commitPart",
log,
{settings.insert_keeper_max_retries, settings.insert_keeper_retry_initial_backoff_ms, settings.insert_keeper_retry_max_backoff_ms},
context->getProcessListElement());
auto resolve_duplicate_stage = [&] () -> CommitRetryContext::Stages
{
@ -910,12 +912,8 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
part->name, multi_code, MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER);
});
/// Independently of how many retries we had left we want to do at least one check of this inner retry
/// so a) we try to verify at least once if metadata was written and b) we set the proper final error
/// (UNKNOWN_STATUS_OF_INSERT) if we fail to reconnect to keeper
new_retry_controller.requestUnconditionalRetry();
bool node_exists = false;
/// The loop will be executed at least once
new_retry_controller.retryLoop([&]
{
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
@ -1073,7 +1071,26 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
if (quorum_parallel)
quorum_info.status_path = storage.zookeeper_path + "/quorum/parallel/" + retry_context.actual_part_name;
waitForQuorum(zookeeper, retry_context.actual_part_name, quorum_info.status_path, quorum_info.is_active_node_version, replicas_num);
ZooKeeperRetriesControl new_retry_controller = retries_ctl;
new_retry_controller.actionAfterLastFailedRetry([&]
{
/// We do not know whether or not data has been inserted in other replicas
new_retry_controller.setUserError(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: {}",
new_retry_controller.getLastKeeperErrorMessage());
});
new_retry_controller.retryLoop([&]()
{
zookeeper->setKeeper(storage.getZooKeeper());
waitForQuorum(
zookeeper,
retry_context.actual_part_name,
quorum_info.status_path,
quorum_info.is_active_node_version,
replicas_num);
});
}
}
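
The ordering above is what makes the hook useful: in the reworked ZooKeeperRetriesControl (see the ZooKeeperRetries.h hunk below), once the retry budget is exhausted canTry() invokes actionAfterLastFailedRetry() before rethrowing, so the user error set inside the hook (UNKNOWN_STATUS_OF_INSERT) is what ultimately surfaces to the client rather than the raw keeper error. A minimal sketch of that interaction, assuming the ClickHouse source tree; waitForQuorumOnce() is a hypothetical stand-in for the zookeeper->setKeeper(...) + waitForQuorum(...) body used above:

/// Sketch only: condenses the pattern introduced in commitPart() above.
#include <Common/ZooKeeper/ZooKeeperRetries.h>

namespace DB
{

namespace ErrorCodes
{
    extern const int UNKNOWN_STATUS_OF_INSERT;
}

void waitForQuorumOnce();   /// hypothetical stand-in for the real quorum wait

void waitForQuorumWithRetries(ZooKeeperRetriesControl & commit_retries_ctl)   /// hypothetical helper
{
    /// Copying the control keeps the logger, the limits and the failure count already
    /// accumulated by the commit loop, so the new loop only gets the remaining budget.
    ZooKeeperRetriesControl quorum_retries_ctl = commit_retries_ctl;

    quorum_retries_ctl.actionAfterLastFailedRetry([&]
    {
        /// Runs once, after the final failed attempt and before the error is rethrown:
        /// the client sees an explicit "unknown status" instead of the raw keeper error.
        quorum_retries_ctl.setUserError(
            ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
            "Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: {}",
            quorum_retries_ctl.getLastKeeperErrorMessage());
    });

    /// The body runs at least once even if the commit loop already used up its retries.
    quorum_retries_ctl.retryLoop([&] { waitForQuorumOnce(); });
}

}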
@ -1106,49 +1123,44 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::waitForQuorum(
/// We are waiting for quorum to be satisfied.
LOG_TRACE(log, "Waiting for quorum '{}' for part {}{}", quorum_path, part_name, quorumLogMessage(replicas_num));
try
fiu_do_on(FailPoints::replicated_merge_tree_insert_quorum_fail_0, { zookeeper->forceFailureBeforeOperation(); });
while (true)
{
fiu_do_on(FailPoints::replicated_merge_tree_insert_quorum_fail_0, { zookeeper->forceFailureBeforeOperation(); });
zkutil::EventPtr event = std::make_shared<Poco::Event>();
while (true)
{
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
break;
std::string value;
/// `get` instead of `exists` so that `watch` does not leak if the node is no longer there.
if (!zookeeper->tryGet(quorum_path, value, nullptr, event))
break;
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
LOG_TRACE(log, "Quorum node {} still exists, will wait for updates", quorum_path);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
ReplicatedMergeTreeQuorumEntry quorum_entry(value);
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part_name)
break;
/// If the node has time to disappear, and then appear again for the next insert.
if (quorum_entry.part_name != part_name)
break;
if (!event->tryWait(quorum_timeout_ms))
throw Exception(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: "
"Timeout while waiting for quorum");
if (!event->tryWait(quorum_timeout_ms))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Timeout while waiting for quorum");
LOG_TRACE(log, "Quorum {} for part {} updated, will check quorum node still exists", quorum_path, part_name);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
Coordination::Stat stat;
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, &stat)
|| stat.version != is_active_node_version)
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "Replica become inactive while waiting for quorum");
}
catch (...)
{
/// We do not know whether or not data has been inserted
/// - whether other replicas have time to download the part and mark the quorum as done.
throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_INSERT, "Unknown status, client must retry. Reason: {}",
getCurrentExceptionMessage(false));
LOG_TRACE(log, "Quorum {} for part {} updated, will check quorum node still exists", quorum_path, part_name);
}
/// And what if it is possible that the current replica at this time has ceased to be active
/// and the quorum is marked as failed and deleted?
Coordination::Stat stat;
String value;
if (!zookeeper->tryGet(storage.replica_path + "/is_active", value, &stat) || stat.version != is_active_node_version)
throw Exception(
ErrorCodes::UNKNOWN_STATUS_OF_INSERT,
"Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: "
"Replica became inactive while waiting for quorum");
LOG_TRACE(log, "Quorum '{}' for part {} satisfied", quorum_path, part_name);
}

View File

@ -74,7 +74,6 @@ private:
using BlockIDsType = std::conditional_t<async_insert, std::vector<String>, String>;
ZooKeeperRetriesInfo zookeeper_retries_info;
struct QuorumInfo
{
String status_path;

View File

@ -5,6 +5,8 @@
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/logger_useful.h>
#include <memory>
namespace DB
{
@ -15,29 +17,31 @@ namespace ErrorCodes
struct ZooKeeperRetriesInfo
{
ZooKeeperRetriesInfo() = default;
ZooKeeperRetriesInfo(std::string name_, Poco::Logger * logger_, UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: name(std::move(name_))
, logger(logger_)
, max_retries(max_retries_)
, curr_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_))
, max_backoff_ms(max_backoff_ms_)
ZooKeeperRetriesInfo(UInt64 max_retries_, UInt64 initial_backoff_ms_, UInt64 max_backoff_ms_)
: max_retries(max_retries_), initial_backoff_ms(std::min(initial_backoff_ms_, max_backoff_ms_)), max_backoff_ms(max_backoff_ms_)
{
}
std::string name;
Poco::Logger * logger = nullptr;
UInt64 max_retries = 0;
UInt64 curr_backoff_ms = 0;
UInt64 max_backoff_ms = 0;
UInt64 retry_count = 0;
UInt64 max_retries;
UInt64 initial_backoff_ms;
UInt64 max_backoff_ms;
};
class ZooKeeperRetriesControl
{
public:
ZooKeeperRetriesControl(std::string name_, ZooKeeperRetriesInfo & retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), retries_info(retries_info_), process_list_element(elem)
ZooKeeperRetriesControl(std::string name_, Poco::Logger * logger_, ZooKeeperRetriesInfo retries_info_, QueryStatusPtr elem)
: name(std::move(name_)), logger(logger_), retries_info(retries_info_), process_list_element(elem)
{
}
ZooKeeperRetriesControl(const ZooKeeperRetriesControl & other)
: name(other.name)
, logger(other.logger)
, retries_info(other.retries_info)
, total_failures(other.total_failures)
, process_list_element(other.process_list_element)
, current_backoff_ms(other.current_backoff_ms)
{
}
@ -46,7 +50,7 @@ public:
retryLoop(f, []() {});
}
/// retryLoop() executes f() until it succeeds/max_retries is reached/non-retrialable error is encountered
/// retryLoop() executes f() until it succeeds/max_retries is reached/non-retryable error is encountered
///
/// the callable f() can provide feedback in terms of errors in two ways:
/// 1. throw KeeperException exception:
@ -56,10 +60,17 @@ public:
/// The idea is that if the caller has some semantics on top of non-hardware keeper errors,
/// then it can provide feedback to retries controller via user errors
///
/// It is possible to use it multiple times (the failure count is shared across all calls)
/// Each retryLoop is independent and it will execute f at least once
void retryLoop(auto && f, auto && iteration_cleanup)
{
while (canTry())
current_iteration = 0;
current_backoff_ms = retries_info.initial_backoff_ms;
while (current_iteration == 0 || canTry())
{
/// reset the flag, it will be set to false in case of error
iteration_succeeded = true;
try
{
f();
@ -79,6 +90,7 @@ public:
iteration_cleanup();
throw;
}
current_iteration++;
}
}
@ -102,13 +114,11 @@ public:
void setUserError(std::exception_ptr exception, int code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setUserError: error={} message={}", retries_info.name, name, code, message);
if (logger)
LOG_TRACE(logger, "ZooKeeperRetriesControl: {}: setUserError: error={} message={}", name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
if (iteration_succeeded)
total_failures++;
iteration_succeeded = false;
user_error.code = code;
@ -136,13 +146,11 @@ public:
void setKeeperError(std::exception_ptr exception, Coordination::Error code, std::string message)
{
if (retries_info.logger)
LOG_TRACE(
retries_info.logger, "ZooKeeperRetriesControl: {}/{}: setKeeperError: error={} message={}", retries_info.name, name, code, message);
if (logger)
LOG_TRACE(logger, "ZooKeeperRetriesControl: {}: setKeeperError: error={} message={}", name, code, message);
/// if current iteration is already failed, keep initial error
if (!iteration_succeeded)
return;
if (iteration_succeeded)
total_failures++;
iteration_succeeded = false;
keeper_error.code = code;
@ -170,17 +178,19 @@ public:
void stopRetries() { stop_retries = true; }
void requestUnconditionalRetry() { unconditional_retry = true; }
bool isLastRetry() const { return total_failures >= retries_info.max_retries; }
bool isLastRetry() const { return retries_info.retry_count >= retries_info.max_retries; }
bool isRetry() const { return current_iteration > 1; }
bool isRetry() const { return retries_info.retry_count > 0; }
Coordination::Error getLastKeeperErrorCode() const { return keeper_error.code; }
const std::string & getLastKeeperErrorMessage() const { return keeper_error.message; }
/// action will be called only once and only after latest failed retry
void actionAfterLastFailedRetry(std::function<void()> f) { action_after_last_failed_retry = std::move(f); }
const std::string & getName() const { return name; }
Poco::Logger * getLogger() const { return logger; }
private:
struct KeeperError
{
@ -199,59 +209,42 @@ private:
bool canTry()
{
++iteration_count;
/// first iteration is ordinary execution, no further checks needed
if (0 == iteration_count)
return true;
if (process_list_element && !process_list_element->checkTimeLimitSoft())
return false;
if (unconditional_retry)
{
unconditional_retry = false;
return true;
}
/// iteration succeeded -> no need to retry
if (iteration_succeeded)
{
/// avoid unnecessary logs, - print something only in case of retries
if (retries_info.logger && iteration_count > 1)
if (logger && total_failures > 0)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: succeeded after: iterations={} total_retries={}",
retries_info.name,
logger,
"ZooKeeperRetriesControl: {}: succeeded after: Iterations={} Total keeper failures={}/{}",
name,
iteration_count,
retries_info.retry_count);
current_iteration,
total_failures,
retries_info.max_retries);
return false;
}
if (stop_retries)
{
logLastError("stop retries on request");
action_after_last_failed_retry();
logLastError("stop retries on request");
throwIfError();
return false;
}
if (retries_info.retry_count >= retries_info.max_retries)
if (total_failures > retries_info.max_retries)
{
logLastError("retry limit is reached");
action_after_last_failed_retry();
logLastError("retry limit is reached");
throwIfError();
return false;
}
if (process_list_element && !process_list_element->checkTimeLimitSoft())
return false;
/// retries
++retries_info.retry_count;
logLastError("will retry due to error");
sleepForMilliseconds(retries_info.curr_backoff_ms);
retries_info.curr_backoff_ms = std::min(retries_info.curr_backoff_ms * 2, retries_info.max_backoff_ms);
/// reset the flag, it will be set to false in case of error
iteration_succeeded = true;
sleepForMilliseconds(current_backoff_ms);
current_backoff_ms = std::min(current_backoff_ms * 2, retries_info.max_backoff_ms);
return true;
}
@ -265,49 +258,52 @@ private:
std::rethrow_exception(keeper_error.exception);
}
void logLastError(std::string_view header)
void logLastError(const std::string_view & header)
{
if (!logger)
return;
if (user_error.code == ErrorCodes::OK)
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
keeper_error.code,
keeper_error.message);
LOG_DEBUG(
logger,
"ZooKeeperRetriesControl: {}: {}: retry_count={}/{} timeout={}ms error={} message={}",
name,
header,
current_iteration,
retries_info.max_retries,
current_backoff_ms,
keeper_error.code,
keeper_error.message);
}
else
{
if (retries_info.logger)
LOG_DEBUG(
retries_info.logger,
"ZooKeeperRetriesControl: {}/{}: {}: retry_count={} timeout={}ms error={} message={}",
retries_info.name,
name,
header,
retries_info.retry_count,
retries_info.curr_backoff_ms,
user_error.code,
user_error.message);
LOG_DEBUG(
logger,
"ZooKeeperRetriesControl: {}: {}: retry_count={}/{} timeout={}ms error={} message={}",
name,
header,
current_iteration,
retries_info.max_retries,
current_backoff_ms,
user_error.code,
user_error.message);
}
}
std::string name;
ZooKeeperRetriesInfo & retries_info;
Int64 iteration_count = -1;
Poco::Logger * logger = nullptr;
ZooKeeperRetriesInfo retries_info;
UInt64 total_failures = 0;
UserError user_error;
KeeperError keeper_error;
std::function<void()> action_after_last_failed_retry = []() {};
bool unconditional_retry = false;
bool iteration_succeeded = true;
bool stop_retries = false;
QueryStatusPtr process_list_element;
UInt64 current_iteration = 0;
UInt64 current_backoff_ms = 0;
};
}
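
Taken together, the header changes above reduce every call site in this PR to one pattern: build an immutable ZooKeeperRetriesInfo from settings, hand it to a ZooKeeperRetriesControl together with a logger and the query's process-list element, and run the keeper-facing work inside retryLoop(). A minimal sketch of that pattern, assuming the ClickHouse source tree; the function name and the keeper read inside the lambda are illustrative:

/// Sketch only: mirrors the usage in ReplicatedMergeTreeSink and BackupEntriesCollector above.
#include <Common/ZooKeeper/ZooKeeperRetries.h>
#include <Interpreters/Context.h>

namespace DB
{

void exampleKeeperRetryLoop(ContextPtr context, Poco::Logger * log)   /// hypothetical helper
{
    const auto & settings = context->getSettingsRef();

    /// Plain configuration now: no name, no logger, no mutable retry state.
    ZooKeeperRetriesInfo retries_info(
        settings.insert_keeper_max_retries,
        settings.insert_keeper_retry_initial_backoff_ms,
        settings.insert_keeper_retry_max_backoff_ms);

    /// The control owns the per-loop state (iteration count, backoff, failure count) and the logger.
    ZooKeeperRetriesControl retries_ctl("exampleKeeperRetryLoop", log, retries_info, context->getProcessListElement());

    /// Each retryLoop() call is independent and runs the body at least once;
    /// keeper errors thrown by the body are retried with exponential backoff up to max_retries.
    retries_ctl.retryLoop([&]
    {
        String value;
        context->getZooKeeper()->tryGet("/illustrative/path", value);   /// illustrative keeper read
    });
}

}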

View File

@ -10258,7 +10258,7 @@ void StorageReplicatedMergeTree::backupData(
bool exists = false;
Strings mutation_ids;
{
ZooKeeperRetriesControl retries_ctl("getMutations", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getMutations", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())
@ -10277,7 +10277,7 @@ void StorageReplicatedMergeTree::backupData(
bool mutation_id_exists = false;
String mutation;
ZooKeeperRetriesControl retries_ctl("getMutation", zookeeper_retries_info, nullptr);
ZooKeeperRetriesControl retries_ctl("getMutation", log, zookeeper_retries_info, nullptr);
retries_ctl.retryLoop([&]()
{
if (!zookeeper || zookeeper->expired())

View File

@ -115,9 +115,8 @@ def test_parallel_quorum_actually_quorum(started_cluster):
error = node.query_and_get_error(
"INSERT INTO q VALUES(3, 'Hi')", settings=settings
)
assert "DB::Exception: Unknown status, client must retry." in error, error
assert (
"DB::Exception: Timeout while waiting for quorum. (TIMEOUT_EXCEEDED)"
"DB::Exception: Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: Timeout while waiting for quorum"
in error
), error

View File

@ -20,7 +20,6 @@ SET select_sequential_consistency=1;
SELECT x FROM quorum1 ORDER BY x;
SELECT x FROM quorum2 ORDER BY x;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum=2, insert_quorum_parallel=0;
INSERT INTO quorum1 VALUES (4, '1990-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_timeout=0;

View File

@ -17,7 +17,6 @@ SYSTEM SYNC REPLICA quorum2;
SET select_sequential_consistency=1;
SET insert_quorum=2, insert_quorum_parallel=0;
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_timeout=0;

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -11,7 +11,6 @@ CREATE TABLE quorum2(x UInt32, y Date) ENGINE ReplicatedMergeTree('/clickhouse/t
SET insert_quorum=2, insert_quorum_parallel=0;
SET select_sequential_consistency=1;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO quorum1 VALUES (1, '2018-11-15');
INSERT INTO quorum1 VALUES (2, '2018-11-15');

View File

@ -9,7 +9,6 @@ CREATE TABLE mutations_and_quorum2 (`server_date` Date, `something` String) ENGI
-- Should not be larger than 600e6 (default timeout in clickhouse-test)
SET insert_quorum=2, insert_quorum_parallel=0, insert_quorum_timeout=300e3;
SET insert_keeper_fault_injection_probability=0;
INSERT INTO mutations_and_quorum1 VALUES ('2019-01-01', 'test1'), ('2019-02-01', 'test2'), ('2019-03-01', 'test3'), ('2019-04-01', 'test4'), ('2019-05-01', 'test1'), ('2019-06-01', 'test2'), ('2019-07-01', 'test3'), ('2019-08-01', 'test4'), ('2019-09-01', 'test1'), ('2019-10-01', 'test2'), ('2019-11-01', 'test3'), ('2019-12-01', 'test4');

View File

@ -1,6 +1,5 @@
-- Tags: long, replica, no-replicated-database
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
@ -10,7 +9,7 @@ DROP TABLE IF EXISTS replica2;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/01451/quorum', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/test/01451/quorum', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
INSERT INTO replica1 VALUES (0);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (0);
SYSTEM SYNC REPLICA replica2;
@ -27,7 +26,7 @@ ALTER TABLE replica2 DROP PARTITION ID 'all';
SET insert_quorum = 2, insert_quorum_parallel = 0;
INSERT INTO replica2 VALUES (1);
INSERT INTO replica2 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
SYSTEM SYNC REPLICA replica2;
@ -39,7 +38,7 @@ SELECT COUNT() FROM replica1;
SET insert_quorum_parallel=1;
INSERT INTO replica2 VALUES (2);
INSERT INTO replica2 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (2);
-- should work, parallel quorum nodes exists only during insert
ALTER TABLE replica1 DROP PART 'all_3_3_0';

View File

@ -1,7 +1,6 @@
-- Tags: long, replica, no-replicated-database
-- Tag no-replicated-database: Fails due to additional replicas or shards
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS replica1 SYNC;
@ -10,9 +9,9 @@ DROP TABLE IF EXISTS replica2 SYNC;
CREATE TABLE replica1 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r1') order by tuple() settings max_replicated_merges_in_queue = 0;
CREATE TABLE replica2 (v UInt8) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{database}/'||currentDatabase()||'test/01451/attach', 'r2') order by tuple() settings max_replicated_merges_in_queue = 0;
INSERT INTO replica1 VALUES (0);
INSERT INTO replica1 VALUES (1);
INSERT INTO replica1 VALUES (2);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (0);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
INSERT INTO replica1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (2);
ALTER TABLE replica1 DETACH PART 'all_100_100_0'; -- { serverError 232 }
@ -25,7 +24,7 @@ SELECT v FROM replica1 ORDER BY v;
SELECT name FROM system.detached_parts WHERE table = 'replica2' AND database = currentDatabase();
ALTER TABLE replica2 ATTACH PART 'all_1_1_0';
ALTER TABLE replica2 ATTACH PART 'all_1_1_0' SETTINGS insert_keeper_fault_injection_probability=0;
SYSTEM SYNC REPLICA replica1;
SELECT v FROM replica1 ORDER BY v;

View File

@ -20,10 +20,6 @@ function thread {
for x in {0..99}; do
# sometimes we can try to commit obsolete part if fetches will be quite fast,
# so suppress warning messages like "Tried to commit obsolete part ... covered by ..."
# (2) keeper fault injection for inserts because
# it can be a cause of deduplicated parts be visible to SELECTs for sometime (until cleanup thread remove them),
# so the same SELECT on different replicas can return different results, i.e. test output will be non-deterministic
# (see #9712)
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x % $NUM_REPLICAS = $1 ? $x - 1 : $x" 2>/dev/null # Replace some records as duplicates so they will be written by other replicas
done
}

View File

@ -24,7 +24,7 @@ function thread {
while true; do
$CLICKHOUSE_CLIENT --query "DETACH TABLE r$1"
$CLICKHOUSE_CLIENT --query "ATTACH TABLE r$1"
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 0 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 0 --query "INSERT INTO r$1 SELECT $x" 2>&1 | grep -qE "$valid_exceptions_to_retry" || break
done
done
}

View File

@ -20,7 +20,7 @@ done
function thread {
i=0 retries=300
while [[ $i -lt $retries ]]; do # server can be dead
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r$1 SELECT $2" && break
$CLICKHOUSE_CLIENT --insert_quorum 3 --insert_quorum_parallel 1 --query "INSERT INTO r$1 SELECT $2" && break
((++i))
sleep 0.1
done

View File

@ -21,7 +21,7 @@ done
$CLICKHOUSE_CLIENT -n -q "SYSTEM STOP REPLICATION QUEUES r2;"
function thread {
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query "INSERT INTO r1 SELECT $1"
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --query "INSERT INTO r1 SELECT $1"
}
for i in $(seq 1 $NUM_INSERTS); do

View File

@ -20,10 +20,9 @@ $CLICKHOUSE_CLIENT -q "CREATE TABLE parallel_q2 (x UInt64) ENGINE=ReplicatedMerg
$CLICKHOUSE_CLIENT -q "SYSTEM STOP REPLICATION QUEUES parallel_q2"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO parallel_q1 VALUES (1)"
# disable keeper fault injection during insert since test checks part names. Part names can differ in case of retries during insert
$CLICKHOUSE_CLIENT --insert_quorum 2 --insert_quorum_parallel 1 --insert_keeper_fault_injection_probability=0 --query="INSERT INTO parallel_q1 VALUES (2)" &
# This test depends on part names and those aren't deterministic with faults
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "INSERT INTO parallel_q1 VALUES (1)"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 --insert_quorum 2 --insert_quorum_parallel 1 --query="INSERT INTO parallel_q1 VALUES (2)" &
part_count=$($CLICKHOUSE_CLIENT --query="SELECT COUNT() FROM system.parts WHERE table='parallel_q1' and database='${CLICKHOUSE_DATABASE}'")

View File

@ -16,8 +16,6 @@ CREATE TABLE r2 (
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/01509_parallel_quorum_insert_no_replicas', '2')
ORDER BY tuple();
SET insert_keeper_fault_injection_probability=0;
SET insert_quorum_parallel=1;
SET insert_quorum=3;

View File

@ -20,7 +20,6 @@ SYSTEM SYNC REPLICA quorum3;
SET select_sequential_consistency=0;
SET optimize_trivial_count_query=1;
SET insert_quorum=2, insert_quorum_parallel=0;
SET insert_keeper_fault_injection_probability=0;
SYSTEM STOP FETCHES quorum1;

View File

@ -2,8 +2,6 @@
-- Tag no-replicated-database: Fails due to additional replicas or shards
-- Tag no-parallel: static zk path
SET insert_keeper_fault_injection_probability=0; -- disable fault injection; part ids are non-deterministic in case of insert retries
DROP TABLE IF EXISTS execute_on_single_replica_r1 SYNC;
DROP TABLE IF EXISTS execute_on_single_replica_r2 SYNC;
@ -11,7 +9,7 @@ DROP TABLE IF EXISTS execute_on_single_replica_r2 SYNC;
CREATE TABLE execute_on_single_replica_r1 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r1') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
CREATE TABLE execute_on_single_replica_r2 (x UInt64) ENGINE=ReplicatedMergeTree('/clickhouse/tables/test_01532/execute_on_single_replica', 'r2') ORDER BY tuple() SETTINGS execute_merges_on_single_replica_time_threshold=10;
INSERT INTO execute_on_single_replica_r1 VALUES (1);
INSERT INTO execute_on_single_replica_r1 SETTINGS insert_keeper_fault_injection_probability=0 VALUES (1);
SYSTEM SYNC REPLICA execute_on_single_replica_r2;
SET optimize_throw_if_noop=1;

View File

@ -4,7 +4,7 @@ DROP TABLE IF EXISTS test_01640;
DROP TABLE IF EXISTS restore_01640;
CREATE TABLE test_01640(i Int64, d Date, s String)
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
ENGINE = ReplicatedMergeTree('/clickhouse/{database}/{shard}/tables/test_01640','{replica}')
PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
@ -16,13 +16,13 @@ PARTITION BY toYYYYMM(d) ORDER BY i
SETTINGS allow_remote_fs_zero_copy_replication=0;
ALTER TABLE restore_01640 FETCH PARTITION tuple(toYYYYMM(toDate('2021-01-01')))
FROM '/clickhouse/{database}/{shard}/tables/test_01640';
FROM '/clickhouse/{database}/{shard}/tables/test_01640' SETTINGS insert_keeper_fault_injection_probability=0;
SELECT partition_id
FROM system.detached_parts
WHERE (table = 'restore_01640') AND (database = currentDatabase());
ALTER TABLE restore_01640 ATTACH PARTITION tuple(toYYYYMM(toDate('2021-01-01')));
ALTER TABLE restore_01640 ATTACH PARTITION tuple(toYYYYMM(toDate('2021-01-01'))) SETTINGS insert_keeper_fault_injection_probability=0;;
SELECT partition_id
FROM system.detached_parts

View File

@ -7,6 +7,7 @@ CREATE TABLE quorum1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{d
CREATE TABLE quorum2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/{database}/test_02887/quorum', '2') ORDER BY x;
SET insert_keeper_fault_injection_probability=0;
SET insert_keeper_max_retries = 0;
SET insert_quorum = 2;
system enable failpoint replicated_merge_tree_insert_quorum_fail_0;