diff --git a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
index 3c7798a6fae..a667e6c675e 100644
--- a/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
+++ b/dbms/include/DB/Storages/MergeTree/ReshardingWorker.h
@@ -6,7 +6,6 @@
 #include
 #include
-#include
 #include
 #include
@@ -91,6 +90,8 @@ public:
     /// Get the status of the given distributed job.
     Status getStatus();

+    zkutil::RWLock createDeletionLock(const std::string & coordinator_id);
+
 private:
     /// Watch for new jobs to appear. Execute them sequentially.
     void pollAndExecute();
@@ -142,13 +143,20 @@ private:
     void waitForUploadCompletion();

     size_t getPartitionCountUnlocked(const std::string & coordinator_id);
+
+    bool updateOfflineNodes(const std::string & coordinator_id);
     bool updateOfflineNodes();
+    bool updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id);
+
+    Status getStatusCommon(const std::string & path, const std::string & coordinator_id);

     /// Functions that create the objects required for synchronizing
     /// distributed jobs.
     zkutil::RWLock createLock();
     zkutil::RWLock createCoordinatorLock(const std::string & coordinator_id);
-    zkutil::Barrier createCheckBarrier(const std::string & coordinator_id);
+
+    zkutil::SingleBarrier createSubscribeBarrier(const std::string & coordinator_id);
+    zkutil::SingleBarrier createCheckBarrier(const std::string & coordinator_id);
     zkutil::SingleBarrier createOptOutBarrier(const std::string & coordinator_id, size_t count);
     zkutil::SingleBarrier createRecoveryBarrier(const ReshardingJob & job);
     zkutil::SingleBarrier createUploadBarrier(const ReshardingJob & job);
diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
index 8566c8d5ac0..a9781888174 100644
--- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
+++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp
@@ -110,21 +110,23 @@ private:
 ///
 /// /lock: global distributed read/write lock;
 /// /online: currently online nodes;
-/// /coordinators: one znode for each coordinator.
+/// /coordination: one znode for each coordinator.
 ///
 /// A coordinator whose identifier is ${id} has the following layout
-/// under the /coordinators/${id} znode:
+/// under the /coordination/${id} znode:
 ///
 /// /lock: coordinator-specific distributed read/write lock;
 ///
+/// /deletion_lock: for safe coordinator deletion;
+///
 /// /query_hash: hash value obtained from the query that
 /// is sent to the participating nodes;
 ///
 /// /increment: unique block number allocator;
 ///
-/// /status: coordinator status;
+/// /status: coordinator status before its participating nodes have subscribed;
 ///
-/// /status/${host}: distributed job status on a given participating node;
+/// /status/${host}: status of an individual participating node;
 ///
 /// /cluster: cluster on which the distributed job is to be performed;
 ///
@@ -137,6 +139,9 @@ private:
 ///
 /// /shards: the list of shards that have subscribed;
 ///
+/// /subscribe_barrier: when all the participating nodes have subscribed
+/// to their coordinator, proceed further;
+///
 /// /check_barrier: when all the participating nodes have checked
 /// that they can perform resharding operations, proceed further;
 ///
@@ -177,7 +182,7 @@ ReshardingWorker::ReshardingWorker(const Poco::Util::AbstractConfiguration & con
     zookeeper->createIfNotExists(distributed_online_path, "");

     /// Notify that we are online.
-    zookeeper->create(distributed_online_path + "/" + current_host, "",
+    (void) zookeeper->tryCreate(distributed_online_path + "/" + current_host, "",
         zkutil::CreateMode::Ephemeral);

     distributed_lock_path = distributed_path + "/lock";
@@ -303,6 +308,7 @@ void ReshardingWorker::pollAndExecute()
     catch (...)
     {
         error = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
     }

     if (error)
@@ -389,6 +395,7 @@ void ReshardingWorker::perform(const Strings & job_nodes)
     }
     catch (...)
     {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
         zookeeper->remove(child_full_path);
         throw;
     }
@@ -752,6 +759,7 @@ void ReshardingWorker::applyChanges()
     }
     catch (...)
     {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
         pool.wait();
         throw;
     }
@@ -822,6 +830,9 @@ std::string ReshardingWorker::createCoordinator(const Cluster & cluster)
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/lock",
         "", zkutil::CreateMode::Persistent);

+    (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/deletion_lock",
+        "", zkutil::CreateMode::Persistent);
+
     (void) zookeeper->create(getCoordinatorPath(coordinator_id) + "/cluster",
         cluster_name, zkutil::CreateMode::Persistent);
@@ -893,9 +904,14 @@ void ReshardingWorker::registerQuery(const std::string & coordinator_id, const s
 void ReshardingWorker::deleteCoordinator(const std::string & coordinator_id)
 {
+    /// We don't acquire a scoped lock because we delete everything including this lock.
+    auto deletion_lock = createDeletionLock(coordinator_id);
+    deletion_lock.acquireWrite(zkutil::RWLock::Blocking);
+
     auto zookeeper = context.getZooKeeper();
     if (zookeeper->exists(getCoordinatorPath(coordinator_id)))
         zookeeper->removeRecursive(getCoordinatorPath(coordinator_id));
+
 }

 UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std::string & query)
@@ -978,7 +994,7 @@ UInt64 ReshardingWorker::subscribe(const std::string & coordinator_id, const std
     /// any guarantee that all the required nodes for this distributed job are online.
     /// We are inside a lightweight function, so it is not an issue.
     auto timeout = context.getSettingsRef().resharding_barrier_timeout;
-    createCheckBarrier(coordinator_id).enter(timeout);
+    createSubscribeBarrier(coordinator_id).enter(timeout);

     return block_number;
 }
@@ -1107,7 +1123,7 @@ size_t ReshardingWorker::getNodeCount(const std::string & coordinator_id)
 void ReshardingWorker::waitForCheckCompletion(const std::string & coordinator_id)
 {
-    createCheckBarrier(coordinator_id).leave();
+    createCheckBarrier(coordinator_id).enter();
 }

 void ReshardingWorker::waitForOptOutCompletion(const std::string & coordinator_id, size_t count)
@@ -1129,44 +1145,54 @@ void ReshardingWorker::setStatus(const std::string & coordinator_id, const std::
         toString(static_cast(status)));
 }

-ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
+bool ReshardingWorker::updateOfflineNodes(const std::string & coordinator_id)
 {
-    auto zookeeper = context.getZooKeeper();
-
-    auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status");
-    return static_cast(std::stoull(status_str));
+    return updateOfflineNodesCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
 }

 bool ReshardingWorker::updateOfflineNodes()
+{
+    return updateOfflineNodesCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
+}
+
+bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const std::string & coordinator_id)
 {
     auto zookeeper = context.getZooKeeper();

-    auto job_nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
-    std::sort(job_nodes.begin(), job_nodes.end());
+    auto nodes = zookeeper->getChildren(path);
+    std::sort(nodes.begin(), nodes.end());

     auto online = zookeeper->getChildren(distributed_path + "/online");
     std::sort(online.begin(), online.end());

-    std::vector offline(job_nodes.size());
-    auto end = std::set_difference(job_nodes.begin(), job_nodes.end(),
-        online.begin(), online.end(),
-        offline.begin());
+    std::vector offline(nodes.size());
+    auto end = std::set_difference(nodes.begin(), nodes.end(),
+        online.begin(), online.end(), offline.begin());
     offline.resize(end - offline.begin());

     for (const auto & node : offline)
-        zookeeper->set(getCoordinatorPath(current_job.coordinator_id) + "/status/" + node,
+        zookeeper->set(coordinator_id + "/status/" + node,
             toString(static_cast(STATUS_ON_HOLD)));

     return !offline.empty();
 }

-/// Note: we don't need any synchronization for the status of a distributed job.
-/// That's why we don't protect it with a read/write lock.
+ReshardingWorker::Status ReshardingWorker::getCoordinatorStatus(const std::string & coordinator_id)
+{
+    return getStatusCommon(getCoordinatorPath(coordinator_id) + "/status", coordinator_id);
+}
+
 ReshardingWorker::Status ReshardingWorker::getStatus()
 {
-    auto zookeeper = context.getZooKeeper();
+    return getStatusCommon(getPartitionPath(current_job) + "/nodes", current_job.coordinator_id);
+}

-    auto coordinator_id = current_job.coordinator_id;
+ReshardingWorker::Status ReshardingWorker::getStatusCommon(const std::string & path, const std::string & coordinator_id)
+{
+    /// Note: we don't need any synchronization for the status.
+    /// That's why we don't acquire any read/write lock.
+
+    auto zookeeper = context.getZooKeeper();

     auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status");
     auto coordinator_status = std::stoull(status_str);
@@ -1174,15 +1200,15 @@ ReshardingWorker::Status ReshardingWorker::getStatus()
     if (coordinator_status != STATUS_OK)
         return static_cast(coordinator_status);

-    (void) updateOfflineNodes();
+    (void) updateOfflineNodesCommon(path, coordinator_id);

-    auto job_nodes = zookeeper->getChildren(getPartitionPath(current_job) + "/nodes");
+    auto nodes = zookeeper->getChildren(path);

     bool has_error = false;
     bool has_on_hold = false;

     /// Determine the status.
-    for (const auto & node : job_nodes)
+    for (const auto & node : nodes)
     {
         auto status_str = zookeeper->get(getCoordinatorPath(coordinator_id) + "/status/" + node);
         auto status = std::stoull(status_str);
@@ -1299,23 +1325,6 @@ void ReshardingWorker::abortPollingIfRequested()
         throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
 }

-void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
-{
-    bool must_abort;
-
-    try
-    {
-        must_abort = must_stop || (getCoordinatorStatus(coordinator_id) != STATUS_OK);
-    }
-    catch (...)
-    {
-        must_abort = true;
-    }
-
-    if (must_abort)
-        throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
-}
-
 void ReshardingWorker::abortRecoveryIfRequested()
 {
     bool has_offline_nodes = false;
@@ -1329,6 +1338,7 @@ void ReshardingWorker::abortRecoveryIfRequested()
     catch (...)
     {
         must_abort = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
     }

     if (must_abort)
@@ -1349,6 +1359,47 @@ void ReshardingWorker::abortRecoveryIfRequested()
     }
 }

+void ReshardingWorker::abortCoordinatorIfRequested(const std::string & coordinator_id)
+{
+    bool is_remote_node_unavailable = false;
+    bool is_remote_node_error = false;
+
+    bool cancellation_result = false;
+
+    try
+    {
+        auto status = getCoordinatorStatus(coordinator_id);
+        if (status == STATUS_ON_HOLD)
+            is_remote_node_unavailable = true;
+        else if (status == STATUS_ERROR)
+            is_remote_node_error = true;
+
+        cancellation_result = status != STATUS_OK;
+    }
+    catch (...)
+    {
+        cancellation_result = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+    }
+
+    bool must_abort = must_stop || cancellation_result;
+
+    if (must_abort)
+    {
+        /// Important: always keep the following order.
+        if (must_stop)
+            throw Exception("Cancelled resharding", ErrorCodes::ABORTED);
+        else if (is_remote_node_unavailable)
+            throw Exception("Remote node unavailable",
+                ErrorCodes::RESHARDING_REMOTE_NODE_UNAVAILABLE);
+        else if (is_remote_node_error)
+            throw Exception("An error occurred on a remote node",
+                ErrorCodes::RESHARDING_REMOTE_NODE_ERROR);
+        else
+            throw Exception("An error occurred on local node", ErrorCodes::LOGICAL_ERROR);
+    }
+}
+
 void ReshardingWorker::abortJobIfRequested()
 {
     bool is_remote_node_unavailable = false;
@@ -1370,6 +1421,7 @@ void ReshardingWorker::abortJobIfRequested()
     catch (...)
     {
         cancellation_result = true;
+        tryLogCurrentException(__PRETTY_FUNCTION__);
     }
 }

@@ -1397,7 +1449,7 @@ zkutil::RWLock ReshardingWorker::createLock()
 {
     auto zookeeper = context.getZooKeeper();

-    zkutil::RWLock lock(zookeeper, distributed_lock_path);
+    zkutil::RWLock lock{zookeeper, distributed_lock_path};
     zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
     lock.setCancellationHook(hook);
@@ -1408,8 +1460,7 @@ zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coord
 {
     auto zookeeper = context.getZooKeeper();

-    zkutil::RWLock lock(zookeeper, getCoordinatorPath(coordinator_id) + "/lock");
-
+    zkutil::RWLock lock{zookeeper, getCoordinatorPath(coordinator_id) + "/lock"};
     zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
         coordinator_id);
     lock.setCancellationHook(hook);
@@ -1417,17 +1468,42 @@ zkutil::RWLock ReshardingWorker::createCoordinatorLock(const std::string & coord
     return lock;
 }

-zkutil::Barrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
+zkutil::RWLock ReshardingWorker::createDeletionLock(const std::string & coordinator_id)
+{
+    auto zookeeper = context.getZooKeeper();
+
+    zkutil::RWLock lock{zookeeper, getCoordinatorPath(coordinator_id) + "/deletion_lock"};
+    zkutil::RWLock::CancellationHook hook = std::bind(&ReshardingWorker::abortPollingIfRequested, this);
+    lock.setCancellationHook(hook);
+
+    return lock;
+}
+
+zkutil::SingleBarrier ReshardingWorker::createSubscribeBarrier(const std::string & coordinator_id)
 {
     auto zookeeper = context.getZooKeeper();

     auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");

-    zkutil::Barrier check_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
-        std::stoull(node_count)};
-    zkutil::Barrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
-        coordinator_id
-        );
+    zkutil::SingleBarrier subscribe_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/subscribe_barrier",
+        std::stoull(node_count)};
+    zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
+        coordinator_id);
+    subscribe_barrier.setCancellationHook(hook);
+
+    return subscribe_barrier;
+}
+
+zkutil::SingleBarrier ReshardingWorker::createCheckBarrier(const std::string & coordinator_id)
+{
+    auto zookeeper = context.getZooKeeper();
+
+    auto node_count = zookeeper->get(getCoordinatorPath(coordinator_id) + "/node_count");
+
+    zkutil::SingleBarrier check_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/check_barrier",
+        std::stoull(node_count)};
+    zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
+        coordinator_id);
     check_barrier.setCancellationHook(hook);

     return check_barrier;
@@ -1440,7 +1516,6 @@ zkutil::SingleBarrier ReshardingWorker::createOptOutBarrier(const std::string &
     zkutil::SingleBarrier opt_out_barrier{zookeeper, getCoordinatorPath(coordinator_id) + "/opt_out_barrier",
         count};
-
     zkutil::SingleBarrier::CancellationHook hook = std::bind(&ReshardingWorker::abortCoordinatorIfRequested, this,
         coordinator_id);
     opt_out_barrier.setCancellationHook(hook);
diff --git a/dbms/src/Storages/StorageDistributed.cpp b/dbms/src/Storages/StorageDistributed.cpp
index 77e4bae1163..b840776b9a2 100644
--- a/dbms/src/Storages/StorageDistributed.cpp
+++ b/dbms/src/Storages/StorageDistributed.cpp
@@ -325,13 +325,20 @@ void StorageDistributed::reshardPartitions(ASTPtr query, const String & database
             throw Exception("StorageDistributed: Internal error", ErrorCodes::LOGICAL_ERROR);

         auto & stream = *stream_ptr;
+        stream.readPrefix();
+
         while (!stream.isCancelled() && stream.read())
             ;
+
+        stream.readSuffix();
     }
     catch (...)
     {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+
         try
         {
+            resharding_worker.setStatus(coordinator_id, ReshardingWorker::STATUS_ERROR);
             resharding_worker.deleteCoordinator(coordinator_id);
         }
         catch (...)
diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
index 686870b8035..877cbd63188 100644
--- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp
@@ -76,6 +76,7 @@ namespace ErrorCodes
     extern const int RESHARDING_INVALID_PARAMETERS;
     extern const int INVALID_SHARD_WEIGHT;
     extern const int DUPLICATE_SHARD_PATHS;
+    extern const int RESHARDING_COORDINATOR_DELETED;
     extern const int RESHARDING_NO_SUCH_COORDINATOR;
     extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
     extern const int RESHARDING_ALREADY_SUBSCRIBED;
@@ -3486,11 +3487,6 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     std::string coordinator_id;
     UInt64 block_number = 0;

-    if (has_coordinator)
-    {
-        coordinator_id = coordinator.get();
-        block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
-    }

     /// List of local partitions that need to be resharded.
     ReshardingWorker::PartitionList partition_list;
@@ -3506,6 +3502,21 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     try
     {
+        zkutil::RWLock deletion_lock;
+
+        if (has_coordinator)
+        {
+            coordinator_id = coordinator.get();
+            deletion_lock = std::move(resharding_worker.createDeletionLock(coordinator_id));
+        }
+
+        zkutil::RWLock::Guard guard{deletion_lock};
+        if (!deletion_lock.ownsLock())
+            throw Exception("Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED);
+
+        if (has_coordinator)
+            block_number = resharding_worker.subscribe(coordinator_id, queryToString(query));
+
         for (const auto & weighted_path : weighted_zookeeper_paths)
         {
             UInt64 weight = weighted_path.second;
@@ -3665,6 +3676,11 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
             /// has willfully attempted to botch an ongoing distributed resharding job.
             /// Consequently we don't take them into account.
         }
+
+        else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
+        {
+            /// nothing here
+        }
         else
         {
             try
@@ -3684,6 +3700,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
     }
     catch (...)
     {
+        tryLogCurrentException(__PRETTY_FUNCTION__);
+
         if (has_coordinator)
         {
             try
diff --git a/libs/libzkutil/CMakeLists.txt b/libs/libzkutil/CMakeLists.txt
index 61e5577ab8d..6f7d258d826 100644
--- a/libs/libzkutil/CMakeLists.txt
+++ b/libs/libzkutil/CMakeLists.txt
@@ -2,7 +2,6 @@ add_library(zkutil
     src/ZooKeeper.cpp
     src/Lock.cpp
     src/SingleBarrier.cpp
-    src/Barrier.cpp
     src/RWLock.cpp
     src/ZooKeeperHolder.cpp
@@ -11,7 +10,6 @@ add_library(zkutil
     include/zkutil/KeeperException.h
     include/zkutil/Lock.h
     include/zkutil/SingleBarrier.h
-    include/zkutil/Barrier.h
     include/zkutil/RWLock.h
     include/zkutil/ZooKeeper.h
     include/zkutil/Types.h
diff --git a/libs/libzkutil/include/zkutil/RWLock.h b/libs/libzkutil/include/zkutil/RWLock.h
index a61e4f1854c..a0dd4341a0c 100644
--- a/libs/libzkutil/include/zkutil/RWLock.h
+++ b/libs/libzkutil/include/zkutil/RWLock.h
@@ -31,6 +31,8 @@ public:
     using CancellationHook = std::function;

 public:
+    RWLock() = default;
+
     /// Create under the specified ZooKeeper path a queue for lock requests
     /// if it doesn't exist yet.
     RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_);
@@ -41,6 +43,8 @@ public:
     RWLock(RWLock &&) = default;
     RWLock & operator=(RWLock &&) = default;

+    operator bool() const;
+
     /// Register a function that checks whether lock acquisition should be cancelled.
     void setCancellationHook(CancellationHook cancellation_hook_);
diff --git a/libs/libzkutil/src/RWLock.cpp b/libs/libzkutil/src/RWLock.cpp
index 530751fe839..7724a5e61a2 100644
--- a/libs/libzkutil/src/RWLock.cpp
+++ b/libs/libzkutil/src/RWLock.cpp
@@ -60,6 +60,11 @@ RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_)
         throw KeeperException(code);
 }

+RWLock::operator bool() const
+{
+    return zookeeper && !path.empty();
+}
+
 void RWLock::setCancellationHook(CancellationHook cancellation_hook_)
 {
     cancellation_hook = cancellation_hook_;
@@ -79,11 +84,18 @@ void RWLock::release()
 {
     __sync_synchronize();

+    if (!*this)
+    {
+        owns_lock = false;
+        return;
+    }
+
     if (key.empty())
         throw DB::Exception("RWLock: no lock is held", DB::ErrorCodes::LOGICAL_ERROR);

     zookeeper->remove(path + "/" + key);
     key.clear();
+    owns_lock = false;
 }

 template
@@ -93,6 +105,12 @@ void RWLock::acquireImpl(Mode mode)
 {
     __sync_synchronize();

+    if (!*this)
+    {
+        owns_lock = true;
+        return;
+    }
+
     if (!key.empty())
         throw DB::Exception("RWLock: lock already held", DB::ErrorCodes::RWLOCK_ALREADY_HELD);
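
The RWLock changes above make a default-constructed lock a harmless no-op: operator bool() reports whether the lock is actually backed by ZooKeeper (zookeeper && !path.empty()), and acquireImpl()/release() simply set owns_lock and return when it is not. StorageReplicatedMergeTree::reshardPartitions() relies on this so that a purely local job (no coordinator) and a coordinated job share one code path: the deletion lock stays default-constructed unless a coordinator exists, and the RWLock::Guard / ownsLock() check only has teeth in the coordinated case. The standalone sketch below illustrates that null-lock pattern only; it does not use the real zkutil classes, every name in it (NullableRWLock, Guard, runResharding, the fake ZooKeeper handle and znode path) is hypothetical, and the assumption that the guard performs a write acquisition is not confirmed by the patch.

#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

/// Hypothetical stand-in for a ZooKeeper session handle (illustration only).
using FakeZooKeeperPtr = std::shared_ptr<std::string>;

/// Sketch of the "default-constructed lock is a no-op" idea.
/// This is NOT the zkutil::RWLock API; the members and methods are illustrative.
class NullableRWLock
{
public:
    NullableRWLock() = default;    /// null lock: no ZooKeeper session, no path

    NullableRWLock(FakeZooKeeperPtr zookeeper_, std::string path_)
        : zookeeper{std::move(zookeeper_)}, path{std::move(path_)} {}

    /// True only when the lock is backed by a coordination service,
    /// mirroring what RWLock::operator bool() checks in the patch.
    explicit operator bool() const { return zookeeper && !path.empty(); }

    void acquireWrite()
    {
        if (!*this)
        {
            owns_lock = true;   /// null lock: pretend we own it so callers take the common path
            return;
        }
        /// A real implementation would create an ephemeral znode here and wait its turn.
        owns_lock = true;
    }

    void release()
    {
        if (!*this)
        {
            owns_lock = false;
            return;
        }
        /// A real implementation would remove its znode here.
        owns_lock = false;
    }

    bool ownsLock() const { return owns_lock; }

private:
    FakeZooKeeperPtr zookeeper;
    std::string path;
    bool owns_lock = false;
};

/// RAII guard, analogous in spirit to the zkutil::RWLock::Guard used in the patch.
class Guard
{
public:
    explicit Guard(NullableRWLock & lock_) : lock{lock_} { lock.acquireWrite(); }
    ~Guard() { lock.release(); }

private:
    NullableRWLock & lock;
};

/// Same shape as the reshardPartitions() change: build a real deletion lock only
/// when a coordinator is involved, then run both cases through one code path.
void runResharding(bool has_coordinator)
{
    NullableRWLock deletion_lock;   /// stays a no-op for purely local jobs

    if (has_coordinator)
        deletion_lock = NullableRWLock{std::make_shared<std::string>("session"),
            "/example/coordination/1/deletion_lock"};

    Guard guard{deletion_lock};
    if (!deletion_lock.ownsLock())
        throw std::runtime_error("Coordinator has been deleted");

    std::cout << (has_coordinator ? "coordinated" : "local")
        << " resharding proceeds under the deletion lock\n";
}

int main()
{
    runResharding(false);   /// local job: null lock still reports ownership
    runResharding(true);    /// coordinated job: real (here: faked) lock
}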