mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
Merge pull request #6771 from yandex/remove-recursive-rwlock-by-thread
Remove recursive rwlock by thread
This commit is contained in:
commit
e2dbff73f6
@ -37,7 +37,6 @@ class RWLockImpl::LockHolderImpl
|
|||||||
RWLock parent;
|
RWLock parent;
|
||||||
GroupsContainer::iterator it_group;
|
GroupsContainer::iterator it_group;
|
||||||
ClientsContainer::iterator it_client;
|
ClientsContainer::iterator it_client;
|
||||||
ThreadToHolder::key_type thread_id;
|
|
||||||
QueryIdToHolder::key_type query_id;
|
QueryIdToHolder::key_type query_id;
|
||||||
CurrentMetrics::Increment active_client_increment;
|
CurrentMetrics::Increment active_client_increment;
|
||||||
|
|
||||||
@ -74,17 +73,12 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
|||||||
/// Check if the same query is acquiring previously acquired lock
|
/// Check if the same query is acquiring previously acquired lock
|
||||||
LockHolder existing_holder_ptr;
|
LockHolder existing_holder_ptr;
|
||||||
|
|
||||||
auto this_thread_id = std::this_thread::get_id();
|
|
||||||
auto it_thread = thread_to_holder.find(this_thread_id);
|
|
||||||
|
|
||||||
auto it_query = query_id_to_holder.end();
|
|
||||||
if (query_id != RWLockImpl::NO_QUERY)
|
if (query_id != RWLockImpl::NO_QUERY)
|
||||||
it_query = query_id_to_holder.find(query_id);
|
{
|
||||||
|
auto it_query = query_id_to_holder.find(query_id);
|
||||||
if (it_thread != thread_to_holder.end())
|
if (it_query != query_id_to_holder.end())
|
||||||
existing_holder_ptr = it_thread->second.lock();
|
|
||||||
else if (it_query != query_id_to_holder.end())
|
|
||||||
existing_holder_ptr = it_query->second.lock();
|
existing_holder_ptr = it_query->second.lock();
|
||||||
|
}
|
||||||
|
|
||||||
if (existing_holder_ptr)
|
if (existing_holder_ptr)
|
||||||
{
|
{
|
||||||
@ -125,10 +119,7 @@ RWLockImpl::LockHolder RWLockImpl::getLock(RWLockImpl::Type type, const String &
|
|||||||
/// Wait a notification until we will be the only in the group.
|
/// Wait a notification until we will be the only in the group.
|
||||||
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
|
it_group->cv.wait(lock, [&] () { return it_group == queue.begin(); });
|
||||||
|
|
||||||
/// Insert myself (weak_ptr to the holder) to threads set to implement recursive lock
|
/// Insert myself (weak_ptr to the holder) to queries set to implement recursive lock
|
||||||
thread_to_holder.emplace(this_thread_id, res);
|
|
||||||
res->thread_id = this_thread_id;
|
|
||||||
|
|
||||||
if (query_id != RWLockImpl::NO_QUERY)
|
if (query_id != RWLockImpl::NO_QUERY)
|
||||||
query_id_to_holder.emplace(query_id, res);
|
query_id_to_holder.emplace(query_id, res);
|
||||||
res->query_id = query_id;
|
res->query_id = query_id;
|
||||||
@ -143,7 +134,6 @@ RWLockImpl::LockHolderImpl::~LockHolderImpl()
|
|||||||
std::unique_lock lock(parent->mutex);
|
std::unique_lock lock(parent->mutex);
|
||||||
|
|
||||||
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
|
/// Remove weak_ptrs to the holder, since there are no owners of the current lock
|
||||||
parent->thread_to_holder.erase(thread_id);
|
|
||||||
parent->query_id_to_holder.erase(query_id);
|
parent->query_id_to_holder.erase(query_id);
|
||||||
|
|
||||||
/// Removes myself from client list of our group
|
/// Removes myself from client list of our group
|
||||||
|
@ -6,7 +6,6 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <condition_variable>
|
#include <condition_variable>
|
||||||
#include <thread>
|
|
||||||
#include <map>
|
#include <map>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
@ -19,7 +18,7 @@ using RWLock = std::shared_ptr<RWLockImpl>;
|
|||||||
|
|
||||||
|
|
||||||
/// Implements shared lock with FIFO service
|
/// Implements shared lock with FIFO service
|
||||||
/// Can be acquired recursively (several calls for the same query or the same OS thread) in Read mode
|
/// Can be acquired recursively (several calls for the same query) in Read mode
|
||||||
///
|
///
|
||||||
/// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already
|
/// NOTE: it is important to allow acquiring the same lock in Read mode without waiting if it is already
|
||||||
/// acquired by another thread of the same query. Otherwise the following deadlock is possible:
|
/// acquired by another thread of the same query. Otherwise the following deadlock is possible:
|
||||||
@ -55,7 +54,6 @@ private:
|
|||||||
struct Group;
|
struct Group;
|
||||||
using GroupsContainer = std::list<Group>;
|
using GroupsContainer = std::list<Group>;
|
||||||
using ClientsContainer = std::list<Type>;
|
using ClientsContainer = std::list<Type>;
|
||||||
using ThreadToHolder = std::map<std::thread::id, std::weak_ptr<LockHolderImpl>>;
|
|
||||||
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
|
using QueryIdToHolder = std::map<String, std::weak_ptr<LockHolderImpl>>;
|
||||||
|
|
||||||
/// Group of clients that should be executed concurrently
|
/// Group of clients that should be executed concurrently
|
||||||
@ -73,7 +71,6 @@ private:
|
|||||||
|
|
||||||
mutable std::mutex mutex;
|
mutable std::mutex mutex;
|
||||||
GroupsContainer queue;
|
GroupsContainer queue;
|
||||||
ThreadToHolder thread_to_holder;
|
|
||||||
QueryIdToHolder query_id_to_holder;
|
QueryIdToHolder query_id_to_holder;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -94,7 +94,7 @@ TEST(Common, RWLock_Recursive)
|
|||||||
{
|
{
|
||||||
for (int i = 0; i < 2 * cycles; ++i)
|
for (int i = 0; i < 2 * cycles; ++i)
|
||||||
{
|
{
|
||||||
auto lock = fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);
|
auto lock = fifo_lock->getLock(RWLockImpl::Write, "q1");
|
||||||
|
|
||||||
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
|
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
|
||||||
std::this_thread::sleep_for(sleep_for);
|
std::this_thread::sleep_for(sleep_for);
|
||||||
@ -105,17 +105,17 @@ TEST(Common, RWLock_Recursive)
|
|||||||
{
|
{
|
||||||
for (int i = 0; i < cycles; ++i)
|
for (int i = 0; i < cycles; ++i)
|
||||||
{
|
{
|
||||||
auto lock1 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
|
auto lock1 = fifo_lock->getLock(RWLockImpl::Read, "q2");
|
||||||
|
|
||||||
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
|
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
|
||||||
std::this_thread::sleep_for(sleep_for);
|
std::this_thread::sleep_for(sleep_for);
|
||||||
|
|
||||||
auto lock2 = fifo_lock->getLock(RWLockImpl::Read, RWLockImpl::NO_QUERY);
|
auto lock2 = fifo_lock->getLock(RWLockImpl::Read, "q2");
|
||||||
|
|
||||||
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);});
|
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write, "q2");});
|
||||||
}
|
}
|
||||||
|
|
||||||
fifo_lock->getLock(RWLockImpl::Write, RWLockImpl::NO_QUERY);
|
fifo_lock->getLock(RWLockImpl::Write, "q2");
|
||||||
});
|
});
|
||||||
|
|
||||||
t1.join();
|
t1.join();
|
||||||
|
Loading…
Reference in New Issue
Block a user