mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 16:50:48 +00:00
dbms: More locking improvements. [#METR-18510]
This commit is contained in:
parent
d21060c35b
commit
3b9ea20c91
@ -337,7 +337,8 @@ namespace ErrorCodes
|
||||
extern const int RESHARDING_INVALID_QUERY = 331;
|
||||
extern const int RESHARDING_INITIATOR_CHECK_FAILED = 332;
|
||||
extern const int RWLOCK_ALREADY_HELD = 333;
|
||||
extern const int BARRIER_TIMEOUT = 334;
|
||||
extern const int RWLOCK_NO_SUCH_LOCK = 334;
|
||||
extern const int BARRIER_TIMEOUT = 335;
|
||||
|
||||
extern const int KEEPER_EXCEPTION = 999;
|
||||
extern const int POCO_EXCEPTION = 1000;
|
||||
|
@ -55,6 +55,7 @@ namespace ErrorCodes
|
||||
extern const int RESHARDING_COORDINATOR_DELETED;
|
||||
extern const int RESHARDING_DISTRIBUTED_JOB_ON_HOLD;
|
||||
extern const int RESHARDING_INVALID_QUERY;
|
||||
extern const int RWLOCK_NO_SUCH_LOCK;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -383,6 +384,8 @@ void ReshardingWorker::perform(const Strings & job_nodes)
|
||||
zookeeper->remove(child_full_path);
|
||||
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
|
||||
zookeeper->remove(child_full_path);
|
||||
else if (ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK)
|
||||
zookeeper->remove(child_full_path);
|
||||
else
|
||||
zookeeper->remove(child_full_path);
|
||||
}
|
||||
@ -410,6 +413,15 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
|
||||
|
||||
current_job = ReshardingJob{job_descriptor};
|
||||
|
||||
zkutil::RWLock deletion_lock;
|
||||
|
||||
if (current_job.isCoordinated())
|
||||
deletion_lock = std::move(createDeletionLock(current_job.coordinator_id));
|
||||
|
||||
zkutil::RWLock::Guard<zkutil::RWLock::Read, zkutil::RWLock::NonBlocking> guard{deletion_lock};
|
||||
if (!deletion_lock.ownsLock())
|
||||
throw Exception("Coordinator has been deleted", ErrorCodes::RESHARDING_COORDINATOR_DELETED);
|
||||
|
||||
StoragePtr generic_storage = context.getTable(current_job.database_name, current_job.table_name);
|
||||
auto & storage = typeid_cast<StorageReplicatedMergeTree &>(*(generic_storage.get()));
|
||||
current_job.storage = &storage;
|
||||
@ -451,6 +463,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
|
||||
}
|
||||
else if (ex.code() == ErrorCodes::RESHARDING_REMOTE_NODE_ERROR)
|
||||
{
|
||||
deletion_lock.release();
|
||||
hardCleanup();
|
||||
}
|
||||
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
|
||||
@ -483,6 +496,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
|
||||
auto current_host = getFQDNOrHostName();
|
||||
setStatus(current_job.coordinator_id, current_host, STATUS_ERROR);
|
||||
}
|
||||
deletion_lock.release();
|
||||
hardCleanup();
|
||||
}
|
||||
}
|
||||
@ -514,6 +528,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
|
||||
auto current_host = getFQDNOrHostName();
|
||||
setStatus(current_job.coordinator_id, current_host, STATUS_ERROR);
|
||||
}
|
||||
deletion_lock.release();
|
||||
hardCleanup();
|
||||
}
|
||||
}
|
||||
@ -525,6 +540,7 @@ void ReshardingWorker::perform(const std::string & job_descriptor)
|
||||
throw;
|
||||
}
|
||||
|
||||
deletion_lock.release();
|
||||
hardCleanup();
|
||||
LOG_DEBUG(log, "Resharding job successfully completed.");
|
||||
}
|
||||
@ -1171,7 +1187,7 @@ bool ReshardingWorker::updateOfflineNodesCommon(const std::string & path, const
|
||||
offline.resize(end - offline.begin());
|
||||
|
||||
for (const auto & node : offline)
|
||||
zookeeper->set(coordinator_id + "/status/" + node,
|
||||
zookeeper->set(getCoordinatorPath(coordinator_id) + "/status/" + node,
|
||||
toString(static_cast<UInt64>(STATUS_ON_HOLD)));
|
||||
|
||||
return !offline.empty();
|
||||
@ -1234,12 +1250,6 @@ void ReshardingWorker::attachJob()
|
||||
|
||||
auto zookeeper = context.getZooKeeper();
|
||||
|
||||
/// Check if the corresponding coordinator exists. If it doesn't, throw an exception,
|
||||
/// silently ignore this job, and switch to the next job.
|
||||
if (!zookeeper->exists(getCoordinatorPath(current_job.coordinator_id)))
|
||||
throw Exception("Coordinator " + current_job.coordinator_id + " was deleted. Ignoring",
|
||||
ErrorCodes::RESHARDING_COORDINATOR_DELETED);
|
||||
|
||||
auto status = getStatus();
|
||||
|
||||
if (status == STATUS_ERROR)
|
||||
|
@ -81,6 +81,7 @@ namespace ErrorCodes
|
||||
extern const int RESHARDING_NO_COORDINATOR_MEMBERSHIP;
|
||||
extern const int RESHARDING_ALREADY_SUBSCRIBED;
|
||||
extern const int RESHARDING_INVALID_QUERY;
|
||||
extern const int RWLOCK_NO_SUCH_LOCK;
|
||||
}
|
||||
|
||||
|
||||
@ -3675,7 +3676,8 @@ void StorageReplicatedMergeTree::reshardPartitions(ASTPtr query, const String &
|
||||
/// has willfully attempted to botch an ongoing distributed resharding job.
|
||||
/// Consequently we don't take them into account.
|
||||
}
|
||||
else if (ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED)
|
||||
else if ((ex.code() == ErrorCodes::RWLOCK_NO_SUCH_LOCK) ||
|
||||
(ex.code() == ErrorCodes::RESHARDING_COORDINATOR_DELETED))
|
||||
{
|
||||
/// nothing here
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ namespace ErrorCodes
|
||||
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int RWLOCK_ALREADY_HELD;
|
||||
extern const int RWLOCK_NO_SUCH_LOCK;
|
||||
extern const int ABORTED;
|
||||
|
||||
}
|
||||
@ -57,7 +58,12 @@ RWLock::RWLock(ZooKeeperPtr & zookeeper_, const std::string & path_)
|
||||
{
|
||||
int32_t code = zookeeper->tryCreate(path, "", CreateMode::Persistent);
|
||||
if ((code != ZOK) && (code != ZNODEEXISTS))
|
||||
throw KeeperException(code);
|
||||
{
|
||||
if (code == ZNONODE)
|
||||
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
|
||||
else
|
||||
throw KeeperException(code);
|
||||
}
|
||||
}
|
||||
|
||||
RWLock::operator bool() const
|
||||
@ -117,13 +123,23 @@ void RWLock::acquireImpl(Mode mode)
|
||||
try
|
||||
{
|
||||
/// Enqueue a new request for a lock.
|
||||
key = zookeeper->create(path + "/" + Prefix<lock_type>::name,
|
||||
"", CreateMode::EphemeralSequential);
|
||||
int32_t code = zookeeper->tryCreate(path + "/" + Prefix<lock_type>::name,
|
||||
"", CreateMode::EphemeralSequential, key);
|
||||
if (code == ZNONODE)
|
||||
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
|
||||
else if (code != ZOK)
|
||||
throw KeeperException(code);
|
||||
|
||||
key = key.substr(path.length() + 1);
|
||||
|
||||
while (true)
|
||||
{
|
||||
auto children = zookeeper->getChildren(path);
|
||||
std::vector<std::string> children;
|
||||
int32_t code = zookeeper->tryGetChildren(path, children);
|
||||
if (code == ZNONODE)
|
||||
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
|
||||
else if (code != ZOK)
|
||||
throw KeeperException(code);
|
||||
|
||||
std::sort(children.begin(), children.end(), nodeQueueCmp);
|
||||
auto it = std::lower_bound(children.cbegin(), children.cend(), key, nodeQueueCmp);
|
||||
@ -168,7 +184,12 @@ void RWLock::acquireImpl(Mode mode)
|
||||
|
||||
if (mode == NonBlocking)
|
||||
{
|
||||
zookeeper->remove(path + "/" + key);
|
||||
int32_t code = zookeeper->tryRemove(path + "/" + key);
|
||||
if (code == ZNONODE)
|
||||
throw DB::Exception("No such lock", DB::ErrorCodes::RWLOCK_NO_SUCH_LOCK);
|
||||
else if (code != ZOK)
|
||||
throw KeeperException(code);
|
||||
|
||||
key.clear();
|
||||
break;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user