fix race on enqueue query

This commit is contained in:
Alexander Tokmakov 2021-05-31 16:31:03 +03:00
parent 91cf7771ad
commit 0035997e02
4 changed files with 51 additions and 19 deletions

View File

@ -273,19 +273,11 @@ bool DatabaseReplicated::createDatabaseNodesInZooKeeper(const zkutil::ZooKeeperP
__builtin_unreachable();
}
void DatabaseReplicated::createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper)
void DatabaseReplicated::createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper)
{
/// On replica creation add empty entry to log. Can be used to trigger some actions on other replicas (e.g. update cluster info).
DDLLogEntry entry{};
String query_path_prefix = zookeeper_path + "/log/query-";
String counter_prefix = zookeeper_path + "/counter/cnt-";
String counter_path = current_zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
String query_path = query_path_prefix + counter_path.substr(counter_prefix.size());
ops.emplace_back(zkutil::makeCreateRequest(query_path, entry.toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(query_path + "/committed", getFullReplicaName(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
DatabaseReplicatedDDLWorker::enqueueQueryImpl(current_zookeeper, entry, this, true);
}
void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPtr & current_zookeeper)
@ -296,8 +288,8 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(replica_path, host_id, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(replica_path + "/log_ptr", "0", zkutil::CreateMode::Persistent));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(ContextPtr local_context, bool has_force_restore_data_flag, bool force_attach)
@ -659,10 +651,8 @@ ASTPtr DatabaseReplicated::parseQueryFromMetadataInZooKeeper(const String & node
void DatabaseReplicated::drop(ContextPtr context_)
{
auto current_zookeeper = getZooKeeper();
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(replica_path, DROPPED_MARK, -1));
createEmptyLogEntry(ops, current_zookeeper);
current_zookeeper->multi(ops);
current_zookeeper->set(replica_path, DROPPED_MARK, -1);
createEmptyLogEntry(current_zookeeper);
DatabaseAtomic::drop(context_);

View File

@ -78,7 +78,7 @@ private:
ClusterPtr getClusterImpl() const;
void setCluster(ClusterPtr && new_cluster);
void createEmptyLogEntry(Coordination::Requests & ops, const ZooKeeperPtr & current_zookeeper);
void createEmptyLogEntry(const ZooKeeperPtr & current_zookeeper);
String zookeeper_path;
String shard_name;

View File

@ -1,6 +1,7 @@
#include <Databases/DatabaseReplicatedWorker.h>
#include <Databases/DatabaseReplicated.h>
#include <Interpreters/DDLTask.h>
#include <Common/ZooKeeper/KeeperException.h>
namespace DB
{
@ -69,25 +70,63 @@ void DatabaseReplicatedDDLWorker::initializeReplication()
String DatabaseReplicatedDDLWorker::enqueueQuery(DDLLogEntry & entry)
{
auto zookeeper = getAndSetZooKeeper();
const String query_path_prefix = queue_dir + "/query-";
return enqueueQueryImpl(zookeeper, entry, database);
}
String DatabaseReplicatedDDLWorker::enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed)
{
const String query_path_prefix = database->zookeeper_path + "/log/query-";
/// We cannot create sequential node and it's ephemeral child in a single transaction, so allocate sequential number another way
String counter_prefix = database->zookeeper_path + "/counter/cnt-";
String counter_path = zookeeper->create(counter_prefix, "", zkutil::CreateMode::EphemeralSequential);
String counter_lock_path = database->zookeeper_path + "/counter_lock";
String counter_path;
size_t iters = 1000;
while (--iters)
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeCreateRequest(counter_lock_path, database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(counter_prefix, "", zkutil::CreateMode::EphemeralSequential));
Coordination::Responses res;
Coordination::Error code = zookeeper->tryMulti(ops, res);
if (code == Coordination::Error::ZOK)
{
counter_path = dynamic_cast<const Coordination::CreateResponse &>(*res.back()).path_created;
break;
}
else if (code != Coordination::Error::ZNODEEXISTS)
zkutil::KeeperMultiException::check(code, ops, res);
}
if (iters == 0)
throw Exception(ErrorCodes::UNFINISHED,
"Cannot enqueue query, because some replica are trying to enqueue another query. "
"It may happen on high queries rate or, in rare cases, after connection loss. Client should retry.");
String node_path = query_path_prefix + counter_path.substr(counter_prefix.size());
/// Now create task in queue
Coordination::Requests ops;
/// Query is not committed yet, but we have to write it into log to avoid reordering
ops.emplace_back(zkutil::makeCreateRequest(node_path, entry.toString(), zkutil::CreateMode::Persistent));
/// '/try' will be replaced with '/committed' or will be removed due to expired session or other error
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
if (committed)
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/committed", database->getFullReplicaName(), zkutil::CreateMode::Persistent));
else
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/try", database->getFullReplicaName(), zkutil::CreateMode::Ephemeral));
/// We don't need it anymore
ops.emplace_back(zkutil::makeRemoveRequest(counter_path, -1));
/// Unlock counters
ops.emplace_back(zkutil::makeRemoveRequest(counter_lock_path, -1));
/// Create status dirs
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/active", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(node_path + "/finished", "", zkutil::CreateMode::Persistent));
zookeeper->multi(ops);
return node_path;
}

View File

@ -29,6 +29,9 @@ public:
void shutdown() override;
static String enqueueQueryImpl(const ZooKeeperPtr & zookeeper, DDLLogEntry & entry,
DatabaseReplicated * const database, bool committed = false);
private:
bool initializeMainThread() override;
void initializeReplication();