From 3b9ea20c91b748ef05f926cee02d18bcf75f6a12 Mon Sep 17 00:00:00 2001 From: Alexey Arno Date: Thu, 3 Mar 2016 19:20:19 +0300 Subject: [PATCH] dbms: More locking improvements. [#METR-18510] --- dbms/src/Core/ErrorCodes.cpp | 3 +- .../Storages/MergeTree/ReshardingWorker.cpp | 24 +++++++++----- .../Storages/StorageReplicatedMergeTree.cpp | 4 ++- libs/libzkutil/src/RWLock.cpp | 31 ++++++++++++++++--- 4 files changed, 48 insertions(+), 14 deletions(-) diff --git a/dbms/src/Core/ErrorCodes.cpp b/dbms/src/Core/ErrorCodes.cpp index 49dd24dc2f3..6e38cb91881 100644 --- a/dbms/src/Core/ErrorCodes.cpp +++ b/dbms/src/Core/ErrorCodes.cpp @@ -337,7 +337,8 @@ namespace ErrorCodes extern const int RESHARDING_INVALID_QUERY = 331; extern const int RESHARDING_INITIATOR_CHECK_FAILED = 332; extern const int RWLOCK_ALREADY_HELD = 333; - extern const int BARRIER_TIMEOUT = 334; + extern const int RWLOCK_NO_SUCH_LOCK = 334; + extern const int BARRIER_TIMEOUT = 335; extern const int KEEPER_EXCEPTION = 999; extern const int POCO_EXCEPTION = 1000; diff --git a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp index a9781888174..f8d7cc6a08f 100644 --- a/dbms/src/Storages/MergeTree/ReshardingWorker.cpp +++ b/dbms/src/Storages/MergeTree/ReshardingWorker.cpp @@ -55,6 +55,7 @@ namespace ErrorCodes extern const int RESHARDING_COORDINATOR_DELETED; extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD; extern const int RESHARDING_INVALID_QUERY; + extern const int RWLOCK_NO_SUCH_LOCK; } namespace @@ -383,6 +384,8 @@ void ReshardingWorker::perform(const Strings & job_nodes) zookeeper->remove(child_full_path); else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED) zookeeper->remove(child_full_path); + else if (ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK) + zookeeper->remove(child_full_path); else zookeeper->remove(child_full_path); } @@ -410,6 +413,15 @@ void ReshardingWorker::perform(const std::string & job_descriptor) current_job = ReshardingJob{job_descriptor}; + zkutil::RWLock deletion_lock; + + if (current_job.isCoordinated()) + deletion_lock = std::move(createDeletionLock(current_job.coordinator_id)); + + zkutil::RWLock::Guard guard{deletion_lock}; + if (!deletion_lock.ownsLock()) + throw Exception("Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED); + StoragePtr generic_storage = context.getTable(current_job.database_name, current_job.table_name); auto & storage = typeid_cast(*(generic_storage.get())); current_job.storage = &storage; @@ -451,6 +463,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor) } else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR) { + deletion_lock.release(); hardCleanup(); } else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED) @@ -483,6 +496,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor) auto current_host = getFQDNOrHostName(); setStatus(current_job.coordinator_id, current_host, STATUS_ERROR); } + deletion_lock.release(); hardCleanup(); } } @@ -514,6 +528,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor) auto current_host = getFQDNOrHostName(); setStatus(current_job.coordinator_id, current_host, STATUS_ERROR); } + deletion_lock.release(); hardCleanup(); } } @@ -525,6 +540,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor) throw; } + deletion_lock.release(); hardCleanup(); LOG_DEBUG(log, "Resharding job successfully completed."); } @@ -1171,7 +1187,7 @@ bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const offline.resize(end - offline.begin()); for (const auto & node : offline) - zookeeper->set(coordinator_id + "/status/" + node, + zookeeper->set(getCoordinatorPath(coordinator_id) + "/status/" + node, toString(static_cast(STATUS_ON_HOLD))); return !offline.empty(); @@ -1234,12 +1250,6 @@ void ReshardingWorker::attachJob() auto zookeeper = context.getZooKeeper(); - /// Check if the corresponding coordinator exists. If it doesn't, throw an exception, - /// silently ignore this job, and switch to the next job. - if (!zookeeper->exists(getCoordinatorPath(current_job.coordinator_id))) - throw Exception("Coordinator " + current_job.coordinator_id + " was deleted. Ignoring", - ErrorCodes::RESHARDING_COORDINATOR_DELETED); - auto status = getStatus(); if (status == STATUS_ERROR) diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index f9bd28fa7d4..890f21fa812 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -81,6 +81,7 @@ namespace ErrorCodes extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP; extern const int RESHARDING_ALREADY_SUBSCRIBED; extern const int RESHARDING_INVALID_QUERY; + extern const int RWLOCK_NO_SUCH_LOCK; } @@ -3675,7 +3676,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String & /// has willfully attempted to botch an ongoing distributed resharding job. /// Consequently we don't take them into account. } - else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED) + else if ((ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK) || + (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)) { /// nothing here } diff --git a/libs/libzkutil/src/RWLock.cpp b/libs/libzkutil/src/RWLock.cpp index 7724a5e61a2..84fba70c3b8 100644 --- a/libs/libzkutil/src/RWLock.cpp +++ b/libs/libzkutil/src/RWLock.cpp @@ -10,6 +10,7 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; extern const int RWLOCK_ALREADY_HELD; +extern const int RWLOCK_NO_SUCH_LOCK; extern const int ABORTED; } @@ -57,7 +58,12 @@ RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_) { int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent); if ((code != ZOK) && (code != ZNODEEXISTS)) - throw KeeperException(code); + { + if (code == ZNONODE) + throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK); + else + throw KeeperException(code); + } } RWLock::operator bool() const @@ -117,13 +123,23 @@ void RWLock::acquireImpl(Mode mode) try { /// Enqueue a new request for a lock. - key = zookeeper->create(path + "/" + Prefix::name, - "", CreateMode::EphemeralSequential); + int32_t code = zookeeper->tryCreate(path + "/" + Prefix::name, + "", CreateMode::EphemeralSequential, key); + if (code == ZNONODE) + throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK); + else if (code != ZOK) + throw KeeperException(code); + key = key.substr(path.length() + 1); while (true) { - auto children = zookeeper->getChildren(path); + std::vector children; + int32_t code = zookeeper->tryGetChildren(path, children); + if (code == ZNONODE) + throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK); + else if (code != ZOK) + throw KeeperException(code); std::sort(children.begin(), children.end(), nodeQueueCmp); auto it = std::lower_bound(children.cbegin(), children.cend(), key, nodeQueueCmp); @@ -168,7 +184,12 @@ void RWLock::acquireImpl(Mode mode) if (mode == NonBlocking) { - zookeeper->remove(path + "/" + key); + int32_t code = zookeeper->tryRemove(path + "/" + key); + if (code == ZNONODE) + throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK); + else if (code != ZOK) + throw KeeperException(code); + key.clear(); break; }