mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Merge pull request #57730 from ClickHouse/revert-57454-fix-rwlock
Revert "Fix RWLock inconsistency after write lock timeout"
This commit is contained in:
commit
d74f72d310
@ -3,8 +3,6 @@
|
|||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
#include <Common/ProfileEvents.h>
|
#include <Common/ProfileEvents.h>
|
||||||
#include <IO/Operators.h>
|
|
||||||
#include <IO/WriteBufferFromString.h>
|
|
||||||
|
|
||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
@ -157,34 +155,25 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
|
|||||||
|
|
||||||
if (type == Type::Write)
|
if (type == Type::Write)
|
||||||
{
|
{
|
||||||
/// Always add a group for a writer (writes are never performed simultaneously).
|
|
||||||
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
|
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
|
||||||
}
|
}
|
||||||
else
|
else if (readers_queue.empty() ||
|
||||||
|
(rdlock_owner == readers_queue.begin() && readers_queue.size() == 1 && !writers_queue.empty()))
|
||||||
{
|
{
|
||||||
/// We don't always add a group to readers_queue here because multiple readers can use the same group.
|
|
||||||
/// We can reuse the last group if the last group didn't get ownership yet,
|
|
||||||
/// or even if it got ownership but there are no writers waiting in writers_queue.
|
|
||||||
bool can_use_last_group = !readers_queue.empty() && (!readers_queue.back().ownership || writers_queue.empty());
|
|
||||||
|
|
||||||
if (!can_use_last_group)
|
|
||||||
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
|
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
|
||||||
}
|
}
|
||||||
|
|
||||||
GroupsContainer::iterator it_group =
|
GroupsContainer::iterator it_group =
|
||||||
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
|
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
|
||||||
|
|
||||||
/// Lock is free to acquire
|
/// Lock is free to acquire
|
||||||
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
|
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
|
||||||
{
|
{
|
||||||
/// Set `rdlock_owner` or `wrlock_owner` and make it owner.
|
|
||||||
(type == Read ? rdlock_owner : wrlock_owner) = it_group; /// SM2: nothrow
|
(type == Read ? rdlock_owner : wrlock_owner) = it_group; /// SM2: nothrow
|
||||||
grantOwnership(it_group);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/// Wait until our group becomes the lock owner
|
/// Wait until our group becomes the lock owner
|
||||||
const auto predicate = [&] () { return it_group->ownership; };
|
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
|
||||||
|
|
||||||
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
|
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
|
||||||
{
|
{
|
||||||
@ -204,20 +193,15 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
|
|||||||
/// Rollback(SM1): nothrow
|
/// Rollback(SM1): nothrow
|
||||||
if (it_group->requests == 0)
|
if (it_group->requests == 0)
|
||||||
{
|
{
|
||||||
((type == Read) ? readers_queue : writers_queue).erase(it_group);
|
/// When WRITE lock fails, we need to notify next read that is waiting,
|
||||||
|
/// to avoid handing request, hence next=true.
|
||||||
|
dropOwnerGroupAndPassOwnership(it_group, /* next= */ true);
|
||||||
}
|
}
|
||||||
/// While we were waiting for this write lock (which has just failed) more readers could start waiting,
|
|
||||||
/// we need to wake up them now.
|
|
||||||
if ((rdlock_owner != readers_queue.end()) && writers_queue.empty())
|
|
||||||
grantOwnershipToAllReaders();
|
|
||||||
return nullptr;
|
return nullptr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Our group must be an owner here.
|
|
||||||
chassert(it_group->ownership);
|
|
||||||
|
|
||||||
if (request_has_query_id)
|
if (request_has_query_id)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@ -232,7 +216,7 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
|
|||||||
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
|
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
|
||||||
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
|
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
|
||||||
if (it_group->requests == 0)
|
if (it_group->requests == 0)
|
||||||
dropOwnerGroupAndPassOwnership(it_group); /// Rollback(SM1): nothrow
|
dropOwnerGroupAndPassOwnership(it_group, /* next= */ false); /// Rollback(SM1): nothrow
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
@ -253,28 +237,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
|
|||||||
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
|
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
|
||||||
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
|
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
|
||||||
*
|
*
|
||||||
* We do not employ try-catch: if something bad happens and chassert() is disabled, there is nothing we can do
|
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
|
||||||
* (we can't throw an exception here because RWLockImpl::unlock() is called from the destructor ~LockHolderImpl).
|
|
||||||
*/
|
*/
|
||||||
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
|
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
|
||||||
{
|
{
|
||||||
std::lock_guard state_lock(internal_state_mtx);
|
std::lock_guard state_lock(internal_state_mtx);
|
||||||
|
|
||||||
/// Our group must be an owner here.
|
/// All of these are Undefined behavior and nothing we can do!
|
||||||
if (!group_it->ownership)
|
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
|
||||||
{
|
|
||||||
chassert(false && "RWLockImpl::unlock() is called for a non-owner group");
|
|
||||||
return;
|
return;
|
||||||
}
|
if (rdlock_owner != readers_queue.end() && group_it != rdlock_owner)
|
||||||
|
return;
|
||||||
/// Check consistency.
|
if (wrlock_owner != writers_queue.end() && group_it != wrlock_owner)
|
||||||
if ((group_it->type == Read)
|
|
||||||
? !(rdlock_owner != readers_queue.end() && wrlock_owner == writers_queue.end())
|
|
||||||
: !(wrlock_owner != writers_queue.end() && rdlock_owner == readers_queue.end() && group_it == wrlock_owner))
|
|
||||||
{
|
|
||||||
chassert(false && "RWLockImpl::unlock() found the rwlock inconsistent");
|
|
||||||
return;
|
return;
|
||||||
}
|
|
||||||
|
|
||||||
/// If query_id is not empty it must be listed in parent->owner_queries
|
/// If query_id is not empty it must be listed in parent->owner_queries
|
||||||
if (query_id != NO_QUERY)
|
if (query_id != NO_QUERY)
|
||||||
@ -289,26 +264,12 @@ void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query
|
|||||||
|
|
||||||
/// If we are the last remaining referrer, remove this QNode and notify the next one
|
/// If we are the last remaining referrer, remove this QNode and notify the next one
|
||||||
if (--group_it->requests == 0) /// SM: nothrow
|
if (--group_it->requests == 0) /// SM: nothrow
|
||||||
dropOwnerGroupAndPassOwnership(group_it);
|
dropOwnerGroupAndPassOwnership(group_it, /* next= */ false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept
|
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept
|
||||||
{
|
{
|
||||||
/// All readers with ownership must finish before switching to write phase.
|
|
||||||
/// Such readers has iterators from `readers_queue.begin()` to `rdlock_owner`, so if `rdlock_owner` is equal to `readers_queue.begin()`
|
|
||||||
/// that means there is only one reader with ownership left in the readers_queue and we can proceed to generic procedure.
|
|
||||||
if ((group_it->type == Read) && (rdlock_owner != readers_queue.begin()) && (rdlock_owner != readers_queue.end()))
|
|
||||||
{
|
|
||||||
if (rdlock_owner == group_it)
|
|
||||||
--rdlock_owner;
|
|
||||||
readers_queue.erase(group_it);
|
|
||||||
/// If there are no writers waiting in writers_queue then we can wake up other readers.
|
|
||||||
if (writers_queue.empty())
|
|
||||||
grantOwnershipToAllReaders();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
rdlock_owner = readers_queue.end();
|
rdlock_owner = readers_queue.end();
|
||||||
wrlock_owner = writers_queue.end();
|
wrlock_owner = writers_queue.end();
|
||||||
|
|
||||||
@ -317,86 +278,42 @@ void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_
|
|||||||
readers_queue.erase(group_it);
|
readers_queue.erase(group_it);
|
||||||
/// Prepare next phase
|
/// Prepare next phase
|
||||||
if (!writers_queue.empty())
|
if (!writers_queue.empty())
|
||||||
|
{
|
||||||
wrlock_owner = writers_queue.begin();
|
wrlock_owner = writers_queue.begin();
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
rdlock_owner = readers_queue.begin();
|
rdlock_owner = readers_queue.begin();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
writers_queue.erase(group_it);
|
writers_queue.erase(group_it);
|
||||||
/// Prepare next phase
|
/// Prepare next phase
|
||||||
if (!readers_queue.empty())
|
if (!readers_queue.empty())
|
||||||
rdlock_owner = readers_queue.begin();
|
{
|
||||||
|
if (next && readers_queue.size() > 1)
|
||||||
|
{
|
||||||
|
rdlock_owner = std::next(readers_queue.begin());
|
||||||
|
}
|
||||||
else
|
else
|
||||||
|
{
|
||||||
|
rdlock_owner = readers_queue.begin();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
wrlock_owner = writers_queue.begin();
|
wrlock_owner = writers_queue.begin();
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (rdlock_owner != readers_queue.end())
|
if (rdlock_owner != readers_queue.end())
|
||||||
{
|
{
|
||||||
grantOwnershipToAllReaders();
|
rdlock_owner->cv.notify_all();
|
||||||
}
|
}
|
||||||
else if (wrlock_owner != writers_queue.end())
|
else if (wrlock_owner != writers_queue.end())
|
||||||
{
|
{
|
||||||
grantOwnership(wrlock_owner);
|
wrlock_owner->cv.notify_one();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void RWLockImpl::grantOwnership(GroupsContainer::iterator group_it) noexcept
|
|
||||||
{
|
|
||||||
if (!group_it->ownership)
|
|
||||||
{
|
|
||||||
group_it->ownership = true;
|
|
||||||
group_it->cv.notify_all();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void RWLockImpl::grantOwnershipToAllReaders() noexcept
|
|
||||||
{
|
|
||||||
if (rdlock_owner != readers_queue.end())
|
|
||||||
{
|
|
||||||
size_t num_new_owners = 0;
|
|
||||||
|
|
||||||
for (;;)
|
|
||||||
{
|
|
||||||
if (!rdlock_owner->ownership)
|
|
||||||
++num_new_owners;
|
|
||||||
grantOwnership(rdlock_owner);
|
|
||||||
if (std::next(rdlock_owner) == readers_queue.end())
|
|
||||||
break;
|
|
||||||
++rdlock_owner;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// There couldn't be more than one reader group which is not an owner.
|
|
||||||
/// (Because we add a new reader group only if the last reader group is already an owner - see the `can_use_last_group` variable.)
|
|
||||||
chassert(num_new_owners <= 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
std::unordered_map<String, size_t> RWLockImpl::getOwnerQueryIds() const
|
|
||||||
{
|
|
||||||
std::lock_guard lock{internal_state_mtx};
|
|
||||||
return owner_queries;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
String RWLockImpl::getOwnerQueryIdsDescription() const
|
|
||||||
{
|
|
||||||
auto map = getOwnerQueryIds();
|
|
||||||
WriteBufferFromOwnString out;
|
|
||||||
bool need_comma = false;
|
|
||||||
for (const auto & [query_id, num_owners] : map)
|
|
||||||
{
|
|
||||||
if (need_comma)
|
|
||||||
out << ", ";
|
|
||||||
out << query_id;
|
|
||||||
if (num_owners != 1)
|
|
||||||
out << " (" << num_owners << ")";
|
|
||||||
need_comma = true;
|
|
||||||
}
|
|
||||||
return out.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -62,42 +62,35 @@ public:
|
|||||||
inline static const String NO_QUERY = String();
|
inline static const String NO_QUERY = String();
|
||||||
inline static const auto default_locking_timeout_ms = std::chrono::milliseconds(120000);
|
inline static const auto default_locking_timeout_ms = std::chrono::milliseconds(120000);
|
||||||
|
|
||||||
/// Returns all query_id owning locks (both read and write) right now.
|
|
||||||
/// !! This function are for debugging and logging purposes only, DO NOT use them for synchronization!
|
|
||||||
std::unordered_map<String, size_t> getOwnerQueryIds() const;
|
|
||||||
String getOwnerQueryIdsDescription() const;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/// Group of locking requests that should be granted simultaneously
|
/// Group of locking requests that should be granted simultaneously
|
||||||
/// i.e. one or several readers or a single writer
|
/// i.e. one or several readers or a single writer
|
||||||
struct Group
|
struct Group
|
||||||
{
|
{
|
||||||
const Type type;
|
const Type type;
|
||||||
size_t requests = 0;
|
size_t requests;
|
||||||
|
|
||||||
bool ownership = false; /// whether this group got ownership? (that means `cv` is notified and the locking requests should stop waiting)
|
|
||||||
std::condition_variable cv; /// all locking requests of the group wait on this condvar
|
std::condition_variable cv; /// all locking requests of the group wait on this condvar
|
||||||
|
|
||||||
explicit Group(Type type_) : type{type_} {}
|
explicit Group(Type type_) : type{type_}, requests{0} {}
|
||||||
};
|
};
|
||||||
|
|
||||||
using GroupsContainer = std::list<Group>;
|
using GroupsContainer = std::list<Group>;
|
||||||
using OwnerQueryIds = std::unordered_map<String /* query_id */, size_t /* num_owners */>;
|
using OwnerQueryIds = std::unordered_map<String, size_t>;
|
||||||
|
|
||||||
mutable std::mutex internal_state_mtx;
|
mutable std::mutex internal_state_mtx;
|
||||||
|
|
||||||
GroupsContainer readers_queue;
|
GroupsContainer readers_queue;
|
||||||
GroupsContainer writers_queue;
|
GroupsContainer writers_queue;
|
||||||
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// last group with ownership in readers_queue in read phase
|
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equals to readers_queue.begin() in read phase
|
||||||
/// or readers_queue.end() in writer phase
|
/// or readers_queue.end() otherwise
|
||||||
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equals to writers_queue.begin() in write phase
|
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equals to writers_queue.begin() in write phase
|
||||||
/// or writers_queue.end() in read phase
|
/// or writers_queue.end() otherwise
|
||||||
OwnerQueryIds owner_queries;
|
OwnerQueryIds owner_queries;
|
||||||
|
|
||||||
RWLockImpl() = default;
|
RWLockImpl() = default;
|
||||||
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
|
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
|
||||||
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept;
|
/// @param next - notify next after begin, used on writer lock failures
|
||||||
void grantOwnership(GroupsContainer::iterator group_it) noexcept;
|
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept;
|
||||||
void grantOwnershipToAllReaders() noexcept;
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -24,39 +24,6 @@ namespace DB
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
namespace
|
|
||||||
{
|
|
||||||
class Events
|
|
||||||
{
|
|
||||||
public:
|
|
||||||
Events() : start_time(std::chrono::steady_clock::now()) {}
|
|
||||||
|
|
||||||
void add(String && event)
|
|
||||||
{
|
|
||||||
String timepoint = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count());
|
|
||||||
if (timepoint.length() < 5)
|
|
||||||
timepoint.insert(0, 5 - timepoint.length(), ' ');
|
|
||||||
std::lock_guard lock{mutex};
|
|
||||||
//std::cout << timepoint << " : " << event << std::endl;
|
|
||||||
events.emplace_back(std::move(event));
|
|
||||||
}
|
|
||||||
|
|
||||||
void check(const Strings & expected_events)
|
|
||||||
{
|
|
||||||
std::lock_guard lock{mutex};
|
|
||||||
EXPECT_EQ(events.size(), expected_events.size());
|
|
||||||
for (size_t i = 0; i != events.size(); ++i)
|
|
||||||
EXPECT_EQ(events[i], (i < expected_events.size() ? expected_events[i] : ""));
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
const std::chrono::time_point<std::chrono::steady_clock> start_time;
|
|
||||||
Strings events TSA_GUARDED_BY(mutex);
|
|
||||||
mutable std::mutex mutex;
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
TEST(Common, RWLock1)
|
TEST(Common, RWLock1)
|
||||||
{
|
{
|
||||||
/// Tests with threads require this, because otherwise
|
/// Tests with threads require this, because otherwise
|
||||||
@ -320,200 +287,3 @@ TEST(Common, RWLockNotUpgradeableWithNoQuery)
|
|||||||
|
|
||||||
read_thread.join();
|
read_thread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST(Common, RWLockWriteLockTimeoutDuringRead)
|
|
||||||
{
|
|
||||||
static auto rw_lock = RWLockImpl::create();
|
|
||||||
Events events;
|
|
||||||
|
|
||||||
std::thread ra_thread([&] ()
|
|
||||||
{
|
|
||||||
events.add("Locking ra");
|
|
||||||
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
|
|
||||||
events.add(ra ? "Locked ra" : "Failed to lock ra");
|
|
||||||
EXPECT_NE(ra, nullptr);
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
|
|
||||||
|
|
||||||
events.add("Unlocking ra");
|
|
||||||
ra.reset();
|
|
||||||
events.add("Unlocked ra");
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread wc_thread([&] ()
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
|
|
||||||
events.add("Locking wc");
|
|
||||||
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
|
|
||||||
events.add(wc ? "Locked wc" : "Failed to lock wc");
|
|
||||||
EXPECT_EQ(wc, nullptr);
|
|
||||||
});
|
|
||||||
|
|
||||||
ra_thread.join();
|
|
||||||
wc_thread.join();
|
|
||||||
|
|
||||||
{
|
|
||||||
events.add("Locking wd");
|
|
||||||
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
|
|
||||||
events.add(wd ? "Locked wd" : "Failed to lock wd");
|
|
||||||
EXPECT_NE(wd, nullptr);
|
|
||||||
events.add("Unlocking wd");
|
|
||||||
wd.reset();
|
|
||||||
events.add("Unlocked wd");
|
|
||||||
}
|
|
||||||
|
|
||||||
events.check(
|
|
||||||
{"Locking ra",
|
|
||||||
"Locked ra",
|
|
||||||
"Locking wc",
|
|
||||||
"Failed to lock wc",
|
|
||||||
"Unlocking ra",
|
|
||||||
"Unlocked ra",
|
|
||||||
"Locking wd",
|
|
||||||
"Locked wd",
|
|
||||||
"Unlocking wd",
|
|
||||||
"Unlocked wd"});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
TEST(Common, RWLockWriteLockTimeoutDuringTwoReads)
|
|
||||||
{
|
|
||||||
static auto rw_lock = RWLockImpl::create();
|
|
||||||
Events events;
|
|
||||||
|
|
||||||
std::thread ra_thread([&] ()
|
|
||||||
{
|
|
||||||
events.add("Locking ra");
|
|
||||||
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
|
|
||||||
events.add(ra ? "Locked ra" : "Failed to lock ra");
|
|
||||||
EXPECT_NE(ra, nullptr);
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
|
|
||||||
|
|
||||||
events.add("Unlocking ra");
|
|
||||||
ra.reset();
|
|
||||||
events.add("Unlocked ra");
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread rb_thread([&] ()
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
|
|
||||||
events.add("Locking rb");
|
|
||||||
|
|
||||||
auto rb = rw_lock->getLock(RWLockImpl::Read, "rb");
|
|
||||||
events.add(rb ? "Locked rb" : "Failed to lock rb");
|
|
||||||
EXPECT_NE(rb, nullptr);
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
|
|
||||||
events.add("Unlocking rb");
|
|
||||||
rb.reset();
|
|
||||||
events.add("Unlocked rb");
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread wc_thread([&] ()
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
|
|
||||||
events.add("Locking wc");
|
|
||||||
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
|
|
||||||
events.add(wc ? "Locked wc" : "Failed to lock wc");
|
|
||||||
EXPECT_EQ(wc, nullptr);
|
|
||||||
});
|
|
||||||
|
|
||||||
ra_thread.join();
|
|
||||||
rb_thread.join();
|
|
||||||
wc_thread.join();
|
|
||||||
|
|
||||||
{
|
|
||||||
events.add("Locking wd");
|
|
||||||
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
|
|
||||||
events.add(wd ? "Locked wd" : "Failed to lock wd");
|
|
||||||
EXPECT_NE(wd, nullptr);
|
|
||||||
events.add("Unlocking wd");
|
|
||||||
wd.reset();
|
|
||||||
events.add("Unlocked wd");
|
|
||||||
}
|
|
||||||
|
|
||||||
events.check(
|
|
||||||
{"Locking ra",
|
|
||||||
"Locked ra",
|
|
||||||
"Locking wc",
|
|
||||||
"Locking rb",
|
|
||||||
"Failed to lock wc",
|
|
||||||
"Locked rb",
|
|
||||||
"Unlocking ra",
|
|
||||||
"Unlocked ra",
|
|
||||||
"Unlocking rb",
|
|
||||||
"Unlocked rb",
|
|
||||||
"Locking wd",
|
|
||||||
"Locked wd",
|
|
||||||
"Unlocking wd",
|
|
||||||
"Unlocked wd"});
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
TEST(Common, RWLockWriteLockTimeoutDuringWriteWithWaitingRead)
|
|
||||||
{
|
|
||||||
static auto rw_lock = RWLockImpl::create();
|
|
||||||
Events events;
|
|
||||||
|
|
||||||
std::thread wa_thread([&] ()
|
|
||||||
{
|
|
||||||
events.add("Locking wa");
|
|
||||||
auto wa = rw_lock->getLock(RWLockImpl::Write, "wa");
|
|
||||||
events.add(wa ? "Locked wa" : "Failed to lock wa");
|
|
||||||
EXPECT_NE(wa, nullptr);
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(500));
|
|
||||||
|
|
||||||
events.add("Unlocking wa");
|
|
||||||
wa.reset();
|
|
||||||
events.add("Unlocked wa");
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread wb_thread([&] ()
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
|
|
||||||
events.add("Locking wb");
|
|
||||||
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
|
|
||||||
events.add(wc ? "Locked wb" : "Failed to lock wb");
|
|
||||||
EXPECT_EQ(wc, nullptr);
|
|
||||||
});
|
|
||||||
|
|
||||||
std::thread rc_thread([&] ()
|
|
||||||
{
|
|
||||||
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
|
|
||||||
events.add("Locking rc");
|
|
||||||
auto rc = rw_lock->getLock(RWLockImpl::Read, "rc", std::chrono::milliseconds(200));
|
|
||||||
events.add(rc ? "Locked rc" : "Failed to lock rc");
|
|
||||||
EXPECT_EQ(rc, nullptr);
|
|
||||||
});
|
|
||||||
|
|
||||||
wa_thread.join();
|
|
||||||
wb_thread.join();
|
|
||||||
rc_thread.join();
|
|
||||||
|
|
||||||
{
|
|
||||||
events.add("Locking wd");
|
|
||||||
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
|
|
||||||
events.add(wd ? "Locked wd" : "Failed to lock wd");
|
|
||||||
EXPECT_NE(wd, nullptr);
|
|
||||||
events.add("Unlocking wd");
|
|
||||||
wd.reset();
|
|
||||||
events.add("Unlocked wd");
|
|
||||||
}
|
|
||||||
|
|
||||||
events.check(
|
|
||||||
{"Locking wa",
|
|
||||||
"Locked wa",
|
|
||||||
"Locking wb",
|
|
||||||
"Locking rc",
|
|
||||||
"Failed to lock wb",
|
|
||||||
"Failed to lock rc",
|
|
||||||
"Unlocking wa",
|
|
||||||
"Unlocked wa",
|
|
||||||
"Locking wd",
|
|
||||||
"Locked wd",
|
|
||||||
"Unlocking wd",
|
|
||||||
"Unlocked wd"});
|
|
||||||
}
|
|
||||||
|
@ -41,8 +41,8 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
|
|||||||
{
|
{
|
||||||
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
|
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
|
||||||
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
|
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
|
||||||
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry. Owner query ids: {}",
|
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry",
|
||||||
type_str, getStorageID(), acquire_timeout.count(), rwlock->getOwnerQueryIdsDescription());
|
type_str, getStorageID(), acquire_timeout.count());
|
||||||
}
|
}
|
||||||
return lock_holder;
|
return lock_holder;
|
||||||
}
|
}
|
||||||
|
@ -216,7 +216,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
|
|||||||
node = nodes[randint(0, num_nodes - 1)]
|
node = nodes[randint(0, num_nodes - 1)]
|
||||||
# "DROP TABLE IF EXISTS" still can throw some errors (e.g. "WRITE locking attempt on node0 has timed out!")
|
# "DROP TABLE IF EXISTS" still can throw some errors (e.g. "WRITE locking attempt on node0 has timed out!")
|
||||||
# So we use query_and_get_answer_with_error() to ignore any errors.
|
# So we use query_and_get_answer_with_error() to ignore any errors.
|
||||||
# `lock_acquire_timeout` is reduced because we don't wait our test to wait too long.
|
# `lock_acquire_timeout` is also reduced because we don't wait our test to wait too long.
|
||||||
node.query_and_get_answer_with_error(
|
node.query_and_get_answer_with_error(
|
||||||
f"DROP TABLE IF EXISTS {table_name} SYNC",
|
f"DROP TABLE IF EXISTS {table_name} SYNC",
|
||||||
settings={"lock_acquire_timeout": 10},
|
settings={"lock_acquire_timeout": 10},
|
||||||
@ -227,24 +227,15 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
|
|||||||
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
|
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
|
||||||
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
|
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
|
||||||
node = nodes[randint(0, num_nodes - 1)]
|
node = nodes[randint(0, num_nodes - 1)]
|
||||||
# `lock_acquire_timeout` is reduced because we don't wait our test to wait too long.
|
|
||||||
node.query_and_get_answer_with_error(
|
node.query_and_get_answer_with_error(
|
||||||
f"RENAME TABLE {table_name1} TO {table_name2}",
|
f"RENAME TABLE {table_name1} TO {table_name2}"
|
||||||
settings={"lock_acquire_timeout": 10},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def truncate_tables():
|
def truncate_tables():
|
||||||
while time.time() < end_time:
|
while time.time() < end_time:
|
||||||
table_name = f"mydb.tbl{randint(1, num_nodes)}"
|
table_name = f"mydb.tbl{randint(1, num_nodes)}"
|
||||||
node = nodes[randint(0, num_nodes - 1)]
|
node = nodes[randint(0, num_nodes - 1)]
|
||||||
# "TRUNCATE TABLE IF EXISTS" still can throw some errors
|
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} SYNC")
|
||||||
# (e.g. "WRITE locking attempt on node0 has timed out!" if the table engine is "Log").
|
|
||||||
# So we use query_and_get_answer_with_error() to ignore any errors.
|
|
||||||
# `lock_acquire_timeout` is reduced because we don't wait our test to wait too long.
|
|
||||||
node.query_and_get_answer_with_error(
|
|
||||||
f"TRUNCATE TABLE IF EXISTS {table_name} SYNC",
|
|
||||||
settings={"lock_acquire_timeout": 10},
|
|
||||||
)
|
|
||||||
|
|
||||||
def make_backups():
|
def make_backups():
|
||||||
ids = []
|
ids = []
|
||||||
|
Loading…
Reference in New Issue
Block a user