Merge pull request #1192 from yandex/CLICKHOUSE-3246

Made RWLockFIFO recursive
alexey-milovidov 2017-09-04 20:09:38 +03:00 committed by GitHub
commit be49273843
3 changed files with 160 additions and 54 deletions
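For context, a minimal usage sketch of the behaviour this change enables. It is not part of the diff; the include path is assumed and the Client info strings are made up for illustration.

    #include <Common/RWLockFIFO.h>   /// assumed include path

    using namespace DB;

    void recursiveReadExample()
    {
        auto rw_lock = RWLockFIFO::create();

        /// First Read lock from this thread: enqueued in FIFO order as before.
        auto lock1 = rw_lock->getLock(RWLockFIFO::Read, RWLockFIFO::Client("outer"));

        /// Second Read lock from the same thread: instead of deadlocking, the
        /// existing handler is reused and the client info is appended to it.
        auto lock2 = rw_lock->getLock(RWLockFIFO::Read, RWLockFIFO::Client("inner"));

        /// Requesting a Write lock while this thread already holds a lock throws
        /// LOGICAL_ERROR ("Attempt to acquire exclusive lock recursively"):
        /// auto lock3 = rw_lock->getLock(RWLockFIFO::Write);

        /// lock1 and lock2 are shared_ptr's to the same LockHandlerImpl;
        /// the lock is released when the last of them is reset or goes out of scope.
    }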


@@ -7,14 +7,38 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::Client client)
{
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
auto this_thread_id = std::this_thread::get_id();
std::unique_lock<std::mutex> lock(mutex);
/// Check if the same thread is acquiring a lock it has previously acquired
auto it_handler = thread_to_handler.find(this_thread_id);
if (it_handler != thread_to_handler.end())
{
auto handler_ptr = it_handler->second.lock();
if (!handler_ptr)
throw Exception("Lock handler cannot be nullptr. This is a bug", ErrorCodes::LOGICAL_ERROR);
if (type != Read || handler_ptr->it_group->type != Read)
throw Exception("Attempt to acquire exclusive lock recursively", ErrorCodes::LOGICAL_ERROR);
handler_ptr->it_client->info += "; " + client.info;
return handler_ptr;
}
if (type == Type::Write || queue.empty() || queue.back().type == Type::Write)
{
/// Create new group of clients
@@ -44,7 +68,11 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
it_client->enqueue_time = time(nullptr);
it_client->type = type;
LockHandler res = std::make_unique<LockHandlerImpl>(shared_from_this(), it_group, it_client);
LockHandler res(new LockHandlerImpl(shared_from_this(), it_group, it_client));
/// Insert myself (a weak_ptr to the handler) into the thread map to implement recursive locking
it_handler = thread_to_handler.emplace(this_thread_id, res).first;
res->it_handler = it_handler;
/// We are the first, we should not wait for anything
/// If we are not the first client in the group, a notification could already have been sent
@@ -79,14 +107,18 @@ RWLockFIFO::Clients RWLockFIFO::getClientsInTheQueue() const
}
void RWLockFIFO::LockHandlerImpl::unlock()
RWLockFIFO::LockHandlerImpl::~LockHandlerImpl()
{
std::unique_lock<std::mutex> lock(parent->mutex);
auto & clients = it_group->clients;
clients.erase(it_client);
/// Remove the weak_ptr to the handler, since there are no more owners of the current lock
parent->thread_to_handler.erase(it_handler);
if (clients.empty())
/// Remove myself from the client list of our group
it_group->clients.erase(it_client);
/// Remove the group if we were the last client and notify the next group
if (it_group->clients.empty())
{
auto & queue = parent->queue;
queue.erase(it_group);
@@ -99,11 +131,8 @@ void RWLockFIFO::LockHandlerImpl::unlock()
}
RWLockFIFO::LockHandlerImpl::~LockHandlerImpl()
{
if (parent)
unlock();
}
RWLockFIFO::LockHandlerImpl::LockHandlerImpl(RWLockFIFOPtr && parent, RWLockFIFO::GroupsContainer::iterator it_group,
RWLockFIFO::ClientsContainer::iterator it_client)
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client} {}
}
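A side note on the bookkeeping above: thread_to_handler deliberately stores a weak_ptr, so a map entry never keeps a lock alive; getLock() recovers the live handler with weak_ptr::lock() and treats a null result as a bug, because ~LockHandlerImpl always erases its entry first. A minimal, self-contained illustration of that idiom (plain standard C++, not ClickHouse code):

    #include <cassert>
    #include <memory>

    int main()
    {
        auto owner = std::make_shared<int>(42);   /// stands in for the shared_ptr-based LockHandler
        std::weak_ptr<int> observer = owner;      /// stands in for the thread_to_handler entry

        /// While at least one shared_ptr owner exists, lock() yields a usable pointer.
        if (auto p = observer.lock())
            assert(*p == 42);

        owner.reset();   /// last owner released -> the object is destroyed

        /// Now lock() returns an empty shared_ptr. In RWLockFIFO this state is
        /// unreachable because the handler's destructor erases the map entry
        /// before the object dies, hence the "This is a bug" LOGICAL_ERROR check.
        assert(observer.lock() == nullptr);
    }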


@@ -4,6 +4,8 @@
#include <vector>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <map>
#include <string>
@@ -15,7 +17,7 @@ using RWLockFIFOPtr = std::shared_ptr<RWLockFIFO>;
/// Implements a shared lock with FIFO service
/// It does not work as a recursive mutex, so a deadlock will occur if you try to acquire 2 locks in the same thread
/// You can acquire it recursively (several calls from the same thread) in Read mode
class RWLockFIFO : public std::enable_shared_from_this<RWLockFIFO>
{
public:
@@ -36,6 +38,8 @@ public:
{
explicit Client(const std::string & info = {}) : info{info} {}
bool isStarted() { return start_time != 0; }
std::string info;
int thread_number = 0;
std::time_t enqueue_time = 0;
@@ -43,8 +47,11 @@ public:
Type type;
};
/// Just use LockHandler::reset() to release the lock
class LockHandlerImpl;
using LockHandler = std::unique_ptr<LockHandlerImpl>;
using LockHandler = std::shared_ptr<LockHandlerImpl>;
/// Waits in the queue and returns an appropriate lock
LockHandler getLock(Type type, Client client = Client{});
@@ -66,6 +73,7 @@ private:
struct Group;
using GroupsContainer = std::list<Group>;
using ClientsContainer = std::list<Client>;
using ThreadToHandler = std::map<std::thread::id, std::weak_ptr<LockHandlerImpl>>;
/// Group of clients that should be executed concurrently
/// i.e. a group could contain several readers, but only one writer
@@ -86,16 +94,14 @@ public:
RWLockFIFOPtr parent;
GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client;
ThreadToHandler::iterator it_handler;
LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
public:
LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client)
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client} {}
LockHandlerImpl(const LockHandlerImpl & other) = delete;
/// Unlocks acquired lock
void unlock();
~LockHandlerImpl();
friend class RWLockFIFO;
@@ -105,6 +111,7 @@ private:
mutable std::mutex mutex;
GroupsContainer queue;
ThreadToHandler thread_to_handler;
};
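Since LockHandler is now a shared_ptr, handler lifetime (rather than an explicit unlock() call) releases the lock, as the "Just use LockHandler::reset()" comment above notes. A short sketch of the resulting semantics; the function and its name are hypothetical and the include path is assumed:

    #include <Common/RWLockFIFO.h>   /// assumed include path

    namespace DB
    {

    void holdReadLockWhileProcessing(const RWLockFIFOPtr & rw_lock)
    {
        /// getLock() now returns a shared_ptr-based handler.
        RWLockFIFO::LockHandler handler = rw_lock->getLock(RWLockFIFO::Read);

        /// Copies share ownership of the same LockHandlerImpl; passing the handler
        /// around within the acquiring thread does not go through the queue again.
        RWLockFIFO::LockHandler copy = handler;

        handler.reset();   /// the lock is still held: 'copy' keeps the handler alive
        copy.reset();      /// last owner gone -> ~LockHandlerImpl releases the lock
    }

    }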


@@ -11,8 +11,12 @@
using namespace DB;
static void execute_1(size_t threads, int round, int cycles)
TEST(Common, RWLockFIFO_1)
{
constexpr int cycles = 10000;
const std::vector<size_t> pool_sizes{1, 2, 4, 8};
static std::atomic<int> readers{0};
static std::atomic<int> writers{0};
@@ -21,43 +25,39 @@ static void execute_1(size_t threads, int round, int cycles)
static thread_local std::random_device rd;
static thread_local std::mt19937 gen(rd());
for (int i = 0; i < cycles; ++i)
{
auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write;
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 1000)(gen));
auto lock = fifo_lock->getLock(type, "RW");
if (type == RWLockFIFO::Write)
auto func = [&] (size_t threads, int round) {
for (int i = 0; i < cycles; ++i)
{
++writers;
auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write;
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
ASSERT_EQ(writers, 1);
ASSERT_EQ(readers, 0);
auto lock = fifo_lock->getLock(type, "RW");
std::this_thread::sleep_for(sleep_for);
if (type == RWLockFIFO::Write)
{
++writers;
--writers;
ASSERT_EQ(writers, 1);
ASSERT_EQ(readers, 0);
std::this_thread::sleep_for(sleep_for);
--writers;
}
else
{
++readers;
ASSERT_EQ(writers, 0);
ASSERT_GE(readers, 1);
ASSERT_LE(readers, threads);
std::this_thread::sleep_for(sleep_for);
--readers;
}
}
else
{
++readers;
ASSERT_EQ(writers, 0);
ASSERT_GE(readers, 1);
ASSERT_LE(readers, threads);
std::this_thread::sleep_for(sleep_for);
--readers;
}
}
}
TEST(Common, RWLockFIFO_1)
{
constexpr int cycles = 10000;
const std::vector<size_t> pool_sizes{1, 2, 4, 8};
};
for (auto pool_size : pool_sizes)
{
@@ -65,10 +65,9 @@ TEST(Common, RWLockFIFO_1)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
std::vector<std::thread> threads;
threads.reserve(pool_size);
std::list<std::thread> threads;
for (int thread = 0; thread < pool_size; ++thread)
threads.emplace_back([=] () { execute_1(pool_size, round, cycles); });
threads.emplace_back([=] () { func(pool_size, round); });
for (auto & thread : threads)
thread.join();
@@ -78,3 +77,74 @@ TEST(Common, RWLockFIFO_1)
}
}
}
TEST(Common, RWLockFIFO_Recursive)
{
constexpr auto cycles = 10000;
static auto fifo_lock = RWLockFIFO::create();
static __thread std::random_device rd;
static __thread std::mt19937 gen(rd());
std::thread t1([&] () {
for (int i = 0; i < 2 * cycles; ++i)
{
auto lock = fifo_lock->getLock(RWLockFIFO::Write);
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for);
}
});
std::thread t2([&] () {
for (int i = 0; i < cycles; ++i)
{
auto lock1 = fifo_lock->getLock(RWLockFIFO::Read);
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for);
auto lock2 = fifo_lock->getLock(RWLockFIFO::Read);
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockFIFO::Write);});
}
fifo_lock->getLock(RWLockFIFO::Write);
});
t1.join();
t2.join();
}
TEST(Common, RWLockFIFO_PerfTest_Readers)
{
constexpr int cycles = 1000000; // 1 mln
const std::vector<size_t> pool_sizes{1, 2, 4, 8};
static auto fifo_lock = RWLockFIFO::create();
for (auto pool_size : pool_sizes)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
auto func = [&] ()
{
for (auto i = 0; i < cycles; ++i)
{
auto lock = fifo_lock->getLock(RWLockFIFO::Read);
}
};
std::list<std::thread> threads;
for (int thread = 0; thread < pool_size; ++thread)
threads.emplace_back(func);
for (auto & thread : threads)
thread.join();
auto total_time = watch.elapsedSeconds();
std::cout << "Threads " << pool_size << ", total_time " << std::setprecision(2) << total_time << "\n";
}
}