mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #6854 from Akazz/rwlock-cumulative-update-1
Rwlock sanitary update 1: removed weak_ptrs from implementation + added strong exception safety guarantee
This commit is contained in:
commit
3ca084ec57
@ -4,8 +4,6 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#include <cassert>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
@ -35,22 +33,48 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
/** A single-use object that represents lock's ownership
|
||||
* For the purpose of exception safety guarantees LockHolder is to be used in two steps:
|
||||
* 1. Create an instance (allocating all the memory needed)
|
||||
* 2. Associate the instance with the lock (attach to the lock and locking request group)
|
||||
*/
|
||||
class RWLockImpl::LockHolderImpl
|
||||
{
|
||||
bool bound{false};
|
||||
Type lock_type;
|
||||
String query_id;
|
||||
CurrentMetrics::Increment active_client_increment;
|
||||
RWLock parent;
|
||||
GroupsContainer::iterator it_group;
|
||||
ClientsContainer::iterator it_client;
|
||||
QueryIdToHolder::key_type query_id;
|
||||
CurrentMetrics::Increment active_client_increment;
|
||||
|
||||
LockHolderImpl(RWLock && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
|
||||
|
||||
public:
|
||||
|
||||
LockHolderImpl(const LockHolderImpl & other) = delete;
|
||||
LockHolderImpl& operator=(const LockHolderImpl & other) = delete;
|
||||
|
||||
/// Implicit memory allocation for query_id is done here
|
||||
LockHolderImpl(const String & query_id_, Type type)
|
||||
: lock_type{type}, query_id{query_id_},
|
||||
active_client_increment{
|
||||
type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
|
||||
{
|
||||
}
|
||||
|
||||
~LockHolderImpl();
|
||||
|
||||
private:
|
||||
/// A separate method which binds the lock holder to the owned lock
|
||||
/// N.B. It is very important that this method produces no allocations
|
||||
bool bind_with(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
|
||||
{
|
||||
if (bound)
|
||||
return false;
|
||||
it_group = it_group_;
|
||||
parent = std::move(parent_);
|
||||
++it_group->refererrs;
|
||||
bound = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
friend class RWLockImpl;
|
||||
};
|
||||
|
||||
@ -62,29 +86,33 @@ namespace
|
||||
class QueryLockInfo
|
||||
{
|
||||
private:
|
||||
std::mutex mutex;
|
||||
mutable std::mutex mutex;
|
||||
std::map<std::string, size_t> queries;
|
||||
|
||||
public:
|
||||
void add(const String & query_id)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
++queries[query_id];
|
||||
|
||||
const auto res = queries.emplace(query_id, 1); // may throw
|
||||
if (!res.second)
|
||||
++res.first->second;
|
||||
}
|
||||
|
||||
void remove(const String & query_id)
|
||||
void remove(const String & query_id) noexcept
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = queries.find(query_id);
|
||||
assert(it != queries.end());
|
||||
if (--it->second == 0)
|
||||
queries.erase(it);
|
||||
|
||||
const auto query_it = queries.find(query_id);
|
||||
if (query_it != queries.cend() && --query_it->second == 0)
|
||||
queries.erase(query_it);
|
||||
}
|
||||
|
||||
void check(const String & query_id)
|
||||
void check(const String & query_id) const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
if (queries.count(query_id))
|
||||
|
||||
if (queries.find(query_id) != queries.cend())
|
||||
throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
|
||||
}
|
||||
};
|
||||
@ -93,8 +121,16 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
/** To guarantee that we do not get any piece of our data corrupted:
|
||||
* 1. Perform all actions that include allocations before changing lock's internal state
|
||||
* 2. Roll back any changes that make the state inconsistent
|
||||
*
|
||||
* Note: "SM" in the commentaries below stands for STATE MODIFICATION
|
||||
*/
|
||||
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
|
||||
{
|
||||
const bool request_has_query_id = query_id != NO_QUERY;
|
||||
|
||||
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
|
||||
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
|
||||
: CurrentMetrics::RWLockWaitingWriters);
|
||||
@ -106,29 +142,39 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
||||
: ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
|
||||
};
|
||||
|
||||
GroupsContainer::iterator it_group;
|
||||
ClientsContainer::iterator it_client;
|
||||
|
||||
/// This object is placed above unique_lock, because it may lock in destructor.
|
||||
LockHolder res;
|
||||
auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);
|
||||
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
/// Check if the same query is acquiring previously acquired lock
|
||||
if (query_id != RWLockImpl::NO_QUERY)
|
||||
/// The FastPath:
|
||||
/// Check if the same query_id already holds the required lock in which case we can proceed without waiting
|
||||
if (request_has_query_id)
|
||||
{
|
||||
auto it_query = query_id_to_holder.find(query_id);
|
||||
if (it_query != query_id_to_holder.end())
|
||||
res = it_query->second.lock();
|
||||
}
|
||||
const auto it_query = owner_queries.find(query_id);
|
||||
if (it_query != owner_queries.end())
|
||||
{
|
||||
const auto current_owner_group = queue.begin();
|
||||
|
||||
if (res)
|
||||
{
|
||||
/// XXX: it means we can't upgrade lock from read to write - with proper waiting!
|
||||
if (type != Read || res->it_group->type != Read)
|
||||
throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR);
|
||||
else
|
||||
return res;
|
||||
/// XXX: it means we can't upgrade lock from read to write!
|
||||
if (type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
if (current_owner_group->type == Write)
|
||||
throw Exception(
|
||||
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
|
||||
all_read_locks.add(query_id); /// SM1: may throw on insertion (nothing to roll back)
|
||||
++it_query->second; /// SM2: nothrow
|
||||
lock_holder->bind_with(shared_from_this(), current_owner_group); /// SM3: nothrow
|
||||
|
||||
finalize_metrics();
|
||||
return lock_holder;
|
||||
}
|
||||
}
|
||||
|
||||
/** If the query already has any active read lock and tries to acquire another read lock
|
||||
@ -148,86 +194,106 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
||||
|
||||
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
|
||||
{
|
||||
if (type == Type::Read && !queue.empty() && queue.back().type == Type::Write && query_id != RWLockImpl::NO_QUERY)
|
||||
if (type == Type::Read && request_has_query_id && !queue.empty())
|
||||
all_read_locks.check(query_id);
|
||||
|
||||
/// Create new group of clients
|
||||
it_group = queue.emplace(queue.end(), type);
|
||||
/// Create a new group of locking requests
|
||||
queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Will append myself to last group
|
||||
it_group = std::prev(queue.end());
|
||||
else if (request_has_query_id && queue.size() > 1)
|
||||
all_read_locks.check(query_id);
|
||||
|
||||
if (it_group != queue.begin() && query_id != RWLockImpl::NO_QUERY)
|
||||
all_read_locks.check(query_id);
|
||||
}
|
||||
GroupsContainer::iterator it_group = std::prev(queue.end());
|
||||
|
||||
/// Append myself to the end of chosen group
|
||||
auto & clients = it_group->clients;
|
||||
try
|
||||
{
|
||||
it_client = clients.emplace(clients.end(), type);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Remove group if it was the first client in the group and an error occurred
|
||||
if (clients.empty())
|
||||
queue.erase(it_group);
|
||||
throw;
|
||||
}
|
||||
|
||||
res.reset(new LockHolderImpl(shared_from_this(), it_group, it_client));
|
||||
/// We need to reference the associated group before waiting to guarantee
|
||||
/// that this group does not get deleted prematurely
|
||||
++it_group->refererrs;
|
||||
|
||||
/// Wait a notification until we will be the only in the group.
|
||||
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
|
||||
|
||||
/// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock
|
||||
if (query_id != RWLockImpl::NO_QUERY)
|
||||
{
|
||||
query_id_to_holder.emplace(query_id, res);
|
||||
--it_group->refererrs;
|
||||
|
||||
if (type == Type::Read)
|
||||
all_read_locks.add(query_id);
|
||||
if (request_has_query_id)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (type == Type::Read)
|
||||
all_read_locks.add(query_id); /// SM2: may throw on insertion
|
||||
/// and is safe to roll back unconditionally
|
||||
const auto emplace_res =
|
||||
owner_queries.emplace(query_id, 1); /// SM3: may throw on insertion
|
||||
if (!emplace_res.second)
|
||||
++emplace_res.first->second; /// SM4: nothrow
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
|
||||
/// We only need to roll back the changes to these objects: all_read_locks and the locking queue
|
||||
if (type == Type::Read)
|
||||
all_read_locks.remove(query_id); /// Rollback(SM2): nothrow
|
||||
|
||||
if (it_group->refererrs == 0)
|
||||
{
|
||||
const auto next = queue.erase(it_group); /// Rollback(SM1): nothrow
|
||||
if (next != queue.end())
|
||||
next->cv.notify_all();
|
||||
}
|
||||
|
||||
throw;
|
||||
}
|
||||
}
|
||||
res->query_id = query_id;
|
||||
|
||||
lock_holder->bind_with(shared_from_this(), it_group); /// SM: nothrow
|
||||
|
||||
finalize_metrics();
|
||||
return res;
|
||||
return lock_holder;
|
||||
}
|
||||
|
||||
|
||||
/** The sequence points of acquiring lock's ownership by an instance of LockHolderImpl:
|
||||
* 1. all_read_locks is updated
|
||||
* 2. owner_queries is updated
|
||||
* 3. request group is updated by LockHolderImpl which in turn becomes "bound"
|
||||
*
|
||||
* If by the time when destructor of LockHolderImpl is called the instance has been "bound",
|
||||
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
|
||||
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
|
||||
*
|
||||
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
|
||||
*/
|
||||
RWLockImpl::LockHolderImpl::~LockHolderImpl()
|
||||
{
|
||||
if (!bound || parent == nullptr)
|
||||
return;
|
||||
|
||||
std::lock_guard lock(parent->mutex);
|
||||
|
||||
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
|
||||
parent->query_id_to_holder.erase(query_id);
|
||||
/// The associated group must exist (and be the beginning of the queue?)
|
||||
if (parent->queue.empty() || it_group != parent->queue.begin())
|
||||
return;
|
||||
|
||||
if (*it_client == RWLockImpl::Read && query_id != RWLockImpl::NO_QUERY)
|
||||
all_read_locks.remove(query_id);
|
||||
|
||||
/// Removes myself from client list of our group
|
||||
it_group->clients.erase(it_client);
|
||||
|
||||
/// Remove the group if we were the last client and notify the next group
|
||||
if (it_group->clients.empty())
|
||||
/// If query_id is not empty it must be listed in parent->owner_queries
|
||||
if (query_id != RWLockImpl::NO_QUERY)
|
||||
{
|
||||
auto & parent_queue = parent->queue;
|
||||
parent_queue.erase(it_group);
|
||||
const auto owner_it = parent->owner_queries.find(query_id);
|
||||
if (owner_it != parent->owner_queries.end())
|
||||
{
|
||||
if (--owner_it->second == 0) /// SM: nothrow
|
||||
parent->owner_queries.erase(owner_it); /// SM: nothrow
|
||||
|
||||
if (!parent_queue.empty())
|
||||
parent_queue.front().cv.notify_all();
|
||||
if (lock_type == RWLockImpl::Read)
|
||||
all_read_locks.remove(query_id); /// SM: nothrow
|
||||
}
|
||||
}
|
||||
|
||||
/// If we are the last remaining referrer, remove the group and notify the next group
|
||||
if (--it_group->refererrs == 0) /// SM: nothrow
|
||||
{
|
||||
const auto next = parent->queue.erase(it_group); /// SM: nothrow
|
||||
if (next != parent->queue.end())
|
||||
next->cv.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
RWLockImpl::LockHolderImpl::LockHolderImpl(RWLock && parent_, RWLockImpl::GroupsContainer::iterator it_group_,
|
||||
RWLockImpl::ClientsContainer::iterator it_client_)
|
||||
: parent{std::move(parent_)}, it_group{it_group_}, it_client{it_client_},
|
||||
active_client_increment{(*it_client == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
|
||||
: CurrentMetrics::RWLockActiveWriters}
|
||||
{
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -8,6 +8,7 @@
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -53,25 +54,24 @@ private:
|
||||
|
||||
struct Group;
|
||||
using GroupsContainer = std::list<Group>;
|
||||
using ClientsContainer = std::list<Type>;
|
||||
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
|
||||
using OwnerQueryIds = std::unordered_map<String, size_t>;
|
||||
|
||||
/// Group of clients that should be executed concurrently
|
||||
/// i.e. a group could contain several readers, but only one writer
|
||||
/// Group of locking requests that should be granted concurrently
|
||||
/// i.e. a group can contain several readers, but only one writer
|
||||
struct Group
|
||||
{
|
||||
// FIXME: there is only redundant |type| information inside |clients|.
|
||||
const Type type;
|
||||
ClientsContainer clients;
|
||||
size_t refererrs;
|
||||
|
||||
std::condition_variable cv; /// all clients of the group wait group condvar
|
||||
std::condition_variable cv; /// all locking requests of the group wait on this condvar
|
||||
|
||||
explicit Group(Type type_) : type{type_} {}
|
||||
explicit Group(Type type_) : type{type_}, refererrs{0} {}
|
||||
};
|
||||
|
||||
mutable std::mutex mutex;
|
||||
GroupsContainer queue;
|
||||
QueryIdToHolder query_id_to_holder;
|
||||
OwnerQueryIds owner_queries;
|
||||
|
||||
mutable std::mutex mutex;
|
||||
};
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user