diff --git a/src/Interpreters/DDLWorker.cpp b/src/Interpreters/DDLWorker.cpp index 80ce8db7e8b..27bb4906f1a 100644 --- a/src/Interpreters/DDLWorker.cpp +++ b/src/Interpreters/DDLWorker.cpp @@ -53,8 +53,6 @@ namespace ErrorCodes constexpr const char * TASK_PROCESSED_OUT_REASON = "Task has been already processed"; -namespace -{ /** Caveats: usage of locks in ZooKeeper is incorrect in 99% of cases, * and highlights your poor understanding of distributed systems. @@ -104,6 +102,11 @@ public: void unlock() { + if (!locked) + return; + + locked = false; + if (zookeeper->expired()) { LOG_WARNING(log, "Lock is lost, because session was expired. Path: {}, message: {}", lock_path, lock_message); @@ -129,18 +132,16 @@ public: std::string dummy; Coordination::Error code = zookeeper->tryCreate(lock_path, lock_message, zkutil::CreateMode::Ephemeral, dummy); - if (code == Coordination::Error::ZNODEEXISTS) + if (code == Coordination::Error::ZOK) { - return false; + locked = true; } - else if (code == Coordination::Error::ZOK) - { - return true; - } - else + else if (code != Coordination::Error::ZNODEEXISTS) { throw Coordination::Exception(code); } + + return locked; } private: @@ -149,6 +150,7 @@ private: std::string lock_path; std::string lock_message; Poco::Logger * log; + bool locked = false; }; @@ -158,8 +160,6 @@ std::unique_ptr createSimpleZooKeeperLock( return std::make_unique(zookeeper, lock_prefix, lock_name, lock_message); } -} - DDLWorker::DDLWorker( int pool_size_, @@ -654,6 +654,10 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) zookeeper->create(active_node_path, {}, zkutil::CreateMode::Ephemeral); } + /// We must hold the lock until task execution status is committed to ZooKeeper, + /// otherwise another replica may try to execute query again. + std::unique_ptr execute_on_leader_lock; + /// Step 2: Execute query from the task. if (!task.was_executed) { @@ -684,7 +688,7 @@ void DDLWorker::processTask(DDLTaskBase & task, const ZooKeeperPtr & zookeeper) if (task.execute_on_leader) { - tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper); + tryExecuteQueryOnLeaderReplica(task, storage, rewritten_query, task.entry_path, zookeeper, execute_on_leader_lock); } else { @@ -771,7 +775,8 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( StoragePtr storage, const String & rewritten_query, const String & /*node_path*/, - const ZooKeeperPtr & zookeeper) + const ZooKeeperPtr & zookeeper, + std::unique_ptr & execute_on_leader_lock) { StorageReplicatedMergeTree * replicated_storage = dynamic_cast(storage.get()); @@ -809,7 +814,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( pcg64 rng(randomSeed()); - auto lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str); + execute_on_leader_lock = createSimpleZooKeeperLock(zookeeper, shard_path, "lock", task.host_id_str); Stopwatch stopwatch; @@ -839,7 +844,7 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( throw Exception(ErrorCodes::NOT_A_LEADER, "Cannot execute initial query on non-leader replica"); /// Any replica which is leader tries to take lock - if (status.is_leader && lock->tryLock()) + if (status.is_leader && execute_on_leader_lock->tryLock()) { /// In replicated merge tree we can have multiple leaders. So we can /// be "leader" and took lock, but another "leader" replica may have @@ -868,8 +873,6 @@ bool DDLWorker::tryExecuteQueryOnLeaderReplica( executed_by_us = true; break; } - - lock->unlock(); } /// Waiting for someone who will execute query and change is_executed_path node diff --git a/src/Interpreters/DDLWorker.h b/src/Interpreters/DDLWorker.h index d2b7c9d169d..0b8b0a4a4d8 100644 --- a/src/Interpreters/DDLWorker.h +++ b/src/Interpreters/DDLWorker.h @@ -38,7 +38,7 @@ struct DDLTaskBase; using DDLTaskPtr = std::unique_ptr; using ZooKeeperPtr = std::shared_ptr; class AccessRightsElements; - +class ZooKeeperLock; class DDLWorker { @@ -94,7 +94,8 @@ protected: StoragePtr storage, const String & rewritten_query, const String & node_path, - const ZooKeeperPtr & zookeeper); + const ZooKeeperPtr & zookeeper, + std::unique_ptr & execute_on_leader_lock); bool tryExecuteQuery(const String & query, DDLTaskBase & task, const ZooKeeperPtr & zookeeper);