Merge pull request #45007 from ClickHouse/cancellable-mutex-integration

Fast shared mutex integration
This commit is contained in:
Sergei Trifonov 2023-01-25 11:15:46 +01:00 committed by GitHub
commit 0d1ea05ff6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 138 additions and 86 deletions

View File

@ -98,9 +98,15 @@ void CancelableSharedMutex::lock_shared()
bool CancelableSharedMutex::try_lock_shared()
{
UInt64 value = state.load();
if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
return true;
while (true)
{
if (value & writers)
return false;
if (state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
break;
// Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait
}
return true;
}
void CancelableSharedMutex::unlock_shared()

View File

@ -1,6 +1,6 @@
#pragma once
#include <shared_mutex>
#include <Common/SharedMutex.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
@ -12,7 +12,7 @@ class ProfilingScopedWriteRWLock
{
public:
ProfilingScopedWriteRWLock(std::shared_mutex & rwl_, ProfileEvents::Event event) :
ProfilingScopedWriteRWLock(SharedMutex & rwl_, ProfileEvents::Event event) :
scoped_write_lock(rwl_)
{
ProfileEvents::increment(event, watch.elapsed());
@ -20,14 +20,14 @@ public:
private:
Stopwatch watch;
std::unique_lock<std::shared_mutex> scoped_write_lock;
std::unique_lock<SharedMutex> scoped_write_lock;
};
class ProfilingScopedReadRWLock
{
public:
ProfilingScopedReadRWLock(std::shared_mutex & rwl, ProfileEvents::Event event) :
ProfilingScopedReadRWLock(SharedMutex & rwl, ProfileEvents::Event event) :
scoped_read_lock(rwl)
{
ProfileEvents::increment(event, watch.elapsed());
@ -35,7 +35,7 @@ public:
private:
Stopwatch watch;
std::shared_lock<std::shared_mutex> scoped_read_lock;
std::shared_lock<SharedMutex> scoped_read_lock;
};
}

View File

@ -37,9 +37,7 @@ void SharedMutex::lock()
bool SharedMutex::try_lock()
{
UInt64 value = 0;
if (state.compare_exchange_strong(value, writers))
return true;
return false;
return state.compare_exchange_strong(value, writers);
}
void SharedMutex::unlock()
@ -68,9 +66,15 @@ void SharedMutex::lock_shared()
bool SharedMutex::try_lock_shared()
{
UInt64 value = state.load();
if (!(value & writers) && state.compare_exchange_strong(value, value + 1))
return true;
while (true)
{
if (value & writers)
return false;
if (state.compare_exchange_strong(value, value + 1))
break;
// Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait
}
return true;
}
void SharedMutex::unlock_shared()

View File

@ -13,7 +13,6 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_set>

View File

@ -27,7 +27,7 @@ namespace DB
struct NoCancel {};
// for all PerfTests
static constexpr int requests = 512 * 1024;
static constexpr int requests = 128 * 1024;
static constexpr int max_threads = 16;
template <class T, class Status = NoCancel>
@ -91,6 +91,49 @@ void TestSharedMutex()
ASSERT_EQ(test, writers);
}
// Test multiple readers can acquire lock simultaneously using try_shared_lock
for (int readers = 1; readers <= 128; readers *= 2)
{
T sm;
std::atomic<int> test(0);
std::barrier sync(readers + 1);
std::vector<std::thread> threads;
threads.reserve(readers);
auto reader = [&]
{
[[maybe_unused]] Status status;
bool acquired = sm.try_lock_shared();
ASSERT_TRUE(acquired);
if (!acquired) return; // Just to make TSA happy
sync.arrive_and_wait(); // (A) sync with writer
test++;
sync.arrive_and_wait(); // (B) wait for writer to call try_lock() while shared_lock is held
sm.unlock_shared();
sync.arrive_and_wait(); // (C) wait for writer to release lock, to ensure try_lock_shared() will see no writer
};
for (int i = 0; i < readers; i++)
threads.emplace_back(reader);
{ // writer
[[maybe_unused]] Status status;
sync.arrive_and_wait(); // (A) wait for all reader to acquire lock to avoid blocking them
ASSERT_FALSE(sm.try_lock());
sync.arrive_and_wait(); // (B) sync with readers
{
std::unique_lock lock(sm);
test++;
}
sync.arrive_and_wait(); // (C) sync with readers
}
for (auto & thread : threads)
thread.join();
ASSERT_EQ(test, readers + 1);
}
}
template <class T, class Status = NoCancel>

