diff --git a/src/Common/CancelableSharedMutex.cpp b/src/Common/CancelableSharedMutex.cpp index c8ca93309ee..d9f2c71e8db 100644 --- a/src/Common/CancelableSharedMutex.cpp +++ b/src/Common/CancelableSharedMutex.cpp @@ -98,9 +98,15 @@ void CancelableSharedMutex::lock_shared() bool CancelableSharedMutex::try_lock_shared() { UInt64 value = state.load(); - if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) // overflow is not realistic - return true; - return false; + while (true) + { + if (value & writers) + return false; + if (state.compare_exchange_strong(value, value + 1)) // overflow is not realistic + break; + // Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait + } + return true; } void CancelableSharedMutex::unlock_shared() diff --git a/src/Common/ProfilingScopedRWLock.h b/src/Common/ProfilingScopedRWLock.h index f5753f9ce46..50e52e66e2d 100644 --- a/src/Common/ProfilingScopedRWLock.h +++ b/src/Common/ProfilingScopedRWLock.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -12,7 +12,7 @@ class ProfilingScopedWriteRWLock { public: - ProfilingScopedWriteRWLock(std::shared_mutex & rwl_, ProfileEvents::Event event) : + ProfilingScopedWriteRWLock(SharedMutex & rwl_, ProfileEvents::Event event) : scoped_write_lock(rwl_) { ProfileEvents::increment(event, watch.elapsed()); @@ -20,14 +20,14 @@ public: private: Stopwatch watch; - std::unique_lock scoped_write_lock; + std::unique_lock scoped_write_lock; }; class ProfilingScopedReadRWLock { public: - ProfilingScopedReadRWLock(std::shared_mutex & rwl, ProfileEvents::Event event) : + ProfilingScopedReadRWLock(SharedMutex & rwl, ProfileEvents::Event event) : scoped_read_lock(rwl) { ProfileEvents::increment(event, watch.elapsed()); @@ -35,7 +35,7 @@ public: private: Stopwatch watch; - std::shared_lock scoped_read_lock; + std::shared_lock scoped_read_lock; }; } diff --git a/src/Common/SharedMutex.cpp b/src/Common/SharedMutex.cpp index 31525dbd668..1df09ca998a 100644 --- a/src/Common/SharedMutex.cpp +++ b/src/Common/SharedMutex.cpp @@ -37,9 +37,7 @@ void SharedMutex::lock() bool SharedMutex::try_lock() { UInt64 value = 0; - if (state.compare_exchange_strong(value, writers)) - return true; - return false; + return state.compare_exchange_strong(value, writers); } void SharedMutex::unlock() @@ -68,9 +66,15 @@ void SharedMutex::lock_shared() bool SharedMutex::try_lock_shared() { UInt64 value = state.load(); - if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) - return true; - return false; + while (true) + { + if (value & writers) + return false; + if (state.compare_exchange_strong(value, value + 1)) + break; + // Concurrent try_lock_shared() should not fail, so we have to retry CAS, but avoid blocking wait + } + return true; } void SharedMutex::unlock_shared() diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 7b6fb433ec5..69c5732ddb6 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -13,7 +13,6 @@ #include #include #include -#include #include diff --git a/src/Common/tests/gtest_threading.cpp b/src/Common/tests/gtest_threading.cpp index 8662e93e81b..8329045cd70 100644 --- a/src/Common/tests/gtest_threading.cpp +++ b/src/Common/tests/gtest_threading.cpp @@ -27,7 +27,7 @@ namespace DB struct NoCancel {}; // for all PerfTests -static constexpr int requests = 512 * 1024; +static constexpr int requests = 128 * 1024; static constexpr int max_threads = 16; template @@ -91,6 +91,49 @@ void TestSharedMutex() ASSERT_EQ(test, writers); } + + // Test multiple readers can acquire lock simultaneously using try_shared_lock + for (int readers = 1; readers <= 128; readers *= 2) + { + T sm; + std::atomic test(0); + std::barrier sync(readers + 1); + + std::vector threads; + threads.reserve(readers); + auto reader = [&] + { + [[maybe_unused]] Status status; + bool acquired = sm.try_lock_shared(); + ASSERT_TRUE(acquired); + if (!acquired) return; // Just to make TSA happy + sync.arrive_and_wait(); // (A) sync with writer + test++; + sync.arrive_and_wait(); // (B) wait for writer to call try_lock() while shared_lock is held + sm.unlock_shared(); + sync.arrive_and_wait(); // (C) wait for writer to release lock, to ensure try_lock_shared() will see no writer + }; + + for (int i = 0; i < readers; i++) + threads.emplace_back(reader); + + { // writer + [[maybe_unused]] Status status; + sync.arrive_and_wait(); // (A) wait for all reader to acquire lock to avoid blocking them + ASSERT_FALSE(sm.try_lock()); + sync.arrive_and_wait(); // (B) sync with readers + { + std::unique_lock lock(sm); + test++; + } + sync.arrive_and_wait(); // (C) sync with readers + } + + for (auto & thread : threads) + thread.join(); + + ASSERT_EQ(test, readers + 1); + } } template diff --git a/src/Dictionaries/CacheDictionary.h b/src/Dictionaries/CacheDictionary.h index d4716999b47..e19c4a66b1f 100644 --- a/src/Dictionaries/CacheDictionary.h +++ b/src/Dictionaries/CacheDictionary.h @@ -4,7 +4,6 @@ #include #include #include -#include #include #include @@ -14,6 +13,7 @@ #include #include +#include #include #include @@ -206,7 +206,7 @@ private: /// This lock is used for the inner cache state update function lock it for /// write, when it need to update cache state all other functions just /// readers. Surprisingly this lock is also used for last_exception pointer. - mutable std::shared_mutex rw_lock; + mutable SharedMutex rw_lock; mutable std::exception_ptr last_exception; mutable std::atomic error_count {0}; diff --git a/src/Dictionaries/CacheDictionaryUpdateQueue.h b/src/Dictionaries/CacheDictionaryUpdateQueue.h index 8db5c4a59df..8d0581d2052 100644 --- a/src/Dictionaries/CacheDictionaryUpdateQueue.h +++ b/src/Dictionaries/CacheDictionaryUpdateQueue.h @@ -2,7 +2,6 @@ #include #include -#include #include #include #include diff --git a/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h b/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h index 849e7235c0a..246d2aebfaa 100644 --- a/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -15,7 +16,7 @@ class FakeMetadataStorageFromDisk final : public IMetadataStorage private: friend class FakeMetadataStorageFromDiskTransaction; - mutable std::shared_mutex metadata_mutex; + mutable SharedMutex metadata_mutex; DiskPtr disk; ObjectStoragePtr object_storage; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp index 508982ac9c4..96c8b3daf04 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp @@ -83,7 +83,7 @@ std::string MetadataStorageFromDisk::readInlineDataToString(const std::string & return readMetadata(path)->getInlineData(); } -DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock &) const +DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock &) const { auto metadata = std::make_unique(disk->getPath(), object_storage_root_path, path); auto str = readFileToString(path); @@ -91,7 +91,7 @@ DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const return metadata; } -DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock &) const +DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock &) const { auto metadata = std::make_unique(disk->getPath(), object_storage_root_path, path); auto str = readFileToString(path); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h index 2c80572e7b4..5273f0b041e 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -17,7 +18,7 @@ class MetadataStorageFromDisk final : public IMetadataStorage private: friend class MetadataStorageFromDiskTransaction; - mutable std::shared_mutex metadata_mutex; + mutable SharedMutex metadata_mutex; DiskPtr disk; std::string object_storage_root_path; @@ -67,8 +68,8 @@ public: DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const; - DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; - DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; + DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; + DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; }; class MetadataStorageFromDiskTransaction final : public IMetadataTransaction diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp index ce5171fedee..7463622cb06 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp @@ -26,7 +26,7 @@ SetLastModifiedOperation::SetLastModifiedOperation(const std::string & path_, Po { } -void SetLastModifiedOperation::execute(std::unique_lock &) +void SetLastModifiedOperation::execute(std::unique_lock &) { old_timestamp = disk.getLastModified(path); disk.setLastModified(path, new_timestamp); @@ -44,7 +44,7 @@ ChmodOperation::ChmodOperation(const std::string & path_, mode_t mode_, IDisk & { } -void ChmodOperation::execute(std::unique_lock &) +void ChmodOperation::execute(std::unique_lock &) { old_mode = disk.stat(path).st_mode; disk.chmod(path, mode); @@ -61,7 +61,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk { } -void UnlinkFileOperation::execute(std::unique_lock &) +void UnlinkFileOperation::execute(std::unique_lock &) { auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path)); readStringUntilEOF(prev_data, *buf); @@ -81,7 +81,7 @@ CreateDirectoryOperation::CreateDirectoryOperation(const std::string & path_, ID { } -void CreateDirectoryOperation::execute(std::unique_lock &) +void CreateDirectoryOperation::execute(std::unique_lock &) { disk.createDirectory(path); } @@ -97,7 +97,7 @@ CreateDirectoryRecursiveOperation::CreateDirectoryRecursiveOperation(const std:: { } -void CreateDirectoryRecursiveOperation::execute(std::unique_lock &) +void CreateDirectoryRecursiveOperation::execute(std::unique_lock &) { namespace fs = std::filesystem; fs::path p(path); @@ -124,7 +124,7 @@ RemoveDirectoryOperation::RemoveDirectoryOperation(const std::string & path_, ID { } -void RemoveDirectoryOperation::execute(std::unique_lock &) +void RemoveDirectoryOperation::execute(std::unique_lock &) { disk.removeDirectory(path); } @@ -141,7 +141,7 @@ RemoveRecursiveOperation::RemoveRecursiveOperation(const std::string & path_, ID { } -void RemoveRecursiveOperation::execute(std::unique_lock &) +void RemoveRecursiveOperation::execute(std::unique_lock &) { if (disk.isFile(path)) disk.moveFile(path, temp_path); @@ -174,7 +174,7 @@ CreateHardlinkOperation::CreateHardlinkOperation(const std::string & path_from_, { } -void CreateHardlinkOperation::execute(std::unique_lock & lock) +void CreateHardlinkOperation::execute(std::unique_lock & lock) { auto metadata = metadata_storage.readMetadataUnlocked(path_from, lock); @@ -201,7 +201,7 @@ MoveFileOperation::MoveFileOperation(const std::string & path_from_, const std:: { } -void MoveFileOperation::execute(std::unique_lock &) +void MoveFileOperation::execute(std::unique_lock &) { disk.moveFile(path_from, path_to); } @@ -218,7 +218,7 @@ MoveDirectoryOperation::MoveDirectoryOperation(const std::string & path_from_, c { } -void MoveDirectoryOperation::execute(std::unique_lock &) +void MoveDirectoryOperation::execute(std::unique_lock &) { disk.moveDirectory(path_from, path_to); } @@ -236,7 +236,7 @@ ReplaceFileOperation::ReplaceFileOperation(const std::string & path_from_, const { } -void ReplaceFileOperation::execute(std::unique_lock &) +void ReplaceFileOperation::execute(std::unique_lock &) { if (disk.exists(path_to)) disk.moveFile(path_to, temp_path_to); @@ -262,7 +262,7 @@ WriteFileOperation::WriteFileOperation(const std::string & path_, IDisk & disk_, { } -void WriteFileOperation::execute(std::unique_lock &) +void WriteFileOperation::execute(std::unique_lock &) { if (disk.exists(path)) { @@ -288,7 +288,7 @@ void WriteFileOperation::undo() } } -void AddBlobOperation::execute(std::unique_lock & metadata_lock) +void AddBlobOperation::execute(std::unique_lock & metadata_lock) { DiskObjectStorageMetadataPtr metadata; if (metadata_storage.exists(path)) @@ -309,7 +309,7 @@ void AddBlobOperation::undo() write_operation->undo(); } -void UnlinkMetadataFileOperation::execute(std::unique_lock & metadata_lock) +void UnlinkMetadataFileOperation::execute(std::unique_lock & metadata_lock) { auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); uint32_t ref_count = metadata->getRefCount(); @@ -336,7 +336,7 @@ void UnlinkMetadataFileOperation::undo() write_operation->undo(); } -void SetReadonlyFileOperation::execute(std::unique_lock & metadata_lock) +void SetReadonlyFileOperation::execute(std::unique_lock & metadata_lock) { auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); metadata->setReadOnly(); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h index 0bce6141301..d8e4892a0a5 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace DB @@ -13,7 +14,7 @@ class IDisk; struct IMetadataOperation { - virtual void execute(std::unique_lock & metadata_lock) = 0; + virtual void execute(std::unique_lock & metadata_lock) = 0; virtual void undo() = 0; virtual void finalize() {} virtual ~IMetadataOperation() = default; @@ -26,7 +27,7 @@ struct SetLastModifiedOperation final : public IMetadataOperation { SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -41,7 +42,7 @@ struct ChmodOperation final : public IMetadataOperation { ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -57,7 +58,7 @@ struct UnlinkFileOperation final : public IMetadataOperation { UnlinkFileOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -72,7 +73,7 @@ struct CreateDirectoryOperation final : public IMetadataOperation { CreateDirectoryOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -86,7 +87,7 @@ struct CreateDirectoryRecursiveOperation final : public IMetadataOperation { CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -101,7 +102,7 @@ struct RemoveDirectoryOperation final : public IMetadataOperation { RemoveDirectoryOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -114,7 +115,7 @@ struct RemoveRecursiveOperation final : public IMetadataOperation { RemoveRecursiveOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -130,7 +131,7 @@ struct WriteFileOperation final : public IMetadataOperation { WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; private: @@ -149,7 +150,7 @@ struct CreateHardlinkOperation final : public IMetadataOperation IDisk & disk_, const MetadataStorageFromDisk & metadata_storage_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -166,7 +167,7 @@ struct MoveFileOperation final : public IMetadataOperation { MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -181,7 +182,7 @@ struct MoveDirectoryOperation final : public IMetadataOperation { MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -196,7 +197,7 @@ struct ReplaceFileOperation final : public IMetadataOperation { ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -226,7 +227,7 @@ struct AddBlobOperation final : public IMetadataOperation , metadata_storage(metadata_storage_) {} - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -254,7 +255,7 @@ struct UnlinkMetadataFileOperation final : public IMetadataOperation { } - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -279,7 +280,7 @@ struct SetReadonlyFileOperation final : public IMetadataOperation { } - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 6abc95b6738..10141c78d2b 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -726,7 +726,7 @@ DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & return std::make_unique(db_guard.first, db_guard.second, std::move(lock), table, database); } -std::unique_lock DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database) +std::unique_lock DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database) { DDLGuards::iterator db_guard_iter; { @@ -1279,7 +1279,7 @@ TemporaryLockForUUIDDirectory & TemporaryLockForUUIDDirectory::operator = (Tempo } -DDLGuard::DDLGuard(Map & map_, std::shared_mutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name) +DDLGuard::DDLGuard(Map & map_, SharedMutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name) : map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_)) { it = map.emplace(elem, Entry{std::make_unique(), 0}).first; diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index a3fa4515a69..5dc3f90b7f4 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -6,6 +6,7 @@ #include #include #include +#include #include #include @@ -17,7 +18,6 @@ #include #include #include -#include #include #include #include @@ -58,7 +58,7 @@ public: DDLGuard( Map & map_, - std::shared_mutex & db_mutex_, + SharedMutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name); @@ -69,7 +69,7 @@ public: private: Map & map; - std::shared_mutex & db_mutex; + SharedMutex & db_mutex; Map::iterator it; std::unique_lock guards_lock; std::unique_lock table_lock; @@ -142,7 +142,7 @@ public: /// Get an object that protects the table from concurrently executing multiple DDL operations. DDLGuardPtr getDDLGuard(const String & database, const String & table); /// Get an object that protects the database from concurrent DDL queries all tables in the database - std::unique_lock getExclusiveDDLGuardForDatabase(const String & database); + std::unique_lock getExclusiveDDLGuardForDatabase(const String & database); void assertDatabaseExists(const String & database_name) const; @@ -298,7 +298,7 @@ private: /// For the duration of the operation, an element is placed here, and an object is returned, /// which deletes the element in the destructor when counter becomes zero. /// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below. - using DatabaseGuard = std::pair; + using DatabaseGuard = std::pair; using DDLGuards = std::map; DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex); /// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order. diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index be03cee4a35..4f7694e2f07 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -7,6 +7,7 @@ #include #include +#include #include @@ -135,7 +136,7 @@ private: TemporaryDataOnDiskPtr tmp_data; Buckets buckets; - mutable std::shared_mutex rehash_mutex; + mutable SharedMutex rehash_mutex; FileBucket * current_bucket = nullptr; mutable std::mutex current_bucket_mutex; diff --git a/src/Interpreters/InterserverIOHandler.h b/src/Interpreters/InterserverIOHandler.h index 6aa91edcc62..375c6ee9ca5 100644 --- a/src/Interpreters/InterserverIOHandler.h +++ b/src/Interpreters/InterserverIOHandler.h @@ -7,11 +7,11 @@ #include #include #include +#include #include #include #include -#include #include namespace zkutil @@ -43,7 +43,7 @@ public: /// You need to stop the data transfer if blocker is activated. ActionBlocker blocker; - std::shared_mutex rwlock; + SharedMutex rwlock; }; using InterserverIOEndpointPtr = std::shared_ptr; diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 770ca0409bf..8b5d884a0e6 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -1,7 +1,6 @@ #pragma once -#include - +#include #include #include #include @@ -72,7 +71,7 @@ private: using Cache = CacheBase, BlockByteWeight>; - mutable std::shared_mutex rwlock; + mutable SharedMutex rwlock; std::shared_ptr table_join; SizeLimits size_limits; SortDescription left_sort_description; diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 34edfc5a2e2..eae8b15c695 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp index 8cdd60b2426..75bb05f8346 100644 --- a/src/Interpreters/Set.cpp +++ b/src/Interpreters/Set.cpp @@ -176,7 +176,7 @@ bool Set::insertFromBlock(const ColumnsWithTypeAndName & columns) bool Set::insertFromBlock(const Columns & columns) { - std::lock_guard lock(rwlock); + std::lock_guard lock(rwlock); if (data.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Method Set::setHeader must be called before Set::insertFromBlock"); diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h index bafb0dcea7a..00eff614c7c 100644 --- a/src/Interpreters/Set.h +++ b/src/Interpreters/Set.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include @@ -8,6 +7,7 @@ #include #include +#include #include @@ -131,7 +131,7 @@ private: /** Protects work with the set in the functions `insertFromBlock` and `execute`. * These functions can be called simultaneously from different threads only when using StorageSet, */ - mutable std::shared_mutex rwlock; + mutable SharedMutex rwlock; template void insertFromBlockImpl( diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index ddc3ef16cbf..22aa47ddfb3 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -19,7 +19,6 @@ #include #include -#include #include diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 2c5169a1729..68d5147362b 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -1,6 +1,6 @@ #pragma once -#include "IO/WriteSettings.h" +#include #include #include #include @@ -22,8 +22,6 @@ #include #include -#include - namespace zkutil { diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 15f3787003c..2e57d58ef41 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -1089,7 +1090,7 @@ protected: MultiVersion storage_settings; /// Used to determine which UUIDs to send to root query executor for deduplication. - mutable std::shared_mutex pinned_part_uuids_mutex; + mutable SharedMutex pinned_part_uuids_mutex; PinnedPartUUIDsPtr pinned_part_uuids; /// True if at least one part was created/removed with transaction. diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index a1c5474ae3a..86c0dffa60d 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -33,7 +33,6 @@ #include #include -#include #include @@ -493,7 +492,7 @@ static StoragePtr create(const StorageFactory::Arguments & args) std::shared_ptr StorageEmbeddedRocksDB::getRocksDBStatistics() const { - std::shared_lock lock(rocksdb_ptr_mx); + std::shared_lock lock(rocksdb_ptr_mx); if (!rocksdb_ptr) return nullptr; return rocksdb_ptr->GetOptions().statistics; @@ -501,7 +500,7 @@ std::shared_ptr StorageEmbeddedRocksDB::getRocksDBStatistic std::vector StorageEmbeddedRocksDB::multiGet(const std::vector & slices_keys, std::vector & values) const { - std::shared_lock lock(rocksdb_ptr_mx); + std::shared_lock lock(rocksdb_ptr_mx); if (!rocksdb_ptr) return {}; return rocksdb_ptr->MultiGet(rocksdb::ReadOptions(), slices_keys, &values); diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h index 02938fb5f69..7f6fc49fb18 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -86,7 +86,7 @@ private: const String primary_key; using RocksDBPtr = std::unique_ptr; RocksDBPtr rocksdb_ptr; - mutable std::shared_mutex rocksdb_ptr_mx; + mutable SharedMutex rocksdb_ptr_mx; String rocksdb_dir; Int32 ttl; bool read_only; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 7fd4b224905..e2a2f84bc72 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -49,6 +49,7 @@ #include #include #include +#include namespace ProfileEvents diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 652063c780f..8fb2470495f 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -1442,11 +1442,11 @@ void StorageWindowView::writeIntoWindowView( }); } - std::shared_lock fire_signal_lock; + std::shared_lock fire_signal_lock; QueryPipelineBuilder builder; if (window_view.is_proctime) { - fire_signal_lock = std::shared_lock(window_view.fire_signal_mutex); + fire_signal_lock = std::shared_lock(window_view.fire_signal_mutex); /// Fill ____timestamp column with current time in case of now() time column. if (window_view.is_time_column_func_now) diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 6da34389e4d..b313e466211 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -213,7 +214,7 @@ private: /// Mutex for the blocks and ready condition std::mutex mutex; - std::shared_mutex fire_signal_mutex; + SharedMutex fire_signal_mutex; mutable std::mutex sample_block_lock; /// Mutex to protect access to sample block IntervalKind::Kind window_kind;