fix race between replicas

This commit is contained in:
Alexander Tokmakov 2021-11-22 19:46:34 +03:00
parent ba6adafae3
commit 5868f7590c
2 changed files with 23 additions and 19 deletions

View File

@ -53,8 +53,6 @@ namespace ErrorCodes
constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed"; constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed";
namespace
{
/** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases, /** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases,
* and highlights your poor understanding of distributed systems. * and highlights your poor understanding of distributed systems.
@ -104,6 +102,11 @@ public:
void unlock() void unlock()
{ {
if (!locked)
return;
locked = false;
if (zookeeper->expired()) if (zookeeper->expired())
{ {
LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message); LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message);
@ -129,18 +132,16 @@ public:
std::string dummy; std::string dummy;
Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy); Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy);
if (code == Coordination::Error::ZNODEEXISTS) if (code == Coordination::Error::ZOK)
{ {
return false; locked = true;
} }
else if (code == Coordination::Error::ZOK) else if (code != Coordination::Error::ZNODEEXISTS)
{
return true;
}
else
{ {
throw Coordination::Exception(code); throw Coordination::Exception(code);
} }
return locked;
} }
private: private:
@ -149,6 +150,7 @@ private:
std::string lock_path; std::string lock_path;
std::string lock_message; std::string lock_message;
Poco::Logger * log; Poco::Logger * log;
bool locked = false;
}; };
@ -158,8 +160,6 @@ std::unique_ptr<ZooKeeperLock> createSimpleZooKeeperLock(
return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message); return std::make_unique<ZooKeeperLock>(zookeeper, lock_prefix, lock_name, lock_message);
} }
}
DDLWorker::DDLWorker( DDLWorker::DDLWorker(
int pool_size_, int pool_size_,
@ -654,6 +654,10 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral); zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral);
} }
/// We must hold the lock until task execution status is committed to ZooKeeper,
/// otherwise another replica may try to execute query again.
std::unique_ptr<ZooKeeperLock> execute_on_leader_lock;
/// Step 2: Execute query from the task. /// Step 2: Execute query from the task.
if (!task.was_executed) if (!task.was_executed)
{ {
@ -684,7 +688,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper)
if (task.execute_on_leader) if (task.execute_on_leader)
{ {
tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper); tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper, execute_on_leader_lock);
} }
else else
{ {
@ -771,7 +775,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
StoragePtr storage, StoragePtr storage,
const String & rewritten_query, const String & rewritten_query,
const String & /*node_path*/, const String & /*node_path*/,
const ZooKeeperPtr & zookeeper) const ZooKeeperPtr & zookeeper,
std::unique_ptr<ZooKeeperLock> & execute_on_leader_lock)
{ {
StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get()); StorageReplicatedMergeTree * replicated_storage = dynamic_cast<StorageReplicatedMergeTree *>(storage.get());
@ -809,7 +814,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
pcg64 rng(randomSeed()); pcg64 rng(randomSeed());
auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str); execute_on_leader_lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str);
Stopwatch stopwatch; Stopwatch stopwatch;
@ -839,7 +844,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot execute initial query on non-leader replica"); throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot execute initial query on non-leader replica");
/// Any replica which is leader tries to take lock /// Any replica which is leader tries to take lock
if (status.is_leader && lock->tryLock()) if (status.is_leader && execute_on_leader_lock->tryLock())
{ {
/// In replicated merge tree we can have multiple leaders. So we can /// In replicated merge tree we can have multiple leaders. So we can
/// be "leader" and took lock, but another "leader" replica may have /// be "leader" and took lock, but another "leader" replica may have
@ -868,8 +873,6 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica(
executed_by_us = true; executed_by_us = true;
break; break;
} }
lock->unlock();
} }
/// Waiting for someone who will execute query and change is_executed_path node /// Waiting for someone who will execute query and change is_executed_path node

View File

@ -38,7 +38,7 @@ struct DDLTaskBase;
using DDLTaskPtr = std::unique_ptr<DDLTaskBase>; using DDLTaskPtr = std::unique_ptr<DDLTaskBase>;
using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>; using ZooKeeperPtr = std::shared_ptr<zkutil::ZooKeeper>;
class AccessRightsElements; class AccessRightsElements;
class ZooKeeperLock;
class DDLWorker class DDLWorker
{ {
@ -94,7 +94,8 @@ protected:
StoragePtr storage, StoragePtr storage,
const String & rewritten_query, const String & rewritten_query,
const String & node_path, const String & node_path,
const ZooKeeperPtr & zookeeper); const ZooKeeperPtr & zookeeper,
std::unique_ptr<ZooKeeperLock> & execute_on_leader_lock);
bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper); bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper);