fix try_shared_lock() in SharedMutex and CancelableSharedMutex

serxa 2023-01-24 14:32:36 +00:00
parent 0fbfa17863
commit 51da43d6cf
3 changed files with 63 additions and 10 deletions


@@ -98,9 +98,15 @@ void CancelableSharedMutex::lock_shared()
 bool CancelableSharedMutex::try_lock_shared()
 {
     UInt64 value = state.load();
-    if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
-        return true;
-    return false;
+    while (true)
+    {
+        if (value & writers)
+            return false;
+        if (state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
+            break;
+        // Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait
+    }
+    return true;
 }
 
 void CancelableSharedMutex::unlock_shared()
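
Why the loop is needed: compare_exchange_strong() fails whenever `state` changed between the load and the CAS, and a concurrent reader incrementing the reader count is enough to cause that. The old code turned such a benign failure into try_lock_shared() returning false; the new loop retries the CAS while the failure was caused by other readers and still bails out immediately once a writer bit is set. A minimal self-contained sketch of the same pattern, assuming a plain std::atomic<uint64_t> counter whose upper bits (the `writers_mask` below is an assumed layout) mark a pending or active writer; this is an illustration, not the actual DB::CancelableSharedMutex code:

#include <atomic>
#include <cstdint>

static constexpr uint64_t writers_mask = 0xffffffff00000000ull; // assumed layout: writer bits in the upper half

bool try_lock_shared_sketch(std::atomic<uint64_t> & state)
{
    uint64_t value = state.load();
    while (true)
    {
        if (value & writers_mask)
            return false; // a writer holds or waits for the lock: fail immediately
        // On failure compare_exchange_strong() reloads `value`, so if the CAS lost
        // a race against another reader we simply retry with the fresh value.
        if (state.compare_exchange_strong(value, value + 1))
            return true;
    }
}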


@@ -37,9 +37,7 @@ void SharedMutex::lock()
 bool SharedMutex::try_lock()
 {
     UInt64 value = 0;
-    if (state.compare_exchange_strong(value, writers))
-        return true;
-    return false;
+    return state.compare_exchange_strong(value, writers);
 }
 
 void SharedMutex::unlock()
@@ -68,9 +66,15 @@ void SharedMutex::lock_shared()
 bool SharedMutex::try_lock_shared()
 {
     UInt64 value = state.load();
-    if (!(value & writers) && state.compare_exchange_strong(value, value + 1))
-        return true;
-    return false;
+    while (true)
+    {
+        if (value & writers)
+            return false;
+        if (state.compare_exchange_strong(value, value + 1))
+            break;
+        // Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait
+    }
+    return true;
 }
 
 void SharedMutex::unlock_shared()
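
SharedMutex gets the same retry loop in try_lock_shared(), and try_lock() is reduced to returning the CAS result directly. Because the class exposes lock/try_lock/unlock and lock_shared/try_lock_shared/unlock_shared, the standard lock adaptors can drive it; a usage sketch (the function name and the template are illustrative, not part of this commit):

#include <mutex>
#include <shared_mutex>

template <class SharedMutexLike>
bool read_if_unlocked(SharedMutexLike & mutex)
{
    std::shared_lock lock(mutex, std::try_to_lock); // invokes mutex.try_lock_shared()
    if (!lock.owns_lock())
        return false; // a writer holds or is waiting for the lock
    // ... read the protected state under the shared lock ...
    return true;
}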


@@ -27,7 +27,7 @@ namespace DB
 struct NoCancel {};
 
 // for all PerfTests
-static constexpr int requests = 512 * 1024;
+static constexpr int requests = 128 * 1024;
 static constexpr int max_threads = 16;
 
 template <class T, class Status = NoCancel>
@@ -91,6 +91,49 @@ void TestSharedMutex()
         ASSERT_EQ(test, writers);
     }
 
+    // Test that multiple readers can acquire the lock simultaneously using try_lock_shared()
+    for (int readers = 1; readers <= 128; readers *= 2)
+    {
+        T sm;
+        std::atomic<int> test(0);
+        std::barrier sync(readers + 1);
+
+        std::vector<std::thread> threads;
+        threads.reserve(readers);
+        auto reader = [&]
+        {
+            [[maybe_unused]] Status status;
+            bool acquired = sm.try_lock_shared();
+            ASSERT_TRUE(acquired);
+            if (!acquired) return; // Just to make TSA happy
+            sync.arrive_and_wait(); // (A) sync with writer
+            test++;
+            sync.arrive_and_wait(); // (B) wait for writer to call try_lock() while the shared lock is held
+            sm.unlock_shared();
+            sync.arrive_and_wait(); // (C) wait for writer to release lock, to ensure try_lock_shared() will see no writer
+        };
+
+        for (int i = 0; i < readers; i++)
+            threads.emplace_back(reader);
+
+        { // writer
+            [[maybe_unused]] Status status;
+            sync.arrive_and_wait(); // (A) wait for all readers to acquire the lock to avoid blocking them
+            ASSERT_FALSE(sm.try_lock());
+            sync.arrive_and_wait(); // (B) sync with readers
+            {
+                std::unique_lock lock(sm);
+                test++;
+            }
+            sync.arrive_and_wait(); // (C) sync with readers
+        }
+
+        for (auto & thread : threads)
+            thread.join();
+
+        ASSERT_EQ(test, readers + 1);
+    }
+
 }
 
 template <class T, class Status = NoCancel>
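
The new test stages the readers and one writer with std::barrier so that the exclusive try_lock() is attempted exactly while every shared lock is held. A standalone sketch of that choreography, compilable without the test harness: DB::SharedMutex and the gtest assertions are stood in for by std::shared_mutex and plain asserts, and the reader count is fixed instead of swept (all of these substitutions are assumptions of the sketch; note that the standard even allows std::shared_mutex's try_* calls to fail spuriously, which DB::SharedMutex after this commit does not under reader-only contention):

#include <barrier>
#include <cassert>
#include <shared_mutex>
#include <thread>
#include <vector>

int main()
{
    constexpr int readers = 4;
    std::shared_mutex sm;
    std::barrier sync(readers + 1); // readers plus the writer below

    std::vector<std::thread> threads;
    threads.reserve(readers);
    for (int i = 0; i < readers; ++i)
        threads.emplace_back([&]
        {
            [[maybe_unused]] bool acquired = sm.try_lock_shared();
            assert(acquired);           // every reader gets the shared lock
            sync.arrive_and_wait();     // (A) all shared locks are now held
            sync.arrive_and_wait();     // (B) writer has tried and failed to take the exclusive lock
            sm.unlock_shared();
        });

    sync.arrive_and_wait();             // (A) wait until every reader holds the lock
    [[maybe_unused]] bool exclusive = sm.try_lock();
    assert(!exclusive);                 // shared locks block the exclusive try_lock()
    sync.arrive_and_wait();             // (B) let the readers release their locks

    for (auto & thread : threads)
        thread.join();

    sm.lock();                          // all readers are gone, so this succeeds
    sm.unlock();
    return 0;
}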