Minor refactoring

This commit is contained in:
Ivan Lezhankin 2018-11-27 19:45:45 +03:00
parent 11ee7797b4
commit 470f96ce19
6 changed files with 9960 additions and 9993 deletions

View File

@ -1,4 +1,4 @@
#include "RWLockFIFO.h" #include "RWLock.h"
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Poco/Ext/ThreadNumber.h> #include <Poco/Ext/ThreadNumber.h>
@ -33,15 +33,15 @@ namespace ErrorCodes
} }
class RWLockFIFO::LockHandlerImpl class RWLockImpl::LockHandlerImpl
{ {
RWLockFIFOPtr parent; RWLock parent;
GroupsContainer::iterator it_group; GroupsContainer::iterator it_group;
ClientsContainer::iterator it_client; ClientsContainer::iterator it_client;
ThreadToHandler::iterator it_handler; ThreadToHandler::iterator it_handler;
CurrentMetrics::Increment active_client_increment; CurrentMetrics::Increment active_client_increment;
LockHandlerImpl(RWLockFIFOPtr && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client); LockHandlerImpl(RWLock && parent, GroupsContainer::iterator it_group, ClientsContainer::iterator it_client);
public: public:
@ -49,11 +49,11 @@ public:
~LockHandlerImpl(); ~LockHandlerImpl();
friend class RWLockFIFO; friend class RWLockImpl;
}; };
RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::Client client) RWLockImpl::LockHandler RWLockImpl::getLock(RWLockImpl::Type type, RWLockImpl::Client client)
{ {
Stopwatch watch(CLOCK_MONOTONIC_COARSE); Stopwatch watch(CLOCK_MONOTONIC_COARSE);
CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders CurrentMetrics::Increment waiting_client_increment((type == Read) ? CurrentMetrics::RWLockWaitingReaders
@ -142,24 +142,7 @@ RWLockFIFO::LockHandler RWLockFIFO::getLock(RWLockFIFO::Type type, RWLockFIFO::C
} }
RWLockFIFO::Clients RWLockFIFO::getClientsInTheQueue() const RWLockImpl::LockHandlerImpl::~LockHandlerImpl()
{
std::unique_lock<std::mutex> lock(mutex);
Clients res;
for (const auto & group : queue)
{
for (const auto & client : group.clients)
{
res.emplace_back(client);
}
}
return res;
}
RWLockFIFO::LockHandlerImpl::~LockHandlerImpl()
{ {
std::unique_lock<std::mutex> lock(parent->mutex); std::unique_lock<std::mutex> lock(parent->mutex);
@ -183,10 +166,10 @@ RWLockFIFO::LockHandlerImpl::~LockHandlerImpl()
} }
RWLockFIFO::LockHandlerImpl::LockHandlerImpl(RWLockFIFOPtr && parent, RWLockFIFO::GroupsContainer::iterator it_group, RWLockImpl::LockHandlerImpl::LockHandlerImpl(RWLock && parent, RWLockImpl::GroupsContainer::iterator it_group,
RWLockFIFO::ClientsContainer::iterator it_client) RWLockImpl::ClientsContainer::iterator it_client)
: parent{std::move(parent)}, it_group{it_group}, it_client{it_client}, : parent{std::move(parent)}, it_group{it_group}, it_client{it_client},
active_client_increment{(it_client->type == RWLockFIFO::Read) ? CurrentMetrics::RWLockActiveReaders active_client_increment{(it_client->type == RWLockImpl::Read) ? CurrentMetrics::RWLockActiveReaders
: CurrentMetrics::RWLockActiveWriters} : CurrentMetrics::RWLockActiveWriters}
{} {}

View File

@ -12,19 +12,19 @@
namespace DB namespace DB
{ {
class RWLockFIFO; class RWLockImpl;
using RWLockFIFOPtr = std::shared_ptr<RWLockFIFO>; using RWLock = std::shared_ptr<RWLockImpl>;
/// Implements shared lock with FIFO service /// Implements shared lock with FIFO service
/// You could call it recursively (several calls from the same thread) in Read mode /// Can be acquired recursively (several calls from the same thread) in Read mode
class RWLockFIFO : public std::enable_shared_from_this<RWLockFIFO> class RWLockImpl : public std::enable_shared_from_this<RWLockImpl>
{ {
public: public:
enum Type enum Type
{ {
Read, Read,
Write Write,
}; };
private: private:
@ -44,11 +44,7 @@ private:
}; };
public: public:
static RWLockFIFOPtr create() static RWLock create() { return RWLock(new RWLockImpl); }
{
return RWLockFIFOPtr(new RWLockFIFO);
}
/// Just use LockHandler::reset() to release the lock /// Just use LockHandler::reset() to release the lock
class LockHandlerImpl; class LockHandlerImpl;
@ -58,19 +54,10 @@ public:
/// Waits in the queue and returns appropriate lock /// Waits in the queue and returns appropriate lock
LockHandler getLock(Type type, Client client = Client{}); LockHandler getLock(Type type, Client client = Client{});
LockHandler getLock(Type type, const std::string & who) { return getLock(type, Client(who)); }
LockHandler getLock(Type type, const std::string & who)
{
return getLock(type, Client(who));
}
using Clients = std::vector<Client>;
/// Returns list of executing and waiting clients
Clients getClientsInTheQueue() const;
private: private:
RWLockFIFO() = default; RWLockImpl() = default;
struct Group; struct Group;
using GroupsContainer = std::list<Group>; using GroupsContainer = std::list<Group>;

View File

@ -4,7 +4,7 @@
#endif #endif
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <Common/RWLockFIFO.h> #include <Common/RWLock.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <common/Types.h> #include <common/Types.h>
#include <common/ThreadPool.h> #include <common/ThreadPool.h>
@ -18,7 +18,7 @@
using namespace DB; using namespace DB;
TEST(Common, RWLockFIFO_1) TEST(Common, RWLock_1)
{ {
constexpr int cycles = 1000; constexpr int cycles = 1000;
const std::vector<size_t> pool_sizes{1, 2, 4, 8}; const std::vector<size_t> pool_sizes{1, 2, 4, 8};
@ -26,7 +26,7 @@ TEST(Common, RWLockFIFO_1)
static std::atomic<int> readers{0}; static std::atomic<int> readers{0};
static std::atomic<int> writers{0}; static std::atomic<int> writers{0};
static auto fifo_lock = RWLockFIFO::create(); static auto fifo_lock = RWLockImpl::create();
static thread_local std::random_device rd; static thread_local std::random_device rd;
static thread_local pcg64 gen(rd()); static thread_local pcg64 gen(rd());
@ -35,12 +35,12 @@ TEST(Common, RWLockFIFO_1)
{ {
for (int i = 0; i < cycles; ++i) for (int i = 0; i < cycles; ++i)
{ {
auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockFIFO::Read : RWLockFIFO::Write; auto type = (std::uniform_int_distribution<>(0, 9)(gen) >= round) ? RWLockImpl::Read : RWLockImpl::Write;
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
auto lock = fifo_lock->getLock(type, "RW"); auto lock = fifo_lock->getLock(type, "RW");
if (type == RWLockFIFO::Write) if (type == RWLockImpl::Write)
{ {
++writers; ++writers;
@ -85,11 +85,11 @@ TEST(Common, RWLockFIFO_1)
} }
} }
TEST(Common, RWLockFIFO_Recursive) TEST(Common, RWLock_Recursive)
{ {
constexpr auto cycles = 10000; constexpr auto cycles = 10000;
static auto fifo_lock = RWLockFIFO::create(); static auto fifo_lock = RWLockImpl::create();
static thread_local std::random_device rd; static thread_local std::random_device rd;
static thread_local pcg64 gen(rd()); static thread_local pcg64 gen(rd());
@ -98,7 +98,7 @@ TEST(Common, RWLockFIFO_Recursive)
{ {
for (int i = 0; i < 2 * cycles; ++i) for (int i = 0; i < 2 * cycles; ++i)
{ {
auto lock = fifo_lock->getLock(RWLockFIFO::Write); auto lock = fifo_lock->getLock(RWLockImpl::Write);
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for); std::this_thread::sleep_for(sleep_for);
@ -109,17 +109,17 @@ TEST(Common, RWLockFIFO_Recursive)
{ {
for (int i = 0; i < cycles; ++i) for (int i = 0; i < cycles; ++i)
{ {
auto lock1 = fifo_lock->getLock(RWLockFIFO::Read); auto lock1 = fifo_lock->getLock(RWLockImpl::Read);
auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen)); auto sleep_for = std::chrono::duration<int, std::micro>(std::uniform_int_distribution<>(1, 100)(gen));
std::this_thread::sleep_for(sleep_for); std::this_thread::sleep_for(sleep_for);
auto lock2 = fifo_lock->getLock(RWLockFIFO::Read); auto lock2 = fifo_lock->getLock(RWLockImpl::Read);
EXPECT_ANY_THROW({fifo_lock->getLock(RWLockFIFO::Write);}); EXPECT_ANY_THROW({fifo_lock->getLock(RWLockImpl::Write);});
} }
fifo_lock->getLock(RWLockFIFO::Write); fifo_lock->getLock(RWLockImpl::Write);
}); });
t1.join(); t1.join();
@ -127,12 +127,12 @@ TEST(Common, RWLockFIFO_Recursive)
} }
TEST(Common, RWLockFIFO_PerfTest_Readers) TEST(Common, RWLock_PerfTest_Readers)
{ {
constexpr int cycles = 100000; // 100k constexpr int cycles = 100000; // 100k
const std::vector<size_t> pool_sizes{1, 2, 4, 8}; const std::vector<size_t> pool_sizes{1, 2, 4, 8};
static auto fifo_lock = RWLockFIFO::create(); static auto fifo_lock = RWLockImpl::create();
for (auto pool_size : pool_sizes) for (auto pool_size : pool_sizes)
{ {
@ -142,7 +142,7 @@ TEST(Common, RWLockFIFO_PerfTest_Readers)
{ {
for (auto i = 0; i < cycles; ++i) for (auto i = 0; i < cycles; ++i)
{ {
auto lock = fifo_lock->getLock(RWLockFIFO::Read); auto lock = fifo_lock->getLock(RWLockImpl::Read);
} }
}; };

View File

@ -8,9 +8,9 @@ TableStructureReadLock::TableStructureReadLock(StoragePtr storage_, bool lock_st
: storage(storage_) : storage(storage_)
{ {
if (lock_data) if (lock_data)
data_lock = storage->data_lock->getLock(RWLockFIFO::Read, who); data_lock = storage->data_lock->getLock(RWLockImpl::Read, who);
if (lock_structure) if (lock_structure)
structure_lock = storage->structure_lock->getLock(RWLockFIFO::Read, who); structure_lock = storage->structure_lock->getLock(RWLockImpl::Read, who);
} }
} }

View File

@ -2,7 +2,7 @@
#include <Core/Names.h> #include <Core/Names.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/RWLockFIFO.h> #include <Common/RWLock.h>
#include <Core/Names.h> #include <Core/Names.h>
#include <Core/QueryProcessingStage.h> #include <Core/QueryProcessingStage.h>
#include <Databases/IDatabase.h> #include <Databases/IDatabase.h>
@ -28,9 +28,6 @@ class Context;
class IBlockInputStream; class IBlockInputStream;
class IBlockOutputStream; class IBlockOutputStream;
class RWLockFIFO;
using RWLockFIFOPtr = std::shared_ptr<RWLockFIFO>;
using StorageActionBlockType = size_t; using StorageActionBlockType = size_t;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>; using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
@ -64,8 +61,8 @@ private:
StoragePtr storage; StoragePtr storage;
/// Order is important. /// Order is important.
RWLockFIFO::LockHandler data_lock; RWLockImpl::LockHandler data_lock;
RWLockFIFO::LockHandler structure_lock; RWLockImpl::LockHandler structure_lock;
public: public:
TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who); TableStructureReadLock(StoragePtr storage_, bool lock_structure, bool lock_data, const std::string & who);
@ -75,8 +72,8 @@ public:
using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>; using TableStructureReadLockPtr = std::shared_ptr<TableStructureReadLock>;
using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>; using TableStructureReadLocks = std::vector<TableStructureReadLockPtr>;
using TableStructureWriteLock = RWLockFIFO::LockHandler; using TableStructureWriteLock = RWLockImpl::LockHandler;
using TableDataWriteLock = RWLockFIFO::LockHandler; using TableDataWriteLock = RWLockImpl::LockHandler;
using TableFullWriteLock = std::pair<TableDataWriteLock, TableStructureWriteLock>; using TableFullWriteLock = std::pair<TableDataWriteLock, TableStructureWriteLock>;
@ -148,7 +145,7 @@ public:
*/ */
TableDataWriteLock lockDataForAlter(const std::string & who = "Alter") TableDataWriteLock lockDataForAlter(const std::string & who = "Alter")
{ {
auto res = data_lock->getLock(RWLockFIFO::Write, who); auto res = data_lock->getLock(RWLockImpl::Write, who);
if (is_dropped) if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res; return res;
@ -156,7 +153,7 @@ public:
TableStructureWriteLock lockStructureForAlter(const std::string & who = "Alter") TableStructureWriteLock lockStructureForAlter(const std::string & who = "Alter")
{ {
auto res = structure_lock->getLock(RWLockFIFO::Write, who); auto res = structure_lock->getLock(RWLockImpl::Write, who);
if (is_dropped) if (is_dropped)
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED); throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
return res; return res;
@ -367,7 +364,7 @@ private:
* 2) all changes to the data after releasing the lock will be based on the structure of the table at the time after the lock was released. * 2) all changes to the data after releasing the lock will be based on the structure of the table at the time after the lock was released.
* You need to take for read for the entire time of the operation that changes the data. * You need to take for read for the entire time of the operation that changes the data.
*/ */
mutable RWLockFIFOPtr data_lock = RWLockFIFO::create(); mutable RWLock data_lock = RWLockImpl::create();
/** Lock for multiple columns and path to table. It is taken for write at RENAME, ALTER (for ALTER MODIFY for a while) and DROP. /** Lock for multiple columns and path to table. It is taken for write at RENAME, ALTER (for ALTER MODIFY for a while) and DROP.
* It is taken for read for the whole time of SELECT, INSERT and merge parts (for MergeTree). * It is taken for read for the whole time of SELECT, INSERT and merge parts (for MergeTree).
@ -376,7 +373,7 @@ private:
* That is, if this lock is taken for write, you should not worry about `parts_writing_lock`. * That is, if this lock is taken for write, you should not worry about `parts_writing_lock`.
* parts_writing_lock is only needed for cases when you do not want to take `table_structure_lock` for long operations (ALTER MODIFY). * parts_writing_lock is only needed for cases when you do not want to take `table_structure_lock` for long operations (ALTER MODIFY).
*/ */
mutable RWLockFIFOPtr structure_lock = RWLockFIFO::create(); mutable RWLock structure_lock = RWLockImpl::create();
}; };
/// table name -> table /// table name -> table

File diff suppressed because it is too large Load Diff