View File

@ -4,7 +4,6 @@
#include <chrono>
#include <cmath>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>
@ -14,6 +13,7 @@
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
#include <Common/SharedMutex.h>
#include <Common/CurrentMetrics.h>
#include <Dictionaries/IDictionary.h>
@ -206,7 +206,7 @@ private:
/// This lock is used for the inner cache state update function lock it for
/// write, when it need to update cache state all other functions just
/// readers. Surprisingly this lock is also used for last_exception pointer.
mutable std::shared_mutex rw_lock;
mutable SharedMutex rw_lock;
mutable std::exception_ptr last_exception;
mutable std::atomic<size_t> error_count {0};

View File

@ -2,7 +2,6 @@
#include <atomic>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>
#include <functional>

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
@ -15,7 +16,7 @@ class FakeMetadataStorageFromDisk final : public IMetadataStorage
private:
friend class FakeMetadataStorageFromDiskTransaction;
mutable std::shared_mutex metadata_mutex;
mutable SharedMutex metadata_mutex;
DiskPtr disk;
ObjectStoragePtr object_storage;

View File

@ -83,7 +83,7 @@ std::string MetadataStorageFromDisk::readInlineDataToString(const std::string &
return readMetadata(path)->getInlineData();
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
auto str = readFileToString(path);
@ -91,7 +91,7 @@ DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const
return metadata;
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<std::shared_mutex> &) const
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
auto str = readFileToString(path);

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/IDisk.h>
@ -17,7 +18,7 @@ class MetadataStorageFromDisk final : public IMetadataStorage
private:
friend class MetadataStorageFromDiskTransaction;
mutable std::shared_mutex metadata_mutex;
mutable SharedMutex metadata_mutex;
DiskPtr disk;
std::string object_storage_root_path;
@ -67,8 +68,8 @@ public:
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<std::shared_mutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> & lock) const;
};
class MetadataStorageFromDiskTransaction final : public IMetadataTransaction

View File

