Reworked RWLockImpl::getLock() + phase-fairness

Timeout param added to getLock() method
Alexander Kazakov 2020-04-07 02:44:45 +03:00
parent 2951ed4f1d
commit b98bc9afef
2 changed files with 185 additions and 155 deletions

View File

@@ -29,19 +29,17 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int DEADLOCK_AVOIDED;
}
/** A single-use object that represents lock's ownership
/** A one-time-use object that represents lock ownership
* For the purpose of exception safety guarantees, LockHolder is to be used in two steps:
* 1. Create an instance (allocating all the memory needed)
* 1. Create an instance (allocating all the needed memory)
* 2. Associate the instance with the lock (attach to the lock and locking request group)
*/
class RWLockImpl::LockHolderImpl
{
bool bound{false};
Type lock_type;
String query_id;
CurrentMetrics::Increment active_client_increment;
RWLock parent;
@@ -53,24 +51,30 @@ public:
/// Implicit memory allocation for query_id is done here
LockHolderImpl(const String & query_id_, Type type)
: lock_type{type}, query_id{query_id_},
active_client_increment{
: query_id{query_id_}
, active_client_increment{
type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
{
}
~LockHolderImpl();
~LockHolderImpl()
{
if (bound && parent != nullptr)
parent->unlock(it_group, query_id);
else
active_client_increment.destroy();
}
private:
/// A separate method which binds the lock holder to the owned lock
/// N.B. It is very important that this method produces no allocations
bool bindWith(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
{
if (bound)
if (bound || parent_ == nullptr)
return false;
it_group = it_group_;
parent = std::move(parent_);
++it_group->refererrs;
++it_group->requests;
bound = true;
return true;
}
@@ -79,56 +83,27 @@ private:
};
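To make the two-step discipline above concrete, here is a minimal self-contained sketch (simplified names, not the actual ClickHouse types): everything that can throw happens in step 1, while step 2 is noexcept, so a holder can never be left half-attached to the lock.

#include <memory>

struct Lock { /* stands in for RWLockImpl */ };

struct Holder
{
    std::shared_ptr<Lock> parent; /// empty until bound

    /// Step 2: associate with the lock; performs no allocations, hence noexcept
    bool bindWith(std::shared_ptr<Lock> parent_) noexcept
    {
        if (parent)
            return false; /// already bound
        parent = std::move(parent_);
        return true;
    }
};

int main()
{
    auto lock = std::make_shared<Lock>();
    auto holder = std::make_shared<Holder>(); /// Step 1: may throw, lock state untouched
    holder->bindWith(lock);                   /// Step 2: nothrow
}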
namespace
{
/// Global information about all read locks that a query holds. It is needed to avoid a certain type of deadlock.
class QueryLockInfo
{
private:
mutable std::mutex mutex;
std::map<std::string, size_t> queries;
public:
void add(const String & query_id)
{
std::lock_guard lock(mutex);
const auto res = queries.emplace(query_id, 1); // may throw
if (!res.second)
++res.first->second;
}
void remove(const String & query_id) noexcept
{
std::lock_guard lock(mutex);
const auto query_it = queries.find(query_id);
if (query_it != queries.cend() && --query_it->second == 0)
queries.erase(query_it);
}
void check(const String & query_id) const
{
std::lock_guard lock(mutex);
if (queries.find(query_id) != queries.cend())
throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED);
}
};
QueryLockInfo all_read_locks;
}
/** To guarantee that we do not get any piece of our data corrupted:
/** General algorithm:
* Step 1. Try the FastPath (for both Reads/Writes)
* Step 2. Find ourselves a request group: attach to an existing one or create a new one
* Step 3. Wait/timed wait for ownership signal
* Step 3a. Check if we must handle timeout and exit
* Step 4. Persist lock ownership
*
* To guarantee that we do not get any piece of our data corrupted:
* 1. Perform all actions that include allocations before changing lock's internal state
* 2. Roll back any changes that make the state inconsistent
*
* Note: "SM" in the comments below stands for STATE MODIFICATION
*/
RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id)
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
? std::chrono::time_point<std::chrono::steady_clock>::max()
: std::chrono::steady_clock::now() + lock_timeout_ms;
const bool request_has_query_id = query_id != NO_QUERY;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
@@ -145,100 +120,111 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
/// This object is placed above the unique_lock, because it may lock in its destructor.
auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);
std::unique_lock lock(mutex);
std::unique_lock state_lock(internal_state_mtx);
/// The FastPath:
/// Check if the same query_id already holds the required lock, in which case we can proceed without waiting
if (request_has_query_id)
{
const auto it_query = owner_queries.find(query_id);
if (it_query != owner_queries.end())
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
const auto current_owner_group = queue.begin();
if (wrlock_owner != writers_queue.end())
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// XXX: it means we can't upgrade lock from read to write!
/// Lock upgrading is not supported
if (type == Write)
throw Exception(
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
ErrorCodes::LOGICAL_ERROR);
if (current_owner_group->type == Write)
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// N.B. Type is Read here, query_id is not empty and it_query is a valid iterator
all_read_locks.add(query_id); /// SM1: may throw on insertion (nothing to roll back)
++it_query->second; /// SM2: nothrow
lock_holder->bindWith(shared_from_this(), current_owner_group); /// SM3: nothrow
++owner_query_it->second; /// SM1: nothrow
lock_holder->bindWith(shared_from_this(), rdlock_owner); /// SM2: nothrow
finalize_metrics();
return lock_holder;
}
}
/** If the query already has any active read lock and tries to acquire another read lock
* but is not at the front of the queue and has to wait, a deadlock is possible:
*
* Example (four queries, two RWLocks - 'a' and 'b'):
*
* --> time -->
*
* q1: ra rb
* q2: wa
* q3: rb ra
* q4: wb
*
* We will throw an exception instead.
*/
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
if (type == Type::Write)
{
if (type == Type::Read && request_has_query_id && !queue.empty())
all_read_locks.check(query_id);
/// Create a new group of locking requests
queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else if (request_has_query_id && queue.size() > 1)
all_read_locks.check(query_id);
else if (readers_queue.empty() ||
(rdlock_owner == readers_queue.begin() && !writers_queue.empty()))
{
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
GroupsContainer::iterator it_group =
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
GroupsContainer::iterator it_group = std::prev(queue.end());
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
{
if (type == Type::Read)
{
rdlock_owner = it_group; /// SM2: nothrow
}
else
{
wrlock_owner = it_group; /// SM2: nothrow
}
}
else
{
/// Wait until our group becomes the lock owner
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
/// We need to reference the associated group before waiting to guarantee
/// that this group does not get deleted prematurely
++it_group->refererrs;
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
{
++it_group->requests;
it_group->cv.wait(state_lock, predicate);
--it_group->requests;
}
else
{
++it_group->requests;
const auto wait_result = it_group->cv.wait_until(state_lock, lock_deadline_tp, predicate);
--it_group->requests;
/// Wait for a notification until our group becomes the first in the queue.
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
/// Step 3a. Check if we must handle timeout and exit
if (!wait_result) /// Wait timed out!
{
if (it_group->requests == 0)
{
/// Roll back SM1
if (type == Read)
{
readers_queue.erase(it_group); /// Rollback(SM1): nothrow
}
else
{
writers_queue.erase(it_group); /// Rollback(SM1): nothrow
}
}
--it_group->refererrs;
return nullptr;
}
}
}
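The waiting logic above follows the standard condition-variable pattern. A self-contained sketch (assumed names, not this commit's code) of how wait_until() with a predicate distinguishes timeout from ownership:

#include <chrono>
#include <condition_variable>
#include <mutex>

std::mutex mtx;
std::condition_variable cv;
bool is_owner = false; /// stands in for "our group became the lock owner"

/// Returns false only if the deadline passed while is_owner was still false:
/// the branch in which getLock() rolls back its changes and returns nullptr.
bool wait_for_ownership(std::chrono::milliseconds timeout)
{
    const auto deadline = (timeout == std::chrono::milliseconds(0))
        ? std::chrono::time_point<std::chrono::steady_clock>::max()
        : std::chrono::steady_clock::now() + timeout;

    std::unique_lock lock(mtx);
    if (deadline == std::chrono::time_point<std::chrono::steady_clock>::max())
    {
        cv.wait(lock, [] { return is_owner; }); /// no deadline: plain wait
        return true;
    }
    return cv.wait_until(lock, deadline, [] { return is_owner; });
}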
if (request_has_query_id)
{
try
{
if (type == Type::Read)
all_read_locks.add(query_id); /// SM2: may throw on insertion
/// and is safe to roll back unconditionally
const auto emplace_res =
owner_queries.emplace(query_id, 1); /// SM3: may throw on insertion
owner_queries.emplace(query_id, 1); /// SM2: may throw on insertion
if (!emplace_res.second)
++emplace_res.first->second; /// SM4: nothrow
++emplace_res.first->second; /// SM3: nothrow
}
catch (...)
{
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: all_read_locks and the locking queue
if (type == Type::Read)
all_read_locks.remove(query_id); /// Rollback(SM2): nothrow
if (it_group->refererrs == 0)
{
const auto next = queue.erase(it_group); /// Rollback(SM1): nothrow
if (next != queue.end())
next->cv.notify_all();
}
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
if (it_group->requests == 0)
erase_group(it_group); /// Rollback(SM1): nothrow
throw;
}
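This try/catch is the SM/rollback discipline described above in miniature: the throwing insertion is attempted last, and the earlier state change is undone with a nothrow operation before rethrowing. A self-contained sketch (assumed names, not this commit's code):

#include <iterator>
#include <list>
#include <string>
#include <unordered_map>

std::list<int> groups;                             /// stands in for a request-group queue
std::unordered_map<std::string, size_t> owner_ids; /// stands in for owner_queries

void acquire(const std::string & query_id)
{
    groups.emplace_back(0);                /// SM1: may throw (nothing to roll back)
    const auto it_group = std::prev(groups.end());
    try
    {
        ++owner_ids[query_id];             /// SM2: may throw on insertion
    }
    catch (...)
    {
        groups.erase(it_group);            /// Rollback(SM1): nothrow
        throw;
    }
}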
@@ -251,10 +237,9 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
}
/** The sequence points of acquiring lock's ownership by an instance of LockHolderImpl:
* 1. all_read_locks is updated
* 2. owner_queries is updated
* 3. request group is updated by LockHolderImpl which in turn becomes "bound"
/** The sequence points of acquiring lock ownership by an instance of LockHolderImpl:
* 1. owner_queries is updated
* 2. request group is updated by LockHolderImpl which in turn becomes "bound"
*
* If by the time when destructor of LockHolderImpl is called the instance has been "bound",
* it is guaranteed that both steps have been executed successfully and the resulting state is consistent.
@@ -262,38 +247,74 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
*
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
*/
RWLockImpl::LockHolderImpl::~LockHolderImpl()
void RWLockImpl::unlock(GroupsContainer::iterator owner_group, const String & query_id) noexcept
{
if (!bound || parent == nullptr)
std::lock_guard state_lock(internal_state_mtx);
/// All of these are undefined behavior and there is nothing we can do!
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
return;
std::lock_guard lock(parent->mutex);
/// The associated group must exist (and be the beginning of the queue?)
if (parent->queue.empty() || it_group != parent->queue.begin())
if (rdlock_owner != readers_queue.end() && owner_group != rdlock_owner)
return;
if (wrlock_owner != writers_queue.end() && owner_group != wrlock_owner)
return;
/// If query_id is not empty it must be listed in parent->owner_queries
if (query_id != RWLockImpl::NO_QUERY)
if (query_id != NO_QUERY)
{
const auto owner_it = parent->owner_queries.find(query_id);
if (owner_it != parent->owner_queries.end())
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
if (--owner_it->second == 0) /// SM: nothrow
parent->owner_queries.erase(owner_it); /// SM: nothrow
if (lock_type == RWLockImpl::Read)
all_read_locks.remove(query_id); /// SM: nothrow
if (--owner_query_it->second == 0) /// SM: nothrow
owner_queries.erase(owner_query_it); /// SM: nothrow
}
}
/// If we are the last remaining referrer, remove the group and notify the next group
if (--it_group->refererrs == 0) /// SM: nothrow
{
const auto next = parent->queue.erase(it_group); /// SM: nothrow
if (next != parent->queue.end())
next->cv.notify_all();
}
/// If we are the last remaining referrer, remove this group and notify the next one
if (--owner_group->requests == 0) /// SM: nothrow
erase_group(owner_group);
}
void RWLockImpl::erase_group(GroupsContainer::iterator group_it) noexcept
{
rdlock_owner = readers_queue.end();
wrlock_owner = writers_queue.end();
if (group_it->type == Read)
{
readers_queue.erase(group_it);
/// Prepare next phase
if (!writers_queue.empty())
{
wrlock_owner = writers_queue.begin();
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
writers_queue.erase(group_it);
/// Prepare next phase
if (!readers_queue.empty())
{
rdlock_owner = readers_queue.begin();
}
else
{
wrlock_owner = writers_queue.begin();
}
}
if (rdlock_owner != readers_queue.end())
{
rdlock_owner->cv.notify_all();
}
else if (wrlock_owner != writers_queue.end())
{
wrlock_owner->cv.notify_one();
}
}
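The handoff in erase_group() is what makes the lock phase-fair: when a phase ends, ownership is preferentially passed to the opposite queue, so neither side can starve the other. A toy model of the resulting schedule (an illustrative simplification, not the ClickHouse implementation):

#include <deque>
#include <iostream>
#include <string>

int main()
{
    /// Each entry models one request group; a read group may hold many readers.
    std::deque<std::string> readers = {"R1+R2", "R3"};
    std::deque<std::string> writers = {"W1", "W2"};

    bool read_phase = true; /// readers own the lock first
    while (!readers.empty() || !writers.empty())
    {
        auto & owners = read_phase ? readers : writers;
        auto & others = read_phase ? writers : readers;
        if (owners.empty())
        {
            read_phase = !read_phase;
            continue;
        }
        std::cout << "phase owner: " << owners.front() << '\n';
        owners.pop_front();
        if (!others.empty()) /// prefer the opposite phase next
            read_phase = !read_phase;
    }
    /// Prints: R1+R2, W1, R3, W2 -- read and write phases alternate
}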
}

View File

@@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <chrono>
#include <list>
#include <vector>
#include <mutex>
@@ -19,7 +20,8 @@ using RWLock = std::shared_ptr<RWLockImpl>;
/// Implements shared lock with FIFO service
/// Can be acquired recursively (several calls for the same query) in Read mode
/// (Phase Fair RWLock as suggested in https://www.cs.unc.edu/~anderson/papers/rtsj10-for-web.pdf)
/// Can be acquired recursively (for the same query) in Read mode
///
/// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already
/// acquired by another thread of the same query. Otherwise the following deadlock is possible:
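A hypothetical call sequence illustrating this note (the variable names are assumptions): the second Read request from the same query_id takes the fast path and returns without waiting, even though a writer is already queued.

/// Thread 1 of query_1 takes a Read lock: the read phase starts
auto r1 = rwlock->getLock(RWLockImpl::Read, "query_1");
/// Another query requests a Write lock and blocks behind the readers:
///     auto w = rwlock->getLock(RWLockImpl::Write, "query_2");
/// Thread 2 of query_1 asks for Read again: fast path, no waiting, no deadlock
auto r2 = rwlock->getLock(RWLockImpl::Read, "query_1");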
@@ -42,37 +44,44 @@ public:
friend class LockHolderImpl;
using LockHolder = std::shared_ptr<LockHolderImpl>;
/// Waits in the queue and returns the appropriate lock
/// Empty query_id means the lock is acquired out of the query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id);
/// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread).
LockHolder getLock(Type type, const String & query_id,
const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0));
/// Use as query_id to acquire a lock outside the query context.
inline static const String NO_QUERY = String();
inline static const auto default_locking_timeout = std::chrono::milliseconds(120000);
private:
RWLockImpl() = default;
struct Group;
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
/// Group of locking requests that should be granted concurrently
/// i.e. a group can contain several readers, but only one writer
/// Group of locking requests that should be granted simultaneously
/// i.e. one or several readers or a single writer
struct Group
{
const Type type;
size_t refererrs;
size_t requests;
std::condition_variable cv; /// all locking requests of the group wait on this condvar
explicit Group(Type type_) : type{type_}, refererrs{0} {}
explicit Group(Type type_) : type{type_}, requests{0} {}
};
GroupsContainer queue;
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
private:
mutable std::mutex internal_state_mtx;
GroupsContainer readers_queue;
GroupsContainer writers_queue;
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equals readers_queue.begin() in the read phase
/// or readers_queue.end() otherwise
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equals writers_queue.begin() in the write phase
/// or writers_queue.end() otherwise
OwnerQueryIds owner_queries;
mutable std::mutex mutex;
private:
RWLockImpl() = default;
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
void erase_group(GroupsContainer::iterator group_it) noexcept;
};
}
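Caller-side usage of the new timeout parameter might look as follows (a hypothetical sketch: the diff only shows that getLock() returns nullptr on timeout; the exception message here is illustrative):

auto holder = rwlock->getLock(RWLockImpl::Write, query_id,
                              RWLockImpl::default_locking_timeout);
if (!holder) /// nullptr means the deadline expired before ownership was granted
    throw Exception("Locking attempt timed out! Possible deadlock avoided. Client should retry.",
                    ErrorCodes::DEADLOCK_AVOIDED);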