Merge pull request #61953 from hanfei1991/hanfei/fix-quorum-retry

Fix logical error when undoing a quorum insert transaction
Han Fei 2024-03-28 20:49:32 +01:00 committed by GitHub
commit 419ac380f1
11 changed files with 120 additions and 3 deletions
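In short, the change adds a SYSTEM WAIT FAILPOINT <name> statement (covered by the existing SYSTEM FAILPOINT grant) together with three pausable failpoints, so the new integration test can synchronize with server-side steps instead of sleeping. Below is a minimal sketch of the wait pattern, mirroring the test at the bottom of this diff; the "node" object and the surrounding cluster fixture are assumptions, not part of the change.

# Sketch only (not part of the diff): block a client until the server reaches a
# pausable failpoint. "node" is assumed to be a ClickHouseInstance from
# helpers.cluster; the failpoint name is one of those registered in this PR.
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    node.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
    # SYSTEM WAIT FAILPOINT blocks until the server disables the failpoint at
    # the matching code location (here: right after a quorum part is marked as failed).
    waiter = executor.submit(
        lambda: node.query(
            "SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
        )
    )
    # ... drive the cluster into the state that triggers that code path ...
    concurrent.futures.wait([waiter])
    assert waiter.exception() is None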

View File

@@ -205,7 +205,7 @@ enum class AccessType
M(SYSTEM_FLUSH, "", GROUP, SYSTEM) \
M(SYSTEM_THREAD_FUZZER, "SYSTEM START THREAD FUZZER, SYSTEM STOP THREAD FUZZER, START THREAD FUZZER, STOP THREAD FUZZER", GLOBAL, SYSTEM) \
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM_FAILPOINT, "SYSTEM ENABLE FAILPOINT, SYSTEM DISABLE FAILPOINT, SYSTEM WAIT FAILPOINT", GLOBAL, SYSTEM) \
M(SYSTEM_LISTEN, "SYSTEM START LISTEN, SYSTEM STOP LISTEN", GLOBAL, SYSTEM) \
M(SYSTEM_JEMALLOC, "SYSTEM JEMALLOC PURGE, SYSTEM JEMALLOC ENABLE PROFILE, SYSTEM JEMALLOC DISABLE PROFILE, SYSTEM JEMALLOC FLUSH PROFILE", GLOBAL, SYSTEM) \
M(SYSTEM, "", GROUP, ALL) /* allows to execute SYSTEM {SHUTDOWN|RELOAD CONFIG|...} */ \

View File

@@ -50,7 +50,9 @@ static struct InitFiu
REGULAR(check_table_query_delay_for_part) \
REGULAR(dummy_failpoint) \
REGULAR(prefetched_reader_pool_failpoint) \
PAUSEABLE_ONCE(dummy_pausable_failpoint_once) \
PAUSEABLE_ONCE(replicated_merge_tree_insert_retry_pause) \
PAUSEABLE_ONCE(finish_set_quorum_failed_parts) \
PAUSEABLE_ONCE(finish_clean_quorum_failed_parts) \
PAUSEABLE(dummy_pausable_failpoint) \
ONCE(execute_query_calling_empty_set_result_func_on_exception)
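Of the three failpoints registered here for the new test, replicated_merge_tree_insert_retry_pause works in the opposite direction from the finish_* ones: the test enables it so the insert's retry loop parks inside ReplicatedMergeTreeSink until the test releases it, whereas finish_set_quorum_failed_parts and finish_clean_quorum_failed_parts are disabled by the server itself so that a SYSTEM WAIT FAILPOINT caller can observe the corresponding step finishing. A rough sketch of the first pattern, with the "zero" node and the background INSERT assumed as in the test below:

# Sketch only (not part of the diff): hold a quorum INSERT in its retry loop.
# "zero" is assumed to be a ClickHouseInstance, as in the integration test.
zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
# ... start the INSERT in a background thread; once it needs to retry, it
# pauses at this failpoint inside the sink's retry loop ...
# ... reshape the cluster state under test (e.g. cut the Keeper connection) ...
zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
# The paused retry resumes and re-checks the part's state in Keeper.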

View File

@@ -755,6 +755,14 @@ BlockIO InterpreterSystemQuery::execute()
FailPointInjection::disableFailPoint(query.fail_point_name);
break;
}
case Type::WAIT_FAILPOINT:
{
getContext()->checkAccess(AccessType::SYSTEM_FAILPOINT);
LOG_TRACE(log, "waiting for failpoint {}", query.fail_point_name);
FailPointInjection::pauseFailPoint(query.fail_point_name);
LOG_TRACE(log, "finished failpoint {}", query.fail_point_name);
break;
}
case Type::RESET_COVERAGE:
{
getContext()->checkAccess(AccessType::SYSTEM);
@@ -1454,6 +1462,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::STOP_THREAD_FUZZER:
case Type::START_THREAD_FUZZER:
case Type::ENABLE_FAILPOINT:
case Type::WAIT_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::RESET_COVERAGE:
case Type::UNKNOWN:

View File

@@ -364,6 +364,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState & s
}
case Type::ENABLE_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::WAIT_FAILPOINT:
{
settings.ostr << ' ';
print_identifier(fail_point_name);

View File

@@ -87,6 +87,7 @@ public:
UNFREEZE,
ENABLE_FAILPOINT,
DISABLE_FAILPOINT,
WAIT_FAILPOINT,
SYNC_FILESYSTEM_CACHE,
STOP_PULLING_REPLICATION_LOG,
START_PULLING_REPLICATION_LOG,

View File

@@ -263,6 +263,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
}
case Type::ENABLE_FAILPOINT:
case Type::DISABLE_FAILPOINT:
case Type::WAIT_FAILPOINT:
{
ASTPtr ast;
if (ParserIdentifier{}.parse(pos, ast, expected))

View File

@@ -4,6 +4,7 @@
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumEntry.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAddress.h>
#include <Interpreters/Context.h>
#include <Common/FailPoint.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <Common/randomSeed.h>
#include <Core/ServerUUID.h>
@@ -24,6 +25,11 @@ namespace ErrorCodes
extern const int REPLICA_IS_ALREADY_ACTIVE;
}
namespace FailPoints
{
extern const char finish_clean_quorum_failed_parts[];
};
/// Used to check whether it's us who set node `is_active`, or not.
static String generateActiveNodeIdentifier()
{
@@ -241,6 +247,7 @@ void ReplicatedMergeTreeRestartingThread::removeFailedQuorumParts()
storage.queue.removeFailedQuorumPart(part->info);
}
}
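/// Release anyone blocked in SYSTEM WAIT FAILPOINT: failed quorum parts have been cleaned up (synchronization point for tests).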
FailPointInjection::disableFailPoint(FailPoints::finish_clean_quorum_failed_parts);
}

View File

@@ -30,6 +30,7 @@ namespace FailPoints
extern const char replicated_merge_tree_commit_zk_fail_after_op[];
extern const char replicated_merge_tree_insert_quorum_fail_0[];
extern const char replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault[];
extern const char replicated_merge_tree_insert_retry_pause[];
}
namespace ErrorCodes
@@ -940,14 +941,27 @@ std::pair<std::vector<String>, bool> ReplicatedMergeTreeSinkImpl<async_insert>::
});
bool node_exists = false;
bool quorum_fail_exists = false;
/// The loop will be executed at least once
new_retry_controller.retryLoop([&]
{
fiu_do_on(FailPoints::replicated_merge_tree_commit_zk_fail_when_recovering_from_hw_fault, { zookeeper->forceFailureBeforeOperation(); });
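/// Pausable failpoint: lets tests hold the retry here until they disable it.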
FailPointInjection::pauseFailPoint(FailPoints::replicated_merge_tree_insert_retry_pause);
zookeeper->setKeeper(storage.getZooKeeper());
node_exists = zookeeper->exists(fs::path(storage.replica_path) / "parts" / part->name);
if (isQuorumEnabled())
quorum_fail_exists = zookeeper->exists(fs::path(storage.zookeeper_path) / "quorum" / "failed_parts" / part->name);
});
/// If the quorum/failed_parts node exists, the restarting thread will clean up the garbage.
if (quorum_fail_exists)
{
LOG_INFO(log, "Part {} fails to commit and will not retry or clean garbage. Restarting Thread will do everything.", part->name);
transaction.clear();
/// The `quorum/failed_parts/part_name` node exists because the table was in readonly mode for a while, so report that the table is read-only.
throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to shutdown: replica_path={}", storage.replica_path);
}
if (node_exists)
{
LOG_DEBUG(log, "Insert of part {} recovered from keeper successfully. It will be committed", part->name);

View File

@@ -141,6 +141,7 @@ namespace FailPoints
{
extern const char replicated_queue_fail_next_entry[];
extern const char replicated_queue_unfail_entries[];
extern const char finish_set_quorum_failed_parts[];
}
namespace ErrorCodes
@@ -2221,6 +2222,7 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry, bool need_to_che
if (code == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Marked quorum for part {} as failed.", entry.new_part_name);
FailPointInjection::disableFailPoint(FailPoints::finish_set_quorum_failed_parts);
queue.removeFailedQuorumPart(part_info);
return true;
}

View File

@@ -1,7 +1,9 @@
import concurrent.futures
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
@@ -361,3 +363,81 @@ def test_insert_quorum_with_ttl(started_cluster):
    )
    zero.query("DROP TABLE IF EXISTS test_insert_quorum_with_ttl ON CLUSTER cluster")

def test_insert_quorum_with_keeper_loss_connection():
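    # Scenario: a quorum INSERT on zero has its commit to Keeper fail after the
    # operation was applied (replicated_merge_tree_commit_zk_fail_after_op) and
    # then loses its Keeper session. While the INSERT is held in its retry loop
    # by replicated_merge_tree_insert_retry_pause, replica first marks the part
    # as a failed quorum part. On retry the INSERT must give up without undoing
    # its transaction (previously a LOGICAL_ERROR) and report the table as
    # read-only, leaving cleanup to the restarting thread.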
    zero.query(
        "DROP TABLE IF EXISTS test_insert_quorum_with_keeper_loss ON CLUSTER cluster"
    )
    create_query = (
        "CREATE TABLE test_insert_quorum_with_keeper_loss"
        "(a Int8, d Date) "
        "Engine = ReplicatedMergeTree('/clickhouse/tables/{table}', '{replica}') "
        "ORDER BY a "
    )

    zero.query(create_query)
    first.query(create_query)

    first.query("SYSTEM STOP FETCHES test_insert_quorum_with_keeper_loss")
    zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_commit_zk_fail_after_op")
    zero.query("SYSTEM ENABLE FAILPOINT replicated_merge_tree_insert_retry_pause")

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        insert_future = executor.submit(
            lambda: zero.query(
                "INSERT INTO test_insert_quorum_with_keeper_loss(a,d) VALUES(1, '2011-01-01')",
                settings={"insert_quorum_timeout": 150000},
            )
        )

        pm = PartitionManager()
        pm.drop_instance_zk_connections(zero)

        retries = 0
        zk = cluster.get_kazoo_client("zoo1")
        while True:
            if (
                zk.exists(
                    "/clickhouse/tables/test_insert_quorum_with_keeper_loss/replicas/zero/is_active"
                )
                is None
            ):
                break
            print("replica is still active")
            time.sleep(1)
            retries += 1
            if retries == 120:
                raise Exception("Cannot wait for the cluster replica to become inactive")

        first.query("SYSTEM ENABLE FAILPOINT finish_set_quorum_failed_parts")
        quorum_fail_future = executor.submit(
            lambda: first.query(
                "SYSTEM WAIT FAILPOINT finish_set_quorum_failed_parts", timeout=300
            )
        )
        first.query("SYSTEM START FETCHES test_insert_quorum_with_keeper_loss")

        concurrent.futures.wait([quorum_fail_future])
        assert quorum_fail_future.exception() is None

        zero.query("SYSTEM ENABLE FAILPOINT finish_clean_quorum_failed_parts")
        clean_quorum_fail_parts_future = executor.submit(
            lambda: first.query(
                "SYSTEM WAIT FAILPOINT finish_clean_quorum_failed_parts", timeout=300
            )
        )
        pm.restore_instance_zk_connections(zero)

        concurrent.futures.wait([clean_quorum_fail_parts_future])
        assert clean_quorum_fail_parts_future.exception() is None

        zero.query("SYSTEM DISABLE FAILPOINT replicated_merge_tree_insert_retry_pause")
        concurrent.futures.wait([insert_future])
        assert insert_future.exception() is not None

        assert not zero.contains_in_log("LOGICAL_ERROR")
        assert zero.contains_in_log(
            "fails to commit and will not retry or clean garbage"
        )

View File

@@ -155,7 +155,7 @@ SYSTEM FLUSH ASYNC INSERT QUEUE ['FLUSH ASYNC INSERT QUEUE'] GLOBAL SYSTEM FLUSH
SYSTEM FLUSH [] \N SYSTEM
SYSTEM THREAD FUZZER ['SYSTEM START THREAD FUZZER','SYSTEM STOP THREAD FUZZER','START THREAD FUZZER','STOP THREAD FUZZER'] GLOBAL SYSTEM
SYSTEM UNFREEZE ['SYSTEM UNFREEZE'] GLOBAL SYSTEM
SYSTEM FAILPOINT ['SYSTEM ENABLE FAILPOINT','SYSTEM DISABLE FAILPOINT'] GLOBAL SYSTEM
SYSTEM FAILPOINT ['SYSTEM ENABLE FAILPOINT','SYSTEM DISABLE FAILPOINT','SYSTEM WAIT FAILPOINT'] GLOBAL SYSTEM
SYSTEM LISTEN ['SYSTEM START LISTEN','SYSTEM STOP LISTEN'] GLOBAL SYSTEM
SYSTEM JEMALLOC ['SYSTEM JEMALLOC PURGE','SYSTEM JEMALLOC ENABLE PROFILE','SYSTEM JEMALLOC DISABLE PROFILE','SYSTEM JEMALLOC FLUSH PROFILE'] GLOBAL SYSTEM
SYSTEM [] \N ALL