mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
Dropped weak_ptrs!
This commit is contained in:
parent
9b55e66755
commit
32c77cd933
@ -39,7 +39,7 @@ class RWLockImpl::LockHolderImpl
|
||||
{
|
||||
RWLock parent;
|
||||
GroupsContainer::iterator it_group;
|
||||
QueryIdToHolder::key_type query_id;
|
||||
String query_id;
|
||||
CurrentMetrics::Increment active_client_increment;
|
||||
|
||||
LockHolderImpl(RWLock && parent, GroupsContainer::iterator it_group);
|
||||
@ -116,26 +116,34 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
||||
/// This FastPath tries to create a "light copy" of a lock without creating a separate LockHolderImpl
|
||||
if (request_has_query_id)
|
||||
{
|
||||
const auto it_query = query_id_to_holder.find(query_id);
|
||||
if (it_query != query_id_to_holder.end())
|
||||
const auto it_query = owner_queries.find(query_id);
|
||||
if (it_query != owner_queries.end())
|
||||
{
|
||||
res = it_query->second.lock();
|
||||
if (res)
|
||||
{
|
||||
/// XXX: it means we can't upgrade lock from read to write - with proper waiting!
|
||||
if (type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
const auto current_owner_group = queue.begin();
|
||||
|
||||
if (res->it_group->type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
/// XXX: it means we can't upgrade lock from read to write - with proper waiting!
|
||||
if (type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
finalize_metrics();
|
||||
return res;
|
||||
}
|
||||
if (current_owner_group->type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
res.reset(new LockHolderImpl(shared_from_this(), current_owner_group));
|
||||
res.make_shared(shared_from_this(), current_owner_group);
|
||||
++current_owner_group->referers;
|
||||
|
||||
++owner_queries[query_id];
|
||||
if (type == Type::Read)
|
||||
all_read_locks.add(query_id);
|
||||
|
||||
res->query_id = query_id;
|
||||
|
||||
finalize_metrics();
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
||||
@ -174,19 +182,18 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
||||
/// Will append myself to last group
|
||||
it_group = std::prev(queue.end());
|
||||
}
|
||||
++it_group->referers;
|
||||
res.reset(new LockHolderImpl(shared_from_this(), it_group));
|
||||
++it_group->referers;
|
||||
|
||||
/// Wait a notification until we will be the only in the group.
|
||||
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
|
||||
|
||||
/// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock
|
||||
if (request_has_query_id)
|
||||
{
|
||||
query_id_to_holder.emplace(query_id, res);
|
||||
|
||||
++owner_queries[query_id];
|
||||
if (type == Type::Read)
|
||||
all_read_locks.add(query_id);
|
||||
|
||||
res->query_id = query_id;
|
||||
}
|
||||
|
||||
@ -201,18 +208,8 @@ RWLockImpl::LockHolderImpl::~LockHolderImpl()
|
||||
|
||||
if (query_id != RWLockImpl::NO_QUERY)
|
||||
{
|
||||
/// Since every access to query_id_to_holder is syncronized, expired() == true means
|
||||
/// that the current lock holder being destroyed is the last remaining to be associated with query_id
|
||||
/// (query_id_to_holder is only updated in PointA)
|
||||
/// so we can safely erase query_id, otherwise there still are other lock holders for this query_id
|
||||
///
|
||||
/// XXX: There might be other destructors as well running concurrently that might have already erased this query_id
|
||||
const auto it_query_id = parent->query_id_to_holder.find(query_id);
|
||||
if (it_query_id != parent->query_id_to_holder.end())
|
||||
{
|
||||
if (it_query_id->second.expired())
|
||||
parent->query_id_to_holder.erase(it_query_id);
|
||||
}
|
||||
if (--parent->owner_queries.at(query_id) == 0)
|
||||
parent->owner_queries.erase(query_id);
|
||||
|
||||
if (it_group->type == RWLockImpl::Read)
|
||||
all_read_locks.remove(query_id);
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -53,7 +54,7 @@ private:
|
||||
|
||||
struct Group;
|
||||
using GroupsContainer = std::list<Group>;
|
||||
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
|
||||
using OwnerQueryIds = std::unordered_map<String, size_t>;
|
||||
|
||||
/// Group of locking requests that should be granted concurrently
|
||||
/// i.e. a group can contain several readers, but only one writer
|
||||
@ -67,9 +68,10 @@ private:
|
||||
explicit Group(Type type_) : type{type_}, referers{0} {}
|
||||
};
|
||||
|
||||
mutable std::mutex mutex;
|
||||
GroupsContainer queue;
|
||||
QueryIdToHolder query_id_to_holder;
|
||||
OwnerQueryIds owner_queries;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user