From 706d4db332f965012c97cdc77c89d2d4f0404f6b Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Mon, 4 Sep 2017 15:49:49 +0300 Subject: [PATCH] Made RWLockFIFO recursive. [#CLICKHOUSE-3246] --- dbms/src/Common/RWLockFIFO.cpp | 51 +++++-- dbms/src/Common/RWLockFIFO.h | 21 ++- .../Common/tests/gtest_rw_lock_fifo.cpp.cpp | 142 +++++++++++++----- 3 files changed, 160 insertions(+), 54 deletions(-) diff --git a/dbms/src/Common/RWLockFIFO.cpp b/dbms/src/Common/RWLockFIFO.cpp index 2367ee6f76f..ec5dc35c990 100644 --- a/dbms/src/Common/RWLockFIFO.cpp +++ b/dbms/src/Common/RWLockFIFO.cpp @@ -7,14 +7,38 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::Client client) { GroupsContainer::iterator it_group; ClientsContainer::iterator it_client; + auto this_thread_id = std::this_thread::get_id(); + std::unique_lock lock(mutex); + /// Check if the same thread is acquiring previously acquired lock + auto it_handler = thread_to_handler.find(this_thread_id); + if (it_handler != thread_to_handler.end()) + { + auto handler_ptr = it_handler->second.lock(); + + if (!handler_ptr) + throw Exception("Lock handler cannot be nullptr. This is a bug", ErrorCodes::LOGICAL_ERROR); + + if (type != Read || handler_ptr->it_group->type != Read) + throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR); + + handler_ptr->it_client->info += "; " + client.info; + + return handler_ptr; + } + if (type == Type::Write || queue.empty() || queue.back().type == Type::Write) { /// Create new group of clients @@ -44,7 +68,11 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C it_client->enqueue_time = time(nullptr); it_client->type = type; - LockHandler res = std::make_unique(shared_from_this(), it_group, it_client); + LockHandler res(new LockHandlerImpl(shared_from_this(), it_group, it_client)); + + /// Insert myself (weak_ptr to the handler) to threads set to implement recursive lock + it_handler = thread_to_handler.emplace(this_thread_id, res).first; + res->it_handler = it_handler; /// We are first, we should not wait anything /// If we are not the first client in the group, a notification could be already sent @@ -79,14 +107,18 @@ RWLockFIFO::Clients RWLockFIFO::getClientsInTheQueue() const } -void RWLockFIFO::LockHandlerImpl::unlock() +RWLockFIFO::LockHandlerImpl::~LockHandlerImpl() { std::unique_lock lock(parent->mutex); - auto & clients = it_group->clients; - clients.erase(it_client); + /// Remove weak_ptr to the handler, since there are no owners of the current lock + parent->thread_to_handler.erase(it_handler); - if (clients.empty()) + /// Removes myself from client list of our group + it_group->clients.erase(it_client); + + /// Remove the group if we were the last client and notify the next group + if (it_group->clients.empty()) { auto & queue = parent->queue; queue.erase(it_group); @@ -99,11 +131,8 @@ void RWLockFIFO::LockHandlerImpl::unlock() } -RWLockFIFO::LockHandlerImpl::~LockHandlerImpl() -{ - if (parent) - unlock(); -} - +RWLockFIFO::LockHandlerImpl::LockHandlerImpl(RWLockFIFOPtr && parent, RWLockFIFO::GroupsContainer::iterator it_group, + RWLockFIFO::ClientsContainer::iterator it_client) + : parent{std::move(parent)}, it_group{it_group}, it_client{it_client} {} } diff --git a/dbms/src/Common/RWLockFIFO.h b/dbms/src/Common/RWLockFIFO.h index 6353e4c6cbe..a106fa36edd 100644 --- a/dbms/src/Common/RWLockFIFO.h +++ b/dbms/src/Common/RWLockFIFO.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include namespace DB @@ -14,7 +16,7 @@ using RWLockFIFOPtr = std::shared_ptr; /// Implements shared lock with FIFO service -/// It does not work as recursive mutex, so a deadlock will occur if you try to acquire 2 locks in the same thread +/// You could call it recursively (several calls from the same thread) in Read mode class RWLockFIFO : public std::enable_shared_from_this { public: @@ -35,6 +37,8 @@ public: { explicit Client(const std::string & info = {}) : info{info} {} + bool isStarted() { return start_time != 0; } + std::string info; int thread_number = 0; std::time_t enqueue_time = 0; @@ -42,8 +46,11 @@ public: Type type; }; + + /// Just use LockHandler::reset() to release the lock class LockHandlerImpl; - using LockHandler = std::unique_ptr; + using LockHandler = std::shared_ptr; + /// Waits in the queue and returns appropriate lock LockHandler getLock(Type type, Client client = Client{}); @@ -65,6 +72,7 @@ private: struct Group; using GroupsContainer = std::list; using ClientsContainer = std::list; + using ThreadToHandler = std::map>; /// Group of clients that should be executed concurrently /// i.e. a group could contain several readers, but only one writer @@ -85,16 +93,14 @@ public: RWLockFIFOPtr parent; GroupsContainer::iterator it_group; ClientsContainer::iterator it_client; + ThreadToHandler::iterator it_handler; + + LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client); public: - LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client) - : parent{std::move(parent)}, it_group{it_group}, it_client{it_client} {} LockHandlerImpl(const LockHandlerImpl & other) = delete; - /// Unlocks acquired lock - void unlock(); - ~LockHandlerImpl(); friend class RWLockFIFO; @@ -104,6 +110,7 @@ private: mutable std::mutex mutex; GroupsContainer queue; + ThreadToHandler thread_to_handler; }; diff --git a/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp b/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp index 63a29bbe567..70d9815d89c 100644 --- a/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp +++ b/dbms/src/Common/tests/gtest_rw_lock_fifo.cpp.cpp @@ -11,8 +11,12 @@ using namespace DB; -static void execute_1(size_t threads, int round, int cycles) + +TEST(Common, RWLockFIFO_1) { + constexpr int cycles = 10000; + const std::vector pool_sizes{1, 2, 4, 8}; + static std::atomic readers{0}; static std::atomic writers{0}; @@ -21,43 +25,39 @@ static void execute_1(size_t threads, int round, int cycles) static __thread std::random_device rd; static __thread std::mt19937 gen(rd()); - for (int i = 0; i < cycles; ++i) - { - auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write; - auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 1000)(gen)); - - auto lock = fifo_lock->getLock(type, "RW"); - - if (type == RWLockFIFO::Write) + auto func = [&] (size_t threads, int round) { + for (int i = 0; i < cycles; ++i) { - ++writers; + auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write; + auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 100)(gen)); - ASSERT_EQ(writers, 1); - ASSERT_EQ(readers, 0); + auto lock = fifo_lock->getLock(type, "RW"); - std::this_thread::sleep_for(sleep_for); + if (type == RWLockFIFO::Write) + { + ++writers; - --writers; + ASSERT_EQ(writers, 1); + ASSERT_EQ(readers, 0); + + std::this_thread::sleep_for(sleep_for); + + --writers; + } + else + { + ++readers; + + ASSERT_EQ(writers, 0); + ASSERT_GE(readers, 1); + ASSERT_LE(readers, threads); + + std::this_thread::sleep_for(sleep_for); + + --readers; + } } - else - { - ++readers; - - ASSERT_EQ(writers, 0); - ASSERT_GE(readers, 1); - ASSERT_LE(readers, threads); - - std::this_thread::sleep_for(sleep_for); - - --readers; - } - } -} - -TEST(Common, RWLockFIFO_1) -{ - constexpr int cycles = 10000; - const std::vector pool_sizes{1, 2, 4, 8}; + }; for (auto pool_size : pool_sizes) { @@ -65,10 +65,9 @@ TEST(Common, RWLockFIFO_1) { Stopwatch watch(CLOCK_MONOTONIC_COARSE); - std::vector threads; - threads.reserve(pool_size); + std::list threads; for (int thread = 0; thread < pool_size; ++thread) - threads.emplace_back([=] () { execute_1(pool_size, round, cycles); }); + threads.emplace_back([=] () { func(pool_size, round); }); for (auto & thread : threads) thread.join(); @@ -78,3 +77,74 @@ TEST(Common, RWLockFIFO_1) } } } + +TEST(Common, RWLockFIFO_Recursive) +{ + constexpr auto cycles = 10000; + + static auto fifo_lock = RWLockFIFO::create(); + + static __thread std::random_device rd; + static __thread std::mt19937 gen(rd()); + + std::thread t1([&] () { + for (int i = 0; i < 2 * cycles; ++i) + { + auto lock = fifo_lock->getLock(RWLockFIFO::Write); + + auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 100)(gen)); + std::this_thread::sleep_for(sleep_for); + } + }); + + std::thread t2([&] () { + for (int i = 0; i < cycles; ++i) + { + auto lock1 = fifo_lock->getLock(RWLockFIFO::Read); + + auto sleep_for = std::chrono::duration(std::uniform_int_distribution<>(1, 100)(gen)); + std::this_thread::sleep_for(sleep_for); + + auto lock2 = fifo_lock->getLock(RWLockFIFO::Read); + + EXPECT_ANY_THROW({fifo_lock->getLock(RWLockFIFO::Write);}); + } + + fifo_lock->getLock(RWLockFIFO::Write); + }); + + t1.join(); + t2.join(); +} + + +TEST(Common, RWLockFIFO_PerfTest_Readers) +{ + constexpr int cycles = 1000000; // 1 mln + const std::vector pool_sizes{1, 2, 4, 8}; + + static auto fifo_lock = RWLockFIFO::create(); + + for (auto pool_size : pool_sizes) + { + Stopwatch watch(CLOCK_MONOTONIC_COARSE); + + auto func = [&] () + { + for (auto i = 0; i < cycles; ++i) + { + auto lock = fifo_lock->getLock(RWLockFIFO::Read); + } + }; + + std::list threads; + for (int thread = 0; thread < pool_size; ++thread) + threads.emplace_back(func); + + for (auto & thread : threads) + thread.join(); + + auto total_time = watch.elapsedSeconds(); + std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n"; + } +}