Merge pull request #52688 from Algunenano/better_zk_retries_on_insert

Improve insert retries on keeper session expiration
Igor Nikonov 2023-08-31 17:00:02 +02:00 committed by GitHub
commit c3caead859
2 changed files with 63 additions and 68 deletions
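
The change below replaces an up-front "session is not expired" assertion with a retry loop that re-acquires the Keeper session on every attempt, so an expired session becomes a retryable condition instead of an immediate failure of the INSERT. As orientation only, here is a minimal self-contained sketch of that pattern; Session, SessionExpired and withSessionRetries are hypothetical names made up for this illustration — the real mechanism is ZooKeeperRetriesControl::retryLoop together with zookeeper->setKeeper(storage.getZooKeeper()), as shown in the diff.

#include <cstddef>
#include <functional>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-ins, only for the sketch.
struct Session { bool expired = false; };
struct SessionExpired : std::runtime_error { SessionExpired() : std::runtime_error("session expired") {} };

// Retry the operation with a fresh session on expiration instead of failing on the first attempt.
void withSessionRetries(size_t max_retries,
                        const std::function<Session()> & get_fresh_session,
                        const std::function<void(Session &)> & op)
{
    for (size_t attempt = 0; ; ++attempt)
    {
        Session session = get_fresh_session();   // analogous to zookeeper->setKeeper(storage.getZooKeeper())
        try
        {
            op(session);
            return;
        }
        catch (const SessionExpired &)
        {
            if (attempt + 1 >= max_retries)
                throw;                            // give up once the retry budget is spent
        }
    }
}

int main()
{
    size_t calls = 0;
    withSessionRetries(3,
        [] { return Session{}; },
        [&](Session &)
        {
            if (++calls == 1)
                throw SessionExpired{};           // first attempt hits an expired session
            std::cout << "succeeded on attempt " << calls << '\n';
        });
}

The point of the pattern is that the caller no longer has to verify the session before starting: each attempt starts from a freshly acquired session.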


@@ -34,7 +34,6 @@ namespace ErrorCodes
     extern const int TOO_FEW_LIVE_REPLICAS;
     extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE;
     extern const int UNEXPECTED_ZOOKEEPER_ERROR;
-    extern const int NO_ZOOKEEPER;
     extern const int READONLY;
     extern const int UNKNOWN_STATUS_OF_INSERT;
     extern const int INSERT_WAS_DEDUPLICATED;
@@ -150,81 +149,86 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
 template<bool async_insert>
 ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl() = default;
 
-/// Allow to verify that the session in ZooKeeper is still alive.
-static void assertSessionIsNotExpired(const zkutil::ZooKeeperPtr & zookeeper)
-{
-    if (!zookeeper)
-        throw Exception(ErrorCodes::NO_ZOOKEEPER, "No ZooKeeper session.");
-
-    if (zookeeper->expired())
-        throw Exception(ErrorCodes::NO_ZOOKEEPER, "ZooKeeper session has been expired.");
-}
-
 template<bool async_insert>
 size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
 {
     if (!isQuorumEnabled())
         return 0;
 
-    quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
+    size_t replicas_number = 0;
 
-    Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
+    ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
+    quorum_retries_ctl.retryLoop(
+        [&]()
+        {
+            zookeeper->setKeeper(storage.getZooKeeper());
 
-    Strings exists_paths;
-    exists_paths.reserve(replicas.size());
-    for (const auto & replica : replicas)
-        if (replica != storage.replica_name)
-            exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
+            quorum_info.status_path = storage.zookeeper_path + "/quorum/status";
 
-    auto exists_result = zookeeper->exists(exists_paths);
-    auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
+            Strings replicas = zookeeper->getChildren(fs::path(storage.zookeeper_path) / "replicas");
 
-    Coordination::Error keeper_error = Coordination::Error::ZOK;
-    size_t active_replicas = 1; /// Assume current replica is active (will check below)
-    for (size_t i = 0; i < exists_paths.size(); ++i)
-    {
-        auto error = exists_result[i].error;
-        if (error == Coordination::Error::ZOK)
-            ++active_replicas;
-        else if (Coordination::isHardwareError(error))
-            keeper_error = error;
-    }
+            Strings exists_paths;
+            exists_paths.reserve(replicas.size());
+            for (const auto & replica : replicas)
+                if (replica != storage.replica_name)
+                    exists_paths.emplace_back(fs::path(storage.zookeeper_path) / "replicas" / replica / "is_active");
 
-    size_t replicas_number = replicas.size();
-    size_t quorum_size = getQuorumSize(replicas_number);
+            auto exists_result = zookeeper->exists(exists_paths);
+            auto get_results = zookeeper->get(Strings{storage.replica_path + "/is_active", storage.replica_path + "/host"});
 
-    if (active_replicas < quorum_size)
-    {
-        if (Coordination::isHardwareError(keeper_error))
-            throw Coordination::Exception::fromMessage(keeper_error, "Failed to check number of alive replicas");
+            Coordination::Error keeper_error = Coordination::Error::ZOK;
+            size_t active_replicas = 1; /// Assume current replica is active (will check below)
+            for (size_t i = 0; i < exists_paths.size(); ++i)
+            {
+                auto error = exists_result[i].error;
+                if (error == Coordination::Error::ZOK)
+                    ++active_replicas;
+                else if (Coordination::isHardwareError(error))
+                    keeper_error = error;
+            }
 
-        throw Exception(ErrorCodes::TOO_FEW_LIVE_REPLICAS, "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
-                        active_replicas, quorum_size, replicas_number);
-    }
+            replicas_number = replicas.size();
+            size_t quorum_size = getQuorumSize(replicas_number);
 
-    /** Is there a quorum for the last part for which a quorum is needed?
-      * Write of all the parts with the included quorum is linearly ordered.
-      * This means that at any time there can be only one part,
-      * for which you need, but not yet reach the quorum.
-      * Information about this part will be located in `/quorum/status` node.
-      * If the quorum is reached, then the node is deleted.
-      */
+            if (active_replicas < quorum_size)
+            {
+                if (Coordination::isHardwareError(keeper_error))
+                    throw Coordination::Exception::fromMessage(keeper_error, "Failed to check number of alive replicas");
 
-    String quorum_status;
-    if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
-        throw Exception(ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE,
-                        "Quorum for previous write has not been satisfied yet. Status: {}", quorum_status);
+                throw Exception(
+                    ErrorCodes::TOO_FEW_LIVE_REPLICAS,
+                    "Number of alive replicas ({}) is less than requested quorum ({}/{}).",
+                    active_replicas,
+                    quorum_size,
+                    replicas_number);
+            }
 
-    /// Both checks are implicitly made also later (otherwise there would be a race condition).
+            /** Is there a quorum for the last part for which a quorum is needed?
+              * Write of all the parts with the included quorum is linearly ordered.
+              * This means that at any time there can be only one part,
+              * for which you need, but not yet reach the quorum.
+              * Information about this part will be located in `/quorum/status` node.
+              * If the quorum is reached, then the node is deleted.
+              */
 
-    auto is_active = get_results[0];
-    auto host = get_results[1];
+            String quorum_status;
+            if (!quorum_parallel && zookeeper->tryGet(quorum_info.status_path, quorum_status))
+                throw Exception(
+                    ErrorCodes::UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE,
+                    "Quorum for previous write has not been satisfied yet. Status: {}",
+                    quorum_status);
 
-    if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
-        throw Exception(ErrorCodes::READONLY, "Replica is not active right now");
+            /// Both checks are implicitly made also later (otherwise there would be a race condition).
 
-    quorum_info.is_active_node_version = is_active.stat.version;
-    quorum_info.host_node_version = host.stat.version;
+            auto is_active = get_results[0];
+            auto host = get_results[1];
+
+            if (is_active.error == Coordination::Error::ZNONODE || host.error == Coordination::Error::ZNONODE)
+                throw Exception(ErrorCodes::READONLY, "Replica is not active right now");
+
+            quorum_info.is_active_node_version = is_active.stat.version;
+            quorum_info.host_node_version = host.stat.version;
+        });
 
     return replicas_number;
 }
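
The check above compares the count of active replicas with the value returned by getQuorumSize(). As a rough standalone illustration only (assuming the common 'auto' setting, where the quorum is a majority of the replicas; an explicitly configured insert_quorum would take precedence):

#include <cassert>
#include <cstddef>

// Illustrative majority quorum, as used when insert_quorum = 'auto'
// (an assumption for this sketch, not the real getQuorumSize()).
static size_t majorityQuorum(size_t replicas_number)
{
    return replicas_number / 2 + 1;
}

int main()
{
    assert(majorityQuorum(3) == 2);   // 3 replicas: at least 2 must be active
    assert(majorityQuorum(5) == 3);   // 5 replicas: at least 3 must be active
    // With 3 replicas and only 1 active, 1 < 2, so TOO_FEW_LIVE_REPLICAS would be thrown above.
}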
@@ -257,14 +261,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
      * And also check that during the insertion, the replica was not reinitialized or disabled (by the value of `is_active` node).
      * TODO Too complex logic, you can do better.
      */
-    size_t replicas_num = 0;
-    ZooKeeperRetriesControl quorum_retries_ctl("checkQuorumPrecondition", zookeeper_retries_info, context->getProcessListElement());
-    quorum_retries_ctl.retryLoop(
-        [&]()
-        {
-            zookeeper->setKeeper(storage.getZooKeeper());
-            replicas_num = checkQuorumPrecondition(zookeeper);
-        });
+    size_t replicas_num = checkQuorumPrecondition(zookeeper);
 
     if (!storage_snapshot->object_columns.empty())
         convertDynamicColumnsToTuples(block, storage_snapshot);
@@ -483,7 +480,6 @@ bool ReplicatedMergeTreeSinkImpl<false>::writeExistingPart(MergeTreeData::Mutabl
 {
     /// NOTE: No delay in this case. That's Ok.
     auto origin_zookeeper = storage.getZooKeeper();
-    assertSessionIsNotExpired(origin_zookeeper);
 
     auto zookeeper = std::make_shared<ZooKeeperWithFaultInjection>(origin_zookeeper);
     size_t replicas_num = checkQuorumPrecondition(zookeeper);
@@ -1058,7 +1054,6 @@ template<bool async_insert>
 void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
 {
     auto zookeeper = storage.getZooKeeper();
-    assertSessionIsNotExpired(zookeeper);
     finishDelayedChunk(std::make_shared<ZooKeeperWithFaultInjection>(zookeeper));
 }


@@ -2948,7 +2948,7 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
         # while materialized view is working to inject zookeeper failure
         pm.drop_instance_zk_connections(instance)
        instance.wait_for_log_line(
-            "Error.*(session has been expired|Connection loss).*while pushing to view"
+            "Error.*(Connection loss|Coordination::Exception).*while pushing to view"
         )
         pm.heal_all()
         instance.wait_for_log_line("Committed offset 22")
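
The test now waits for a broader pattern, presumably because with retries the Keeper failure can surface as a generic Coordination::Exception rather than an explicit "session has been expired" message. A small standalone check of what the updated pattern accepts; the two log lines are made-up samples, not actual ClickHouse output:

#include <cassert>
#include <regex>
#include <string>

int main()
{
    // Pattern taken from the updated test above (ECMAScript regex syntax).
    const std::regex pattern("Error.*(Connection loss|Coordination::Exception).*while pushing to view");

    // Made-up sample log lines, for illustration only.
    const std::string coordination_exception = "Error: Coordination::Exception: Session expired, while pushing to view test.mv";
    const std::string connection_loss = "Error: Connection loss, while pushing to view test.mv";

    assert(std::regex_search(coordination_exception, pattern));
    assert(std::regex_search(connection_loss, pattern));
}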