Revert "Fix RWLock inconsistency after write lock timeout"

Alexey Milovidov 2023-12-10 23:51:59 +03:00 committed by GitHub
parent 0a20ce5d32
commit 644ef7b63f
5 changed files with 47 additions and 376 deletions

View File

@@ -3,8 +3,6 @@
#include <Common/Exception.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
namespace ProfileEvents
@@ -157,34 +155,25 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
if (type == Type::Write)
{
/// Always add a group for a writer (writes are never performed simultaneously).
writers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
else
else if (readers_queue.empty() ||
(rdlock_owner == readers_queue.begin() && readers_queue.size() == 1 && !writers_queue.empty()))
{
/// We don't always add a group to readers_queue here because multiple readers can use the same group.
/// We can reuse the last group if the last group didn't get ownership yet,
/// or even if it got ownership but there are no writers waiting in writers_queue.
bool can_use_last_group = !readers_queue.empty() && (!readers_queue.back().ownership || writers_queue.empty());
if (!can_use_last_group)
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
readers_queue.emplace_back(type); /// SM1: may throw (nothing to roll back)
}
GroupsContainer::iterator it_group =
(type == Type::Write) ? std::prev(writers_queue.end()) : std::prev(readers_queue.end());
/// Lock is free to acquire
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
{
/// Set `rdlock_owner` or `wrlock_owner` and make it owner.
(type == Read ? rdlock_owner : wrlock_owner) = it_group; /// SM2: nothrow
grantOwnership(it_group);
}
else
{
/// Wait until our group becomes the lock owner
const auto predicate = [&] () { return it_group->ownership; };
const auto predicate = [&] () { return it_group == (type == Read ? rdlock_owner : wrlock_owner); };
if (lock_deadline_tp == std::chrono::time_point<std::chrono::steady_clock>::max())
{
@@ -204,20 +193,15 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
/// Rollback(SM1): nothrow
if (it_group->requests == 0)
{
((type == Read) ? readers_queue : writers_queue).erase(it_group);
/// When a WRITE lock fails, we need to notify the next waiting read
/// to avoid a hanging request, hence next=true.
dropOwnerGroupAndPassOwnership(it_group, /* next= */ true);
}
/// While we were waiting for this write lock (which has just failed), more readers could start waiting;
/// we need to wake them up now.
if ((rdlock_owner != readers_queue.end()) && writers_queue.empty())
grantOwnershipToAllReaders();
return nullptr;
}
}
}
/// Our group must be an owner here.
chassert(it_group->ownership);
if (request_has_query_id)
{
try
@@ -232,7 +216,7 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
/// Methods std::list<>::emplace_back() and std::unordered_map<>::emplace() provide strong exception safety
/// We only need to roll back the changes to these objects: owner_queries and the readers/writers queue
if (it_group->requests == 0)
dropOwnerGroupAndPassOwnership(it_group); /// Rollback(SM1): nothrow
dropOwnerGroupAndPassOwnership(it_group, /* next= */ false); /// Rollback(SM1): nothrow
throw;
}
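
The hunk above restores the timed wait: a request group parks on its condition variable until it becomes rdlock_owner/wrlock_owner, and on timeout rolls back the enqueued group (Rollback(SM1)) before returning nullptr. A minimal sketch of that wait-and-rollback pattern, assuming a simplified Group; acquireWithTimeout and its signature are illustrative, not part of the real API:

#include <chrono>
#include <condition_variable>
#include <list>
#include <mutex>

/// Simplified request group: one condvar per group, shared by its requests.
struct Group
{
    size_t requests = 0;
    std::condition_variable cv;
};

/// Enqueue a group, wait for ownership with a deadline, roll back on timeout.
bool acquireWithTimeout(std::mutex & mtx, std::list<Group> & queue,
                        std::list<Group>::iterator & owner,
                        std::chrono::milliseconds timeout)
{
    std::unique_lock lock(mtx);
    queue.emplace_back();                       /// SM1: may throw (nothing to roll back)
    auto it = std::prev(queue.end());
    ++it->requests;
    if (queue.size() == 1 && owner == queue.end())
        owner = it;                             /// lock was free: take ownership immediately
    const auto predicate = [&] { return it == owner; };  /// same condition as the restored code
    if (!it->cv.wait_for(lock, timeout, predicate))
    {
        if (--it->requests == 0)
            queue.erase(it);                    /// Rollback(SM1): nothrow
        return false;
    }
    return true;
}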
@@ -253,28 +237,19 @@ RWLockImpl::getLock(RWLockImpl::Type type, const String & query_id, const std::c
* it is guaranteed that all three steps have been executed successfully and the resulting state is consistent.
* With the mutex locked the order of steps to restore the lock's state can be arbitrary
*
* We do not employ try-catch: if something bad happens and chassert() is disabled, there is nothing we can do
* (we can't throw an exception here because RWLockImpl::unlock() is called from the destructor ~LockHolderImpl).
* We do not employ try-catch: if something bad happens, there is nothing we can do =(
*/
void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept
{
std::lock_guard state_lock(internal_state_mtx);
/// Our group must be an owner here.
if (!group_it->ownership)
{
chassert(false && "RWLockImpl::unlock() is called for a non-owner group");
/// All of these are undefined behavior, and there is nothing we can do!
if (rdlock_owner == readers_queue.end() && wrlock_owner == writers_queue.end())
return;
}
/// Check consistency.
if ((group_it->type == Read)
? !(rdlock_owner != readers_queue.end() && wrlock_owner == writers_queue.end())
: !(wrlock_owner != writers_queue.end() && rdlock_owner == readers_queue.end() && group_it == wrlock_owner))
{
chassert(false && "RWLockImpl::unlock() found the rwlock inconsistent");
if (rdlock_owner != readers_queue.end() && group_it != rdlock_owner)
return;
if (wrlock_owner != writers_queue.end() && group_it != wrlock_owner)
return;
}
/// If query_id is not empty it must be listed in parent->owner_queries
if (query_id != NO_QUERY)
@@ -289,26 +264,12 @@ void RWLockImpl::unlock(GroupsContainer::iterator group_it, const String & query
/// If we are the last remaining referrer, remove this QNode and notify the next one
if (--group_it->requests == 0) /// SM: nothrow
dropOwnerGroupAndPassOwnership(group_it);
dropOwnerGroupAndPassOwnership(group_it, /* next= */ false);
}
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept
void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept
{
/// All readers with ownership must finish before switching to write phase.
/// Such readers have iterators from `readers_queue.begin()` to `rdlock_owner`, so if `rdlock_owner` is equal to `readers_queue.begin()`
/// that means there is only one reader with ownership left in the readers_queue and we can proceed to generic procedure.
if ((group_it->type == Read) && (rdlock_owner != readers_queue.begin()) && (rdlock_owner != readers_queue.end()))
{
if (rdlock_owner == group_it)
--rdlock_owner;
readers_queue.erase(group_it);
/// If there are no writers waiting in writers_queue then we can wake up other readers.
if (writers_queue.empty())
grantOwnershipToAllReaders();
return;
}
rdlock_owner = readers_queue.end();
wrlock_owner = writers_queue.end();
@@ -317,86 +278,42 @@ void RWLockImpl::dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_
readers_queue.erase(group_it);
/// Prepare next phase
if (!writers_queue.empty())
{
wrlock_owner = writers_queue.begin();
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
writers_queue.erase(group_it);
/// Prepare next phase
if (!readers_queue.empty())
rdlock_owner = readers_queue.begin();
{
if (next && readers_queue.size() > 1)
{
rdlock_owner = std::next(readers_queue.begin());
}
else
{
rdlock_owner = readers_queue.begin();
}
}
else
{
wrlock_owner = writers_queue.begin();
}
}
if (rdlock_owner != readers_queue.end())
{
grantOwnershipToAllReaders();
rdlock_owner->cv.notify_all();
}
else if (wrlock_owner != writers_queue.end())
{
grantOwnership(wrlock_owner);
wrlock_owner->cv.notify_one();
}
}
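
Taken together, the restored function erases the dropped group and then hands ownership to the opposite queue: writers are preferred after a reader is dropped, readers after a writer; the restored `next` flag (per the header comment, "notify next after begin, used on writer lock failures") hands ownership to the second reader group rather than the first. A condensed, self-contained sketch of that selection logic, assuming the dropped group was already erased; passOwnership and its parameters are illustrative, not the real member function:

#include <iterator>
#include <list>

/// Condensed sketch of the restored hand-off: pick the next owner group after
/// the current owner was erased. `dropped_reader` says which queue the erased
/// group came from; `next` mirrors the parameter restored above.
template <typename Group>
void passOwnership(std::list<Group> & readers, std::list<Group> & writers,
                   typename std::list<Group>::iterator & rdlock_owner,
                   typename std::list<Group>::iterator & wrlock_owner,
                   bool dropped_reader, bool next)
{
    rdlock_owner = readers.end();
    wrlock_owner = writers.end();
    if (dropped_reader)
    {
        if (!writers.empty())
            wrlock_owner = writers.begin();    /// switch to write phase
        else
            rdlock_owner = readers.begin();    /// stay in read phase (may be end())
    }
    else if (!readers.empty())
    {
        /// After a failed writer, skip the first reader group if one waits behind it.
        rdlock_owner = (next && readers.size() > 1) ? std::next(readers.begin())
                                                    : readers.begin();
    }
    else
    {
        wrlock_owner = writers.begin();        /// may be end() if both queues are empty
    }
    if (rdlock_owner != readers.end())
        rdlock_owner->cv.notify_all();         /// wake all readers of the owner group
    else if (wrlock_owner != writers.end())
        wrlock_owner->cv.notify_one();         /// wake the single writer
}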
void RWLockImpl::grantOwnership(GroupsContainer::iterator group_it) noexcept
{
if (!group_it->ownership)
{
group_it->ownership = true;
group_it->cv.notify_all();
}
}
void RWLockImpl::grantOwnershipToAllReaders() noexcept
{
if (rdlock_owner != readers_queue.end())
{
size_t num_new_owners = 0;
for (;;)
{
if (!rdlock_owner->ownership)
++num_new_owners;
grantOwnership(rdlock_owner);
if (std::next(rdlock_owner) == readers_queue.end())
break;
++rdlock_owner;
}
/// There couldn't be more than one reader group which is not an owner.
/// (Because we add a new reader group only if the last reader group is already an owner - see the `can_use_last_group` variable.)
chassert(num_new_owners <= 1);
}
}
std::unordered_map<String, size_t> RWLockImpl::getOwnerQueryIds() const
{
std::lock_guard lock{internal_state_mtx};
return owner_queries;
}
String RWLockImpl::getOwnerQueryIdsDescription() const
{
auto map = getOwnerQueryIds();
WriteBufferFromOwnString out;
bool need_comma = false;
for (const auto & [query_id, num_owners] : map)
{
if (need_comma)
out << ", ";
out << query_id;
if (num_owners != 1)
out << " (" << num_owners << ")";
need_comma = true;
}
return out.str();
}
}
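
For reference, the removed getOwnerQueryIdsDescription() renders the owner map as a comma-separated list, appending a count only when a query owns the lock more than once. A standalone sketch, with std::ostringstream standing in for WriteBufferFromOwnString and the name describeOwners chosen for illustration:

#include <iostream>
#include <sstream>
#include <string>
#include <unordered_map>

/// Standalone equivalent of the removed helper; std::ostringstream replaces
/// WriteBufferFromOwnString so the sketch compiles outside ClickHouse.
std::string describeOwners(const std::unordered_map<std::string, size_t> & owners)
{
    std::ostringstream out;
    bool need_comma = false;
    for (const auto & [query_id, num_owners] : owners)
    {
        if (need_comma)
            out << ", ";
        out << query_id;
        if (num_owners != 1)
            out << " (" << num_owners << ")";   /// e.g. "q2 (3)" for three locks held by q2
        need_comma = true;
    }
    return out.str();
}

int main()
{
    /// Possible output (unordered_map iteration order is unspecified): q2 (3), q1
    std::cout << describeOwners({{"q1", 1}, {"q2", 3}}) << '\n';
}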

View File

@@ -62,42 +62,35 @@ public:
inline static const String NO_QUERY = String();
inline static const auto default_locking_timeout_ms = std::chrono::milliseconds(120000);
/// Returns all query_id owning locks (both read and write) right now.
/// !! These functions are for debugging and logging purposes only, DO NOT use them for synchronization!
std::unordered_map<String, size_t> getOwnerQueryIds() const;
String getOwnerQueryIdsDescription() const;
private:
/// Group of locking requests that should be granted simultaneously
/// i.e. one or several readers or a single writer
struct Group
{
const Type type;
size_t requests = 0;
size_t requests;
bool ownership = false; /// has this group been granted ownership? (if so, `cv` has been notified and its locking requests should stop waiting)
std::condition_variable cv; /// all locking requests of the group wait on this condvar
explicit Group(Type type_) : type{type_} {}
explicit Group(Type type_) : type{type_}, requests{0} {}
};
using GroupsContainer = std::list<Group>;
using OwnerQueryIds = std::unordered_map<String /* query_id */, size_t /* num_owners */>;
using OwnerQueryIds = std::unordered_map<String, size_t>;
mutable std::mutex internal_state_mtx;
GroupsContainer readers_queue;
GroupsContainer writers_queue;
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// last group with ownership in readers_queue in read phase
/// or readers_queue.end() in writer phase
GroupsContainer::iterator rdlock_owner{readers_queue.end()}; /// equal to readers_queue.begin() in read phase
/// or readers_queue.end() otherwise
GroupsContainer::iterator wrlock_owner{writers_queue.end()}; /// equal to writers_queue.begin() in write phase
/// or writers_queue.end() in read phase
/// or writers_queue.end() otherwise
OwnerQueryIds owner_queries;
RWLockImpl() = default;
void unlock(GroupsContainer::iterator group_it, const String & query_id) noexcept;
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it) noexcept;
void grantOwnership(GroupsContainer::iterator group_it) noexcept;
void grantOwnershipToAllReaders() noexcept;
/// @param next - notify next after begin, used on writer lock failures
void dropOwnerGroupAndPassOwnership(GroupsContainer::iterator group_it, bool next) noexcept;
};
}
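
The public surface is unchanged by the revert: getLock() blocks until the lock is granted and returns a LockHolder that releases the lock from its destructor, or nullptr if the timeout expires. A short usage sketch based on the calls in the tests below; the include path and omitted namespace qualifiers are assumptions:

#include <Common/RWLock.h>   /// assumed include path; DB:: qualifiers omitted
#include <chrono>

void example()
{
    auto rw_lock = RWLockImpl::create();

    /// Without an explicit timeout, default_locking_timeout_ms (120 s) applies.
    auto read_holder = rw_lock->getLock(RWLockImpl::Read, "query-a");
    auto write_holder = rw_lock->getLock(RWLockImpl::Write, "query-b",
                                         std::chrono::milliseconds(200));
    if (!write_holder)
    {
        /// Timed out while "query-a" still reads: caller should retry
        /// or fail with DEADLOCK_AVOIDED, as IStorage::tryLockTimed does.
    }
    read_holder.reset();   /// unlock() runs from the holder's destructor
}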

View File

@@ -24,39 +24,6 @@ namespace DB
}
namespace
{
class Events
{
public:
Events() : start_time(std::chrono::steady_clock::now()) {}
void add(String && event)
{
String timepoint = std::to_string(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time).count());
if (timepoint.length() < 5)
timepoint.insert(0, 5 - timepoint.length(), ' ');
std::lock_guard lock{mutex};
//std::cout << timepoint << " : " << event << std::endl;
events.emplace_back(std::move(event));
}
void check(const Strings & expected_events)
{
std::lock_guard lock{mutex};
EXPECT_EQ(events.size(), expected_events.size());
for (size_t i = 0; i != events.size(); ++i)
EXPECT_EQ(events[i], (i < expected_events.size() ? expected_events[i] : ""));
}
private:
const std::chrono::time_point<std::chrono::steady_clock> start_time;
Strings events TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
}
TEST(Common, RWLock1)
{
/// Tests with threads require this, because otherwise
@@ -320,200 +287,3 @@ TEST(Common, RWLockNotUpgradeableWithNoQuery)
read_thread.join();
}
TEST(Common, RWLockWriteLockTimeoutDuringRead)
{
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread ra_thread([&] ()
{
events.add("Locking ra");
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
events.add(ra ? "Locked ra" : "Failed to lock ra");
EXPECT_NE(ra, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
events.add("Unlocking ra");
ra.reset();
events.add("Unlocked ra");
});
std::thread wc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wc");
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
events.add(wc ? "Locked wc" : "Failed to lock wc");
EXPECT_EQ(wc, nullptr);
});
ra_thread.join();
wc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking ra",
"Locked ra",
"Locking wc",
"Failed to lock wc",
"Unlocking ra",
"Unlocked ra",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}
TEST(Common, RWLockWriteLockTimeoutDuringTwoReads)
{
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread ra_thread([&] ()
{
events.add("Locking ra");
auto ra = rw_lock->getLock(RWLockImpl::Read, "ra");
events.add(ra ? "Locked ra" : "Failed to lock ra");
EXPECT_NE(ra, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(400));
events.add("Unlocking ra");
ra.reset();
events.add("Unlocked ra");
});
std::thread rb_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
events.add("Locking rb");
auto rb = rw_lock->getLock(RWLockImpl::Read, "rb");
events.add(rb ? "Locked rb" : "Failed to lock rb");
EXPECT_NE(rb, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
events.add("Unlocking rb");
rb.reset();
events.add("Unlocked rb");
});
std::thread wc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wc");
auto wc = rw_lock->getLock(RWLockImpl::Write, "wc", std::chrono::milliseconds(200));
events.add(wc ? "Locked wc" : "Failed to lock wc");
EXPECT_EQ(wc, nullptr);
});
ra_thread.join();
rb_thread.join();
wc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking ra",
"Locked ra",
"Locking wc",
"Locking rb",
"Failed to lock wc",
"Locked rb",
"Unlocking ra",
"Unlocked ra",
"Unlocking rb",
"Unlocked rb",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}
TEST(Common, RWLockWriteLockTimeoutDuringWriteWithWaitingRead)
{
static auto rw_lock = RWLockImpl::create();
Events events;
std::thread wa_thread([&] ()
{
events.add("Locking wa");
auto wa = rw_lock->getLock(RWLockImpl::Write, "wa");
events.add(wa ? "Locked wa" : "Failed to lock wa");
EXPECT_NE(wa, nullptr);
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(500));
events.add("Unlocking wa");
wa.reset();
events.add("Unlocked wa");
});
std::thread wb_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(100));
events.add("Locking wb");
auto wb = rw_lock->getLock(RWLockImpl::Write, "wb", std::chrono::milliseconds(200));
events.add(wb ? "Locked wb" : "Failed to lock wb");
EXPECT_EQ(wb, nullptr);
});
std::thread rc_thread([&] ()
{
std::this_thread::sleep_for(std::chrono::duration<int, std::milli>(200));
events.add("Locking rc");
auto rc = rw_lock->getLock(RWLockImpl::Read, "rc", std::chrono::milliseconds(200));
events.add(rc ? "Locked rc" : "Failed to lock rc");
EXPECT_EQ(rc, nullptr);
});
wa_thread.join();
wb_thread.join();
rc_thread.join();
{
events.add("Locking wd");
auto wd = rw_lock->getLock(RWLockImpl::Write, "wd", std::chrono::milliseconds(1000));
events.add(wd ? "Locked wd" : "Failed to lock wd");
EXPECT_NE(wd, nullptr);
events.add("Unlocking wd");
wd.reset();
events.add("Unlocked wd");
}
events.check(
{"Locking wa",
"Locked wa",
"Locking wb",
"Locking rc",
"Failed to lock wb",
"Failed to lock rc",
"Unlocking wa",
"Unlocked wa",
"Locking wd",
"Locked wd",
"Unlocking wd",
"Unlocked wd"});
}

View File

@@ -41,8 +41,8 @@ RWLockImpl::LockHolder IStorage::tryLockTimed(
{
const String type_str = type == RWLockImpl::Type::Read ? "READ" : "WRITE";
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry. Owner query ids: {}",
type_str, getStorageID(), acquire_timeout.count(), rwlock->getOwnerQueryIdsDescription());
"{} locking attempt on \"{}\" has timed out! ({}ms) Possible deadlock avoided. Client should retry",
type_str, getStorageID(), acquire_timeout.count());
}
return lock_holder;
}
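
Since the message instructs the client to retry, a caller-side loop could look like the following sketch; the retry budget, the backoff, and the exact tryLockTimed signature are assumptions, not part of IStorage's documented contract:

#include <chrono>
#include <thread>

/// Illustrative retry on DEADLOCK_AVOIDED; not part of IStorage itself.
RWLockImpl::LockHolder lockWithRetries(IStorage & storage, const RWLock & rwlock,
                                       RWLockImpl::Type type, const String & query_id,
                                       std::chrono::milliseconds acquire_timeout)
{
    constexpr size_t max_attempts = 3;   /// assumed retry budget
    for (size_t attempt = 1;; ++attempt)
    {
        try
        {
            return storage.tryLockTimed(rwlock, type, query_id, acquire_timeout);
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::DEADLOCK_AVOIDED || attempt == max_attempts)
                throw;
            std::this_thread::sleep_for(std::chrono::milliseconds(100));   /// brief backoff
        }
    }
}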

View File

@@ -216,7 +216,7 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
node = nodes[randint(0, num_nodes - 1)]
# "DROP TABLE IF EXISTS" still can throw some errors (e.g. "WRITE locking attempt on node0 has timed out!")
# So we use query_and_get_answer_with_error() to ignore any errors.
# `lock_acquire_timeout` is reduced because we don't want our test to wait too long.
# `lock_acquire_timeout` is also reduced because we don't want our test to wait too long.
node.query_and_get_answer_with_error(
f"DROP TABLE IF EXISTS {table_name} SYNC",
settings={"lock_acquire_timeout": 10},
@@ -227,24 +227,15 @@ def test_create_or_drop_tables_during_backup(db_engine, table_engine):
table_name1 = f"mydb.tbl{randint(1, num_nodes)}"
table_name2 = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
# `lock_acquire_timeout` is reduced because we don't want our test to wait too long.
node.query_and_get_answer_with_error(
f"RENAME TABLE {table_name1} TO {table_name2}",
settings={"lock_acquire_timeout": 10},
f"RENAME TABLE {table_name1} TO {table_name2}"
)
def truncate_tables():
while time.time() < end_time:
table_name = f"mydb.tbl{randint(1, num_nodes)}"
node = nodes[randint(0, num_nodes - 1)]
# "TRUNCATE TABLE IF EXISTS" still can throw some errors
# (e.g. "WRITE locking attempt on node0 has timed out!" if the table engine is "Log").
# So we use query_and_get_answer_with_error() to ignore any errors.
# `lock_acquire_timeout` is reduced because we don't want our test to wait too long.
node.query_and_get_answer_with_error(
f"TRUNCATE TABLE IF EXISTS {table_name} SYNC",
settings={"lock_acquire_timeout": 10},
)
node.query(f"TRUNCATE TABLE IF EXISTS {table_name} SYNC")
def make_backups():
ids = []