ClickHouse/src/Common/RWLock.cpp

#include "RWLock.h"
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>

namespace ProfileEvents
{
extern const Event RWLockAcquiredReadLocks;
extern const Event RWLockAcquiredWriteLocks;
extern const Event RWLockReadersWaitMilliseconds;
extern const Event RWLockWritersWaitMilliseconds;
}

namespace CurrentMetrics
{
extern const Metric RWLockWaitingReaders;
extern const Metric RWLockWaitingWriters;
extern const Metric RWLockActiveReaders;
extern const Metric RWLockActiveWriters;
}

namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

/** A one-time-use object that represents lock ownership.
* For the purpose of exception safety guarantees, LockHolder is to be used in two steps:
*   1. Create an instance (allocating all the needed memory)
*   2. Associate the instance with the lock (attach to the lock and to the locking request group)
*/
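/// A minimal sketch of the two steps, exactly as they happen inside getLock() below (not an additional API):
///
///     auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);   /// step 1: may allocate and throw
///     /// ... state modifications under internal_state_mtx ...
///     lock_holder->bindWith(shared_from_this(), it_group);                   /// step 2: noexcept, no allocations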
class RWLockImpl::LockHolderImpl
{
bool bound{false};
String query_id;
CurrentMetrics::Increment active_client_increment;
RWLock parent;
GroupsContainer::iterator it_group;
public:
LockHolderImpl(const LockHolderImpl & other) = delete;
LockHolderImpl& operator=(const LockHolderImpl & other) = delete;
/// Implicit memory allocation for query_id is done here
LockHolderImpl(const String & query_id_, Type type)
: query_id{query_id_}
, active_client_increment{
type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters}
{
}
~LockHolderImpl()
{
if (bound && parent != nullptr)
parent->unlock(it_group, query_id);
else
active_client_increment.destroy();
}
private:
/// A separate method which binds the lock holder to the owned lock
/// N.B. It is very important that this method produces no allocations
bool bindWith(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept
{
if (bound || parent_ == nullptr)
return false;
it_group = it_group_;
parent = std::move(parent_);
++it_group->requests;
bound = true;
return true;
}
friend class RWLockImpl;
};

/** General algorithm:
* Step 1. Try the FastPath (for both Reads/Writes)
* Step 2. Find ourselves a request group: attach to an existing one or create a new one
* Step 3. Wait/timed wait for the ownership signal
* Step 3a. Check if we must handle the timeout and exit
* Step 4. Persist lock ownership
*
* To guarantee that we do not get any piece of our data corrupted:
* 1. Perform all actions that involve allocations before changing the lock's internal state
* 2. Roll back any changes that would leave the state inconsistent
*
* Note: "SM" in the comments below stands for STATE MODIFICATION
*/
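/** Usage sketch from a caller's point of view (illustrative only; assumes the RWLockImpl::create()
* factory declared in RWLock.h): a returned nullptr means the timeout expired and the lock was NOT acquired.
*
*     RWLock rw_lock = RWLockImpl::create();
*     auto holder = rw_lock->getLock(RWLockImpl::Read, query_id, std::chrono::milliseconds(1000));
*     if (!holder)
*         ... handle the timeout ...
*     /// The lock is released when `holder` (and all its copies) are destroyed.
*/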
RWLockImpl::LockHolder
RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms)
{
const auto lock_deadline_tp =
(lock_timeout_ms == std::chrono::milliseconds(0))
? std::chrono::time_point<std::chrono::steady_clock>::max()
: std::chrono::steady_clock::now() + lock_timeout_ms;
const bool request_has_query_id = query_id != NO_QUERY;
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
: CurrentMetrics::RWLockWaitingWriters);
auto finalize_metrics = [type, &watch] ()
{
ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockAcquiredReadLocks
: ProfileEvents::RWLockAcquiredWriteLocks);
ProfileEvents::increment((type == Read) ? ProfileEvents::RWLockReadersWaitMilliseconds
: ProfileEvents::RWLockWritersWaitMilliseconds, watch.elapsedMilliseconds());
};

/// This object is constructed before the unique_lock, because its destructor may need to lock
/// the same mutex (it calls unlock() on the parent once the holder has been bound).
auto lock_holder = std::make_shared<LockHolderImpl>(query_id, type);

std::unique_lock state_lock(internal_state_mtx);
/// The FastPath:
/// Check if the same query_id already holds the required lock in which case we can proceed without waiting
if (request_has_query_id)
{
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
if (wrlock_owner != writers_queue.end())
throw Exception(
"RWLockImpl::getLock(): RWLock is already locked in exclusive mode",
ErrorCodes::LOGICAL_ERROR);
/// Lock upgrading is not supported
if (type == Write)
throw Exception(
"RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked",
ErrorCodes::LOGICAL_ERROR);
/// N.B. Type is Read here, query_id is not empty and owner_query_it is a valid iterator
++owner_query_it->second; /// SM1: nothrow
lock_holder->bindWith(shared_from_this(), rdlock_owner); /// SM2: nothrow
finalize_metrics();
return lock_holder;
}
}
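
/// Writers always start a new group at the tail of writers_queue.
/// Readers join the last readers group, unless there is no readers group yet, or the only
/// existing group is already the active owner while writers are waiting behind it -
/// in that case a fresh group is started so that the pending writers are not starved.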
if (type == Type::Write)
{
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else if (readers_queue.empty() ||
(rdlock_owner == readers_queue.begin() && readers_queue.size() == 1 && !writers_queue.empty()))
{
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
GroupsContainer::iterator it_group =
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
/// Lock is free to acquire
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
{
(type == Read ? rdlock_owner : wrlock_owner) = it_group; /// SM2: nothrow
}
else
{
/// Wait until our group becomes the lock owner
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
{
++it_group->requests;
it_group->cv.wait(state_lock, predicate);
--it_group->requests;
}
else
{
++it_group->requests;
const auto wait_result = it_group->cv.wait_until(state_lock, lock_deadline_tp, predicate);
--it_group->requests;
/// Step 3a. Check if we must handle timeout and exit
if (!wait_result) /// Wait timed out!
{
/// Rollback(SM1): nothrow
if (it_group->requests == 0)
{
/// When a WRITE lock request times out, we need to notify the next waiting readers group,
/// to avoid a hanging request, hence next=true.
dropOwnerGroupAndPassOwnership(it_group, /* next= */ true);
}
return nullptr;
}
}
}

if (request_has_query_id)
{
try
{
const auto emplace_res =
owner_queries.emplace(query_id, 1); /// SM2: may throw on insertion
if (!emplace_res.second)
++emplace_res.first->second; /// SM3: nothrow
}
catch (...)
{
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
if (it_group->requests == 0)
dropOwnerGroupAndPassOwnership(it_group, /* next= */ false); /// Rollback(SM1): nothrow
throw;
}
}
lock_holder->bindWith(shared_from_this(), it_group); /// SM: nothrow
finalize_metrics();
return lock_holder;
}

/** The sequence points of acquiring lock ownership by an instance of LockHolderImpl:
* 1. owner_queries is updated
* 2. the request group is updated by LockHolderImpl, which in turn becomes "bound"
*
* If, by the time the destructor of LockHolderImpl is called, the instance has been "bound",
* it is guaranteed that both steps have been executed successfully and the resulting state is consistent.
* With the mutex locked, the order of the steps that restore the lock's state can be arbitrary.
*
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
*/
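/// For example, if the same query_id takes the Read lock twice (the second time via the fast path
/// in getLock()), owner_queries[query_id] and the group's `requests` counter both reach 2,
/// and the group is dropped only after both lock holders have been destroyed.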
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
{
std::lock_guard state_lock(internal_state_mtx);

/// All of the cases below are undefined behaviour (an unlock of a lock that is not owned);
/// there is nothing we can do about them except return early.
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
return;

if (rdlock_owner != readers_queue.end() && group_it != rdlock_owner)
return;

if (wrlock_owner != writers_queue.end() && group_it != wrlock_owner)
return;
/// If query_id is not empty it must be listed in owner_queries
if (query_id != NO_QUERY)
{
const auto owner_query_it = owner_queries.find(query_id);
if (owner_query_it != owner_queries.end())
{
if (--owner_query_it->second == 0) /// SM: nothrow
owner_queries.erase(owner_query_it); /// SM: nothrow
}
}
/// If we were the last remaining referrer, drop the owner group and pass the ownership to the next one
if (--group_it->requests == 0) /// SM: nothrow
dropOwnerGroupAndPassOwnership(group_it, /* next= */ false);
}
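
/** Removes the given group from its queue and passes the ownership on:
* - after dropping a readers group, ownership goes to the first waiting writers group if there is
*   one, otherwise to the first remaining readers group (if any);
* - after dropping a writers group, ownership goes to the first waiting readers group if there is
*   one (or, when next == true and there is more than one readers group, to the second one),
*   otherwise to the first remaining writers group (if any).
* Waiting readers are woken with notify_all(), a waiting writer with notify_one().
*/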
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept
{
rdlock_owner = readers_queue.end();
wrlock_owner = writers_queue.end();
if (group_it->type == Read)
{
readers_queue.erase(group_it);
/// Prepare next phase
if (!writers_queue.empty())
{
wrlock_owner = writers_queue.begin();
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
writers_queue.erase(group_it);
/// Prepare next phase
if (!readers_queue.empty())
{
if (next && readers_queue.size() > 1)
{
rdlock_owner = std::next(readers_queue.begin());
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
wrlock_owner = writers_queue.begin();
}
}
if (rdlock_owner != readers_queue.end())
{
rdlock_owner->cv.notify_all();
}
else if (wrlock_owner != writers_queue.end())
{
wrlock_owner->cv.notify_one();
}
}
}