diff --git a/src/Common/RWLock.cpp b/src/Common/RWLock.cpp index 5dfc1b55c63..a282c1c6a91 100644 --- a/src/Common/RWLock.cpp +++ b/src/Common/RWLock.cpp @@ -29,19 +29,17 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; - extern const int DEADLOCK_AVOIDED; } -/** A single-use object that represents lock's ownership +/** A one-time-use-object that represents lock ownership * For the purpose of exception safety guarantees LockHolder is to be used in two steps: - * 1. Create an instance (allocating all the memory needed) + * 1. Create an instance (allocating all the needed memory) * 2. Associate the instance with the lock (attach to the lock and locking request group) */ class RWLockImpl::LockHolderImpl { bool bound{false}; - Type lock_type; String query_id; CurrentMetrics::Increment active_client_increment; RWLock parent; @@ -53,24 +51,30 @@ public: /// Implicit memory allocation for query_id is done here LockHolderImpl(const String & query_id_, Type type) - : lock_type{type}, query_id{query_id_}, - active_client_increment{ + : query_id{query_id_} + , active_client_increment{ type == Type::Read ? CurrentMetrics::RWLockActiveReaders : CurrentMetrics::RWLockActiveWriters} { } - ~LockHolderImpl(); + ~LockHolderImpl() + { + if (bound && parent != nullptr) + parent->unlock(it_group, query_id); + else + active_client_increment.destroy(); + } private: /// A separate method which binds the lock holder to the owned lock /// N.B. It is very important that this method produces no allocations bool bindWith(RWLock && parent_, GroupsContainer::iterator it_group_) noexcept { - if (bound) + if (bound || parent_ == nullptr) return false; it_group = it_group_; parent = std::move(parent_); - ++it_group->refererrs; + ++it_group->requests; bound = true; return true; } @@ -79,56 +83,27 @@ private: }; -namespace -{ - /// Global information about all read locks that query has. It is needed to avoid some type of deadlocks. 
- - class QueryLockInfo - { - private: - mutable std::mutex mutex; - std::map queries; - - public: - void add(const String & query_id) - { - std::lock_guard lock(mutex); - - const auto res = queries.emplace(query_id, 1); // may throw - if (!res.second) - ++res.first->second; - } - - void remove(const String & query_id) noexcept - { - std::lock_guard lock(mutex); - - const auto query_it = queries.find(query_id); - if (query_it != queries.cend() && --query_it->second == 0) - queries.erase(query_it); - } - - void check(const String & query_id) const - { - std::lock_guard lock(mutex); - - if (queries.find(query_id) != queries.cend()) - throw Exception("Possible deadlock avoided. Client should retry.", ErrorCodes::DEADLOCK_AVOIDED); - } - }; - - QueryLockInfo all_read_locks; -} - - -/** To guarantee that we do not get any piece of our data corrupted: +/** General algorithm: + * Step 1. Try the FastPath (for both Reads/Writes) + * Step 2. Find ourselves a request group: attach to an existing one or create a new one + * Step 3. Wait/timed wait for ownership signal + * Step 3a. Check if we must handle timeout and exit + * Step 4. Persist lock ownership + * + * To guarantee that we do not get any piece of our data corrupted: * 1. Perform all actions that include allocations before changing lock's internal state * 2. Roll back any changes that make the state inconsistent * * Note: "SM" in the commentaries below stands for STATE MODIFICATION */ -RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id) +RWLockImpl::LockHolder +RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::chrono::milliseconds & lock_timeout_ms) { + const auto lock_deadline_tp = + (lock_timeout_ms == std::chrono::milliseconds(0)) + ? 
std::chrono::time_point::max() + : std::chrono::steady_clock::now() + lock_timeout_ms; + const bool request_has_query_id = query_id != NO_QUERY; Stopwatch watch(CLOCK_MONOTONIC_COARSE); @@ -145,100 +120,111 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & /// This object is placed above unique_lock, because it may lock in destructor. auto lock_holder = std::make_shared(query_id, type); - std::unique_lock lock(mutex); + std::unique_lock state_lock(internal_state_mtx); /// The FastPath: /// Check if the same query_id already holds the required lock in which case we can proceed without waiting if (request_has_query_id) { - const auto it_query = owner_queries.find(query_id); - if (it_query != owner_queries.end()) + const auto owner_query_it = owner_queries.find(query_id); + if (owner_query_it != owner_queries.end()) { - const auto current_owner_group = queue.begin(); + if (wrlock_owner != writers_queue.end()) + throw Exception( + "RWLockImpl::getLock(): RWLock is already locked in exclusive mode", + ErrorCodes::LOGICAL_ERROR); - /// XXX: it means we can't upgrade lock from read to write! + /// Lock upgrading is not supported if (type == Write) throw Exception( "RWLockImpl::getLock(): Cannot acquire exclusive lock while RWLock is already locked", ErrorCodes::LOGICAL_ERROR); - if (current_owner_group->type == Write) - throw Exception( - "RWLockImpl::getLock(): RWLock is already locked in exclusive mode", - ErrorCodes::LOGICAL_ERROR); - /// N.B. 
Type is Read here, query_id is not empty and it_query is a valid iterator - all_read_locks.add(query_id); /// SM1: may throw on insertion (nothing to roll back) - ++it_query->second; /// SM2: nothrow - lock_holder->bindWith(shared_from_this(), current_owner_group); /// SM3: nothrow + ++owner_query_it->second; /// SM1: nothrow + lock_holder->bindWith(shared_from_this(), rdlock_owner); /// SM2: nothrow finalize_metrics(); return lock_holder; } } - /** If the query already has any active read lock and tries to acquire another read lock - * but it is not in front of the queue and has to wait, deadlock is possible: - * - * Example (four queries, two RWLocks - 'a' and 'b'): - * - * --> time --> - * - * q1: ra rb - * q2: wa - * q3: rb ra - * q4: wb - * - * We will throw an exception instead. - */ - - if (type == Type::Write || queue.empty() || queue.back().type == Type::Write) + if (type == Type::Write) { - if (type == Type::Read && request_has_query_id && !queue.empty()) - all_read_locks.check(query_id); - - /// Create a new group of locking requests - queue.emplace_back(type); /// SM1: may throw (nothing to roll back) + writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back) } - else if (request_has_query_id && queue.size() > 1) - all_read_locks.check(query_id); + else if (readers_queue.empty() || + (rdlock_owner == readers_queue.begin() && !writers_queue.empty())) + { + readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back) + } + GroupsContainer::iterator it_group = + (type == Type::Write) ? 
std::prev(writers_queue.end()) : std::prev(readers_queue.end()); - GroupsContainer::iterator it_group = std::prev(queue.end()); + if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end()) + { + if (type == Type::Read) + { + rdlock_owner = it_group; /// SM2: nothrow + } + else + { + wrlock_owner = it_group; /// SM2: nothrow + } + } + else + { + /// Wait until our group becomes the lock owner + const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); }; - /// We need to reference the associated group before waiting to guarantee - /// that this group does not get deleted prematurely - ++it_group->refererrs; + if (lock_deadline_tp == std::chrono::time_point::max()) + { + ++it_group->requests; + it_group->cv.wait(state_lock, predicate); + --it_group->requests; + } + else + { + ++it_group->requests; + const auto wait_result = it_group->cv.wait_until(state_lock, lock_deadline_tp, predicate); + --it_group->requests; - /// Wait a notification until we will be the only in the group. - it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); }); + /// Step 3a. Check if we must handle timeout and exit + if (!wait_result) /// Wait timed out! + { + if (it_group->requests == 0) + { + /// Roll back SM1 + if (type == Read) + { + readers_queue.erase(it_group); /// Rollback(SM1): nothrow + } + else + { + writers_queue.erase(it_group); /// Rollback(SM1): nothrow + } + } - --it_group->refererrs; + return nullptr; + } + } + } if (request_has_query_id) { try { - if (type == Type::Read) - all_read_locks.add(query_id); /// SM2: may throw on insertion - /// and is safe to roll back unconditionally const auto emplace_res = - owner_queries.emplace(query_id, 1); /// SM3: may throw on insertion + owner_queries.emplace(query_id, 1); /// SM2: may throw on insertion if (!emplace_res.second) - ++emplace_res.first->second; /// SM4: nothrow + ++emplace_res.first->second; /// SM3: nothrow } catch (...) 
{ /// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety - /// We only need to roll back the changes to these objects: all_read_locks and the locking queue - if (type == Type::Read) - all_read_locks.remove(query_id); /// Rollback(SM2): nothrow - - if (it_group->refererrs == 0) - { - const auto next = queue.erase(it_group); /// Rollback(SM1): nothrow - if (next != queue.end()) - next->cv.notify_all(); - } + /// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue + if (it_group->requests == 0) + erase_group(it_group); /// Rollback(SM1): nothrow throw; } @@ -251,10 +237,9 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & } -/** The sequence points of acquiring lock's ownership by an instance of LockHolderImpl: - * 1. all_read_locks is updated - * 2. owner_queries is updated - * 3. request group is updated by LockHolderImpl which in turn becomes "bound" +/** The sequence points of acquiring lock ownership by an instance of LockHolderImpl: + * 1. owner_queries is updated + * 2. request group is updated by LockHolderImpl which in turn becomes "bound" * * If by the time when destructor of LockHolderImpl is called the instance has been "bound", * it is guaranteed that all three steps have been executed successfully and the resulting state is consistent. @@ -262,38 +247,74 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String & * * We do not employ try-catch: if something bad happens, there is nothing we can do =( */ -RWLockImpl::LockHolderImpl::~LockHolderImpl() +void RWLockImpl::unlock(GroupsContainer::iterator owner_group, const String & query_id) noexcept { - if (!bound || parent == nullptr) + std::lock_guard state_lock(internal_state_mtx); + + /// All of these are undefined behavior and there is nothing we can do! 
+ if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end()) return; - - std::lock_guard lock(parent->mutex); - - /// The associated group must exist (and be the beginning of the queue?) - if (parent->queue.empty() || it_group != parent->queue.begin()) + if (rdlock_owner != readers_queue.end() && owner_group != rdlock_owner) + return; + if (wrlock_owner != writers_queue.end() && owner_group != wrlock_owner) return; /// If query_id is not empty it must be listed in parent->owner_queries - if (query_id != RWLockImpl::NO_QUERY) + if (query_id != NO_QUERY) { - const auto owner_it = parent->owner_queries.find(query_id); - if (owner_it != parent->owner_queries.end()) + const auto owner_query_it = owner_queries.find(query_id); + if (owner_query_it != owner_queries.end()) { - if (--owner_it->second == 0) /// SM: nothrow - parent->owner_queries.erase(owner_it); /// SM: nothrow - - if (lock_type == RWLockImpl::Read) - all_read_locks.remove(query_id); /// SM: nothrow + if (--owner_query_it->second == 0) /// SM: nothrow + owner_queries.erase(owner_query_it); /// SM: nothrow } } - /// If we are the last remaining referrer, remove the group and notify the next group - if (--it_group->refererrs == 0) /// SM: nothrow - { - const auto next = parent->queue.erase(it_group); /// SM: nothrow - if (next != parent->queue.end()) - next->cv.notify_all(); - } + /// If we are the last remaining referrer, remove this QNode and notify the next one + if (--owner_group->requests == 0) /// SM: nothrow + erase_group(owner_group); } + +void RWLockImpl::erase_group(GroupsContainer::iterator group_it) noexcept +{ + rdlock_owner = readers_queue.end(); + wrlock_owner = writers_queue.end(); + + if (group_it->type == Read) + { + readers_queue.erase(group_it); + /// Prepare next phase + if (!writers_queue.empty()) + { + wrlock_owner = writers_queue.begin(); + } + else + { + rdlock_owner = readers_queue.begin(); + } + } + else + { + writers_queue.erase(group_it); + /// Prepare next 
phase + if (!readers_queue.empty()) + { + rdlock_owner = readers_queue.begin(); + } + else + { + wrlock_owner = writers_queue.begin(); + } + } + + if (rdlock_owner != readers_queue.end()) + { + rdlock_owner->cv.notify_all(); + } + else if (wrlock_owner != writers_queue.end()) + { + wrlock_owner->cv.notify_one(); + } +} } diff --git a/src/Common/RWLock.h b/src/Common/RWLock.h index a7084720d6c..81b8551060a 100644 --- a/src/Common/RWLock.h +++ b/src/Common/RWLock.h @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -19,7 +20,8 @@ using RWLock = std::shared_ptr; /// Implements shared lock with FIFO service -/// Can be acquired recursively (several calls for the same query) in Read mode +/// (Phase Fair RWLock as suggested in https://www.cs.unc.edu/~anderson/papers/rtsj10-for-web.pdf) +/// Can be acquired recursively (for the same query) in Read mode /// /// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already /// acquired by another thread of the same query. Otherwise the following deadlock is possible: @@ -42,37 +44,44 @@ public: friend class LockHolderImpl; using LockHolder = std::shared_ptr; - /// Waits in the queue and returns appropriate lock - /// Empty query_id means the lock is acquired out of the query context (e.g. in a background thread). - LockHolder getLock(Type type, const String & query_id); + /// Empty query_id means the lock is acquired from outside of query context (e.g. in a background thread). + LockHolder getLock(Type type, const String & query_id, + const std::chrono::milliseconds & lock_timeout_ms = std::chrono::milliseconds(0)); /// Use as query_id to acquire a lock outside the query context. 
inline static const String NO_QUERY = String(); + inline static const auto default_locking_timeout = std::chrono::milliseconds(120000); private: - RWLockImpl() = default; - - struct Group; - using GroupsContainer = std::list; - using OwnerQueryIds = std::unordered_map; - - /// Group of locking requests that should be granted concurrently - /// i.e. a group can contain several readers, but only one writer + /// Group of locking requests that should be granted simultaneously + /// i.e. one or several readers or a single writer struct Group { const Type type; - size_t refererrs; + size_t requests; std::condition_variable cv; /// all locking requests of the group wait on this condvar - explicit Group(Type type_) : type{type_}, refererrs{0} {} + explicit Group(Type type_) : type{type_}, requests{0} {} }; - GroupsContainer queue; + using GroupsContainer = std::list; + using OwnerQueryIds = std::unordered_map; + +private: + mutable std::mutex internal_state_mtx; + + GroupsContainer readers_queue; + GroupsContainer writers_queue; + GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equals to readers_queue.begin() in read phase + /// or readers_queue.end() otherwise + GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equals to writers_queue.begin() in write phase + /// or writers_queue.end() otherwise OwnerQueryIds owner_queries; - mutable std::mutex mutex; +private: + RWLockImpl() = default; + void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept; + void erase_group(GroupsContainer::iterator group_it) noexcept; }; - - }