Fix hanging DDL queries on Replicated database

There was a race condition when issuing a DDL query on a replica just
after a new replica was added.

If the DDL query is issued after the new replica adds itself to the
list of replicas, but before the new replica has finished its
recovery, then the first replica adds the new replica to the list of
replicas to wait to confirm the query was replicated.

Meanwhile, the new replica is still in recovery and applies queries
from the /metadata snapshot. When it's done, it bumps its log_ptr
without marking the corresponding log entries (if any) as finished.

The first replica then waits until distributed_ddl_task_timeout
expires and wrongly assumes the query was not replicated.

The issue is fixed by remembering the max_log_ptr at the exact point
where the replica adds itself to the list of replicas, then mark as
finished all queries that happened between that max_log_ptr and the
max_log_ptr of the metadata snapshot used in recovery.

The bug was randomly observed during a downstream test. It can be
reproduced more easily by inserting a sleep of a few seconds at the
end of createReplicaNodesInZooKeeper, enough to have time to issue a
DDL query on the first replica.
This commit is contained in:
Kevin Michel 2021-09-24 12:51:52 +02:00
parent 81de25d37f
commit aa3f4003c8
No known key found for this signature in database
GPG Key ID: 9F95C41F2EB138FC
2 changed files with 40 additions and 4 deletions

View File

@ -298,10 +298,30 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
/// Write host name to replica_path, it will protect from multiple replicas with the same name /// Write host name to replica_path, it will protect from multiple replicas with the same name
auto host_id = getHostID(getContext(), db_uuid); auto host_id = getHostID(getContext(), db_uuid);
Coordination::Requests ops; for (int attempts = 10; attempts > 0; --attempts)
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent)); {
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent)); Coordination::Stat stat;
current_zookeeper->multi(ops); String max_log_ptr_str = current_zookeeper->get(zookeeper_path + "/max_log_ptr", &stat);
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
/// In addition to creating the replica nodes, we record the max_log_ptr at the instant where
/// we declared ourself as an existing replica. We'll need this during recoverLostReplica to
/// notify other nodes that issued new queries while this node was recovering.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/max_log_ptr", stat.version));
Coordination::Responses responses;
const auto code = current_zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK)
{
max_log_ptr_at_creation = parse<UInt32>(max_log_ptr_str);
break;
}
else if (code == Coordination::Error::ZNODEEXISTS || attempts == 1)
{
/// If its our last attempt, or if the replica already exists, fail immediately.
zkutil::KeeperMultiException::check(code, ops, responses);
}
}
createEmptyLogEntry(current_zookeeper); createEmptyLogEntry(current_zookeeper);
} }
@ -613,6 +633,21 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
InterpreterCreateQuery(query_ast, create_query_context).execute(); InterpreterCreateQuery(query_ast, create_query_context).execute();
} }
if (max_log_ptr_at_creation != 0)
{
/// If the replica is new and some of the queries applied during recovery
/// where issued after the replica was created, then other nodes might be
/// waiting for this node to notify them that the query was applied.
for (UInt32 ptr = max_log_ptr_at_creation; ptr <= max_log_ptr; ++ptr)
{
auto entry_name = DDLTaskBase::getLogEntryName(ptr);
auto path = fs::path(zookeeper_path) / "log" / entry_name / "finished" / getFullReplicaName();
auto status = ExecutionStatus(0).serializeText();
auto res = current_zookeeper->tryCreate(path, status, zkutil::CreateMode::Persistent);
if (res == Coordination::Error::ZOK)
LOG_INFO(log, "Marked recovered {} as finished", entry_name);
}
}
current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr)); current_zookeeper->set(replica_path + "/log_ptr", toString(max_log_ptr));
} }

View File

@ -90,6 +90,7 @@ private:
std::atomic_bool is_readonly = true; std::atomic_bool is_readonly = true;
std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker; std::unique_ptr<DatabaseReplicatedDDLWorker> ddl_worker;
UInt32 max_log_ptr_at_creation = 0;
mutable ClusterPtr cluster; mutable ClusterPtr cluster;
}; };