@ -26,7 +26,7 @@ SetLastModifiedOperation::SetLastModifiedOperation(const std::string & path_, Po
{
}
void SetLastModifiedOperation::execute(std::unique_lock<std::shared_mutex> &)
void SetLastModifiedOperation::execute(std::unique_lock<SharedMutex> &)
{
old_timestamp = disk.getLastModified(path);
disk.setLastModified(path, new_timestamp);
@ -44,7 +44,7 @@ ChmodOperation::ChmodOperation(const std::string & path_, mode_t mode_, IDisk &
{
}
void ChmodOperation::execute(std::unique_lock<std::shared_mutex> &)
void ChmodOperation::execute(std::unique_lock<SharedMutex> &)
{
old_mode = disk.stat(path).st_mode;
disk.chmod(path, mode);
@ -61,7 +61,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk
{
}
void UnlinkFileOperation::execute(std::unique_lock<std::shared_mutex> &)
void UnlinkFileOperation::execute(std::unique_lock<SharedMutex> &)
{
auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path));
readStringUntilEOF(prev_data, *buf);
@ -81,7 +81,7 @@ CreateDirectoryOperation::CreateDirectoryOperation(const std::string & path_, ID
{
}
void CreateDirectoryOperation::execute(std::unique_lock<std::shared_mutex> &)
void CreateDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.createDirectory(path);
}
@ -97,7 +97,7 @@ CreateDirectoryRecursiveOperation::CreateDirectoryRecursiveOperation(const std::
{
}
void CreateDirectoryRecursiveOperation::execute(std::unique_lock<std::shared_mutex> &)
void CreateDirectoryRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
{
namespace fs = std::filesystem;
fs::path p(path);
@ -124,7 +124,7 @@ RemoveDirectoryOperation::RemoveDirectoryOperation(const std::string & path_, ID
{
}
void RemoveDirectoryOperation::execute(std::unique_lock<std::shared_mutex> &)
void RemoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.removeDirectory(path);
}
@ -141,7 +141,7 @@ RemoveRecursiveOperation::RemoveRecursiveOperation(const std::string & path_, ID
{
}
void RemoveRecursiveOperation::execute(std::unique_lock<std::shared_mutex> &)
void RemoveRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.isFile(path))
disk.moveFile(path, temp_path);
@ -174,7 +174,7 @@ CreateHardlinkOperation::CreateHardlinkOperation(const std::string & path_from_,
{
}
void CreateHardlinkOperation::execute(std::unique_lock<std::shared_mutex> & lock)
void CreateHardlinkOperation::execute(std::unique_lock<SharedMutex> & lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path_from, lock);
@ -201,7 +201,7 @@ MoveFileOperation::MoveFileOperation(const std::string & path_from_, const std::
{
}
void MoveFileOperation::execute(std::unique_lock<std::shared_mutex> &)
void MoveFileOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.moveFile(path_from, path_to);
}
@ -218,7 +218,7 @@ MoveDirectoryOperation::MoveDirectoryOperation(const std::string & path_from_, c
{
}
void MoveDirectoryOperation::execute(std::unique_lock<std::shared_mutex> &)
void MoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.moveDirectory(path_from, path_to);
}
@ -236,7 +236,7 @@ ReplaceFileOperation::ReplaceFileOperation(const std::string & path_from_, const
{
}
void ReplaceFileOperation::execute(std::unique_lock<std::shared_mutex> &)
void ReplaceFileOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.exists(path_to))
disk.moveFile(path_to, temp_path_to);
@ -262,7 +262,7 @@ WriteFileOperation::WriteFileOperation(const std::string & path_, IDisk & disk_,
{
}
void WriteFileOperation::execute(std::unique_lock<std::shared_mutex> &)
void WriteFileOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.exists(path))
{
@ -288,7 +288,7 @@ void WriteFileOperation::undo()
}
}
void AddBlobOperation::execute(std::unique_lock<std::shared_mutex> & metadata_lock)
void AddBlobOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
DiskObjectStorageMetadataPtr metadata;
if (metadata_storage.exists(path))
@ -309,7 +309,7 @@ void AddBlobOperation::undo()
write_operation->undo();
}
void UnlinkMetadataFileOperation::execute(std::unique_lock<std::shared_mutex> & metadata_lock)
void UnlinkMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
uint32_t ref_count = metadata->getRefCount();
@ -336,7 +336,7 @@ void UnlinkMetadataFileOperation::undo()
write_operation->undo();
}
void SetReadonlyFileOperation::execute(std::unique_lock<std::shared_mutex> & metadata_lock)
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
metadata->setReadOnly();

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
namespace DB
@ -13,7 +14,7 @@ class IDisk;
struct IMetadataOperation
{
virtual void execute(std::unique_lock<std::shared_mutex> & metadata_lock) = 0;
virtual void execute(std::unique_lock<SharedMutex> & metadata_lock) = 0;
virtual void undo() = 0;
virtual void finalize() {}
virtual ~IMetadataOperation() = default;
@ -26,7 +27,7 @@ struct SetLastModifiedOperation final : public IMetadataOperation
{
SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -41,7 +42,7 @@ struct ChmodOperation final : public IMetadataOperation
{
ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -57,7 +58,7 @@ struct UnlinkFileOperation final : public IMetadataOperation
{
UnlinkFileOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -72,7 +73,7 @@ struct CreateDirectoryOperation final : public IMetadataOperation
{
CreateDirectoryOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -86,7 +87,7 @@ struct CreateDirectoryRecursiveOperation final : public IMetadataOperation
{
CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -101,7 +102,7 @@ struct RemoveDirectoryOperation final : public IMetadataOperation
{
RemoveDirectoryOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -114,7 +115,7 @@ struct RemoveRecursiveOperation final : public IMetadataOperation
{
RemoveRecursiveOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -130,7 +131,7 @@ struct WriteFileOperation final : public IMetadataOperation
{
WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
private:
@ -149,7 +150,7 @@ struct CreateHardlinkOperation final : public IMetadataOperation
IDisk & disk_,
const MetadataStorageFromDisk & metadata_storage_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -166,7 +167,7 @@ struct MoveFileOperation final : public IMetadataOperation
{
MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -181,7 +182,7 @@ struct MoveDirectoryOperation final : public IMetadataOperation
{
MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -196,7 +197,7 @@ struct ReplaceFileOperation final : public IMetadataOperation
{
ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -226,7 +227,7 @@ struct AddBlobOperation final : public IMetadataOperation
, metadata_storage(metadata_storage_)
{}
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -254,7 +255,7 @@ struct UnlinkMetadataFileOperation final : public IMetadataOperation
{
}
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -279,7 +280,7 @@ struct SetReadonlyFileOperation final : public IMetadataOperation
{
}
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;

View File

@ -726,7 +726,7 @@ DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String &
return std::make_unique<DDLGuard>(db_guard.first, db_guard.second, std::move(lock), table, database);
}
std::unique_lock<std::shared_mutex> DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database)
std::unique_lock<SharedMutex> DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database)
{
DDLGuards::iterator db_guard_iter;
{
@ -1279,7 +1279,7 @@ TemporaryLockForUUIDDirectory & TemporaryLockForUUIDDirectory::operator = (Tempo
}
DDLGuard::DDLGuard(Map & map_, std::shared_mutex & db_mutex_, std::unique_lock<std::mutex> guards_lock_, const String & elem, const String & database_name)
DDLGuard::DDLGuard(Map & map_, SharedMutex & db_mutex_, std::unique_lock<std::mutex> guards_lock_, const String & elem, const String & database_name)
: map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_))
{
it = map.emplace(elem, Entry{std::make_unique<std::mutex>(), 0}).first;

View File

@ -6,6 +6,7 @@
#include <Databases/TablesDependencyGraph.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/SharedMutex.h>
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>
@ -17,7 +18,6 @@
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>
#include <unordered_map>
#include <unordered_set>
#include <filesystem>
@ -58,7 +58,7 @@ public:
DDLGuard(
Map & map_,
std::shared_mutex & db_mutex_,
SharedMutex & db_mutex_,
std::unique_lock<std::mutex> guards_lock_,
const String & elem,
const String & database_name);
@ -69,7 +69,7 @@ public:
private:
Map & map;
std::shared_mutex & db_mutex;
SharedMutex & db_mutex;
Map::iterator it;
std::unique_lock<std::mutex> guards_lock;
std::unique_lock<std::mutex> table_lock;
@ -142,7 +142,7 @@ public:
/// Get an object that protects the table from concurrently executing multiple DDL operations.
DDLGuardPtr getDDLGuard(const String & database, const String & table);
/// Get an object that protects the database from concurrent DDL queries all tables in the database
std::unique_lock<std::shared_mutex> getExclusiveDDLGuardForDatabase(const String & database);
std::unique_lock<SharedMutex> getExclusiveDDLGuardForDatabase(const String & database);
void assertDatabaseExists(const String & database_name) const;
@ -298,7 +298,7 @@ private:
/// For the duration of the operation, an element is placed here, and an object is returned,
/// which deletes the element in the destructor when counter becomes zero.
/// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below.
using DatabaseGuard = std::pair<DDLGuard::Map, std::shared_mutex>;
using DatabaseGuard = std::pair<DDLGuard::Map, SharedMutex>;
using DDLGuards = std::map<String, DatabaseGuard>;
DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex);
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.

View File

@ -7,6 +7,7 @@
#include <Core/Block.h>
#include <Common/MultiVersion.h>
#include <Common/SharedMutex.h>
#include <mutex>
@ -135,7 +136,7 @@ private:
TemporaryDataOnDiskPtr tmp_data;
Buckets buckets;
mutable std::shared_mutex rehash_mutex;
mutable SharedMutex rehash_mutex;
FileBucket * current_bucket = nullptr;
mutable std::mutex current_bucket_mutex;

View File

@ -7,11 +7,11 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/ActionBlocker.h>
#include <Common/SharedMutex.h>
#include <base/types.h>
#include <atomic>
#include <map>
#include <shared_mutex>
#include <utility>
namespace zkutil
@ -43,7 +43,7 @@ public:
/// You need to stop the data transfer if blocker is activated.
ActionBlocker blocker;
std::shared_mutex rwlock;
SharedMutex rwlock;
};
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;

View File

@ -1,7 +1,6 @@
#pragma once
#include <shared_mutex>
#include <Common/SharedMutex.h>
#include <Common/CacheBase.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
@ -72,7 +71,7 @@ private:
using Cache = CacheBase<size_t, Block, std::hash<size_t>, BlockByteWeight>;
mutable std::shared_mutex rwlock;
mutable SharedMutex rwlock;
std::shared_ptr<TableJoin> table_join;
SizeLimits size_limits;
SortDescription left_sort_description;

View File

@ -25,7 +25,6 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

View File

@ -176,7 +176,7 @@ bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns)
bool Set::insertFromBlock(const Columns & columns)
{
std::lock_guard<std::shared_mutex> lock(rwlock);
std::lock_guard lock(rwlock);
if (data.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Method Set::setHeader must be called before Set::insertFromBlock");

View File

@ -1,6 +1,5 @@
#pragma once
#include <shared_mutex>
#include <Core/Block.h>
#include <QueryPipeline/SizeLimits.h>
#include <DataTypes/IDataType.h>
@ -8,6 +7,7 @@
#include <Parsers/IAST.h>
#include <Storages/MergeTree/BoolMask.h>
#include <Common/SharedMutex.h>
#include <Common/logger_useful.h>
@ -131,7 +131,7 @@ private:
/** Protects work with the set in the functions `insertFromBlock` and `execute`.
* These functions can be called simultaneously from different threads only when using StorageSet,
*/
mutable std::shared_mutex rwlock;
mutable SharedMutex rwlock;
template <typename Method>
void insertFromBlockImpl(

View File

@ -19,7 +19,6 @@
#include <Common/TypePromotion.h>
#include <optional>
#include <shared_mutex>
#include <compare>

View File

@ -1,6 +1,6 @@
#pragma once
#include "IO/WriteSettings.h"
#include <IO/WriteSettings.h>
#include <Core/Block.h>
#include <base/types.h>
#include <Core/NamesAndTypes.h>
@ -22,8 +22,6 @@
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <shared_mutex>
namespace zkutil
{

View File

@ -2,6 +2,7 @@
#include <base/defines.h>
#include <Common/SimpleIncrement.h>
#include <Common/SharedMutex.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <IO/ReadBufferFromString.h>
@ -1089,7 +1090,7 @@ protected:
MultiVersion<MergeTreeSettings> storage_settings;
/// Used to determine which UUIDs to send to root query executor for deduplication.
mutable std::shared_mutex pinned_part_uuids_mutex;
mutable SharedMutex pinned_part_uuids_mutex;
PinnedPartUUIDsPtr pinned_part_uuids;
/// True if at least one part was created/removed with transaction.

View File

@ -33,7 +33,6 @@
#include <cstddef>
#include <filesystem>
#include <shared_mutex>
#include <utility>
@ -493,7 +492,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistics() const
{
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
std::shared_lock lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
return nullptr;
return rocksdb_ptr->GetOptions().statistics;
@ -501,7 +500,7 @@ std::shared_ptr<rocksdb::Statistics> StorageEmbeddedRocksDB::getRocksDBStatistic
std::vector<rocksdb::Status> StorageEmbeddedRocksDB::multiGet(const std::vector<rocksdb::Slice> & slices_keys, std::vector<String> & values) const
{
std::shared_lock<std::shared_mutex> lock(rocksdb_ptr_mx);
std::shared_lock lock(rocksdb_ptr_mx);
if (!rocksdb_ptr)
return {};
return rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values);

View File

@ -1,7 +1,7 @@
#pragma once
#include <memory>
#include <shared_mutex>
#include <Common/SharedMutex.h>
#include <Storages/IStorage.h>
#include <Interpreters/IKeyValueEntity.h>
#include <rocksdb/status.h>
@ -86,7 +86,7 @@ private:
const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr;
mutable std::shared_mutex rocksdb_ptr_mx;
mutable SharedMutex rocksdb_ptr_mx;
String rocksdb_dir;
Int32 ttl;
bool read_only;

View File

@ -49,6 +49,7 @@
#include <unistd.h>
#include <re2/re2.h>
#include <filesystem>
#include <shared_mutex>
namespace ProfileEvents

View File

@ -1442,11 +1442,11 @@ void StorageWindowView::writeIntoWindowView(
});
}
std::shared_lock<std::shared_mutex> fire_signal_lock;
std::shared_lock<SharedMutex> fire_signal_lock;
QueryPipelineBuilder builder;
if (window_view.is_proctime)
{
fire_signal_lock = std::shared_lock<std::shared_mutex>(window_view.fire_signal_mutex);
fire_signal_lock = std::shared_lock(window_view.fire_signal_mutex);
/// Fill ____timestamp column with current time in case of now() time column.
if (window_view.is_time_column_func_now)

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataTypes/DataTypeInterval.h>
#include <Interpreters/InterpreterSelectQuery.h>
@ -213,7 +214,7 @@ private:
/// Mutex for the blocks and ready condition
std::mutex mutex;
std::shared_mutex fire_signal_mutex;
SharedMutex fire_signal_mutex;
mutable std::mutex sample_block_lock; /// Mutex to protect access to sample block
IntervalKind::Kind window_kind;