From 693489a8ad2c512022ccf6ba286a55fa103754da Mon Sep 17 00:00:00 2001 From: serxa Date: Thu, 12 Jan 2023 15:51:04 +0000 Subject: [PATCH] review fixes --- src/Common/ProfilingScopedRWLock.h | 10 +- src/Common/ThreadStatus.h | 1 - src/Common/Threading.cpp | 486 ------------------ src/Common/Threading.h | 270 ---------- src/Dictionaries/CacheDictionary.h | 4 +- src/Dictionaries/CacheDictionaryUpdateQueue.h | 1 - .../FakeMetadataStorageFromDisk.h | 3 +- .../MetadataStorageFromDisk.cpp | 4 +- .../ObjectStorages/MetadataStorageFromDisk.h | 7 +- ...taStorageFromDiskTransactionOperations.cpp | 30 +- ...dataStorageFromDiskTransactionOperations.h | 33 +- src/Interpreters/DatabaseCatalog.cpp | 4 +- src/Interpreters/DatabaseCatalog.h | 10 +- src/Interpreters/GraceHashJoin.h | 4 +- src/Interpreters/InterserverIOHandler.h | 4 +- src/Interpreters/MergeJoin.h | 4 +- src/Interpreters/ProcessList.h | 1 - src/Interpreters/Set.h | 4 +- src/Storages/IStorage.h | 1 - src/Storages/MergeTree/IMergeTreeDataPart.h | 1 - src/Storages/MergeTree/MergeTreeData.h | 3 +- .../RocksDB/StorageEmbeddedRocksDB.cpp | 1 - src/Storages/RocksDB/StorageEmbeddedRocksDB.h | 4 +- src/Storages/StorageFile.cpp | 1 + src/Storages/StorageFile.h | 2 +- src/Storages/StorageLog.h | 3 +- src/Storages/StorageStripeLog.h | 2 +- src/Storages/WindowView/StorageWindowView.cpp | 2 +- src/Storages/WindowView/StorageWindowView.h | 3 +- 29 files changed, 74 insertions(+), 829 deletions(-) delete mode 100644 src/Common/Threading.cpp delete mode 100644 src/Common/Threading.h diff --git a/src/Common/ProfilingScopedRWLock.h b/src/Common/ProfilingScopedRWLock.h index 70f4d607d56..50e52e66e2d 100644 --- a/src/Common/ProfilingScopedRWLock.h +++ b/src/Common/ProfilingScopedRWLock.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include @@ -12,7 +12,7 @@ class ProfilingScopedWriteRWLock { public: - ProfilingScopedWriteRWLock(DB::FastSharedMutex & rwl_, ProfileEvents::Event event) : + ProfilingScopedWriteRWLock(SharedMutex & rwl_, ProfileEvents::Event event) : scoped_write_lock(rwl_) { ProfileEvents::increment(event, watch.elapsed()); @@ -20,14 +20,14 @@ public: private: Stopwatch watch; - std::unique_lock scoped_write_lock; + std::unique_lock scoped_write_lock; }; class ProfilingScopedReadRWLock { public: - ProfilingScopedReadRWLock(DB::FastSharedMutex & rwl, ProfileEvents::Event event) : + ProfilingScopedReadRWLock(SharedMutex & rwl, ProfileEvents::Event event) : scoped_read_lock(rwl) { ProfileEvents::increment(event, watch.elapsed()); @@ -35,7 +35,7 @@ public: private: Stopwatch watch; - std::shared_lock scoped_read_lock; + std::shared_lock scoped_read_lock; }; } diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index eaef57785e6..a8e4ec5c57f 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include diff --git a/src/Common/Threading.cpp b/src/Common/Threading.cpp deleted file mode 100644 index 714f45d8c90..00000000000 --- a/src/Common/Threading.cpp +++ /dev/null @@ -1,486 +0,0 @@ -#include - -namespace DB -{ -namespace ErrorCodes -{ - extern const int THREAD_WAS_CANCELLED; -} -} - -#ifdef OS_LINUX /// Because of futex - -#include - -#include - -#include -#include -#include -#include - -namespace DB -{ - -namespace -{ - inline Int64 futexWait(void * address, UInt32 value) - { - return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0); - } - - inline Int64 futexWake(void * address, int count) - { - return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0); - } - - // inline void waitFetch(std::atomic & address, UInt32 & value) - // { - // futexWait(&address, value); - // value = address.load(); - // } - - // inline void wakeOne(std::atomic & address) - // { - // futexWake(&address, 1); - // } - - // inline void wakeAll(std::atomic & address) - // { - // futexWake(&address, INT_MAX); - // } - - inline constexpr UInt32 lowerValue(UInt64 value) - { - return static_cast(value & 0xffffffffull); - } - - inline constexpr UInt32 upperValue(UInt64 value) - { - return static_cast(value >> 32ull); - } - - inline UInt32 * lowerAddress(void * address) - { - return reinterpret_cast(address) + (std::endian::native == std::endian::big); - } - - inline UInt32 * upperAddress(void * address) - { - return reinterpret_cast(address) + (std::endian::native == std::endian::little); - } - - inline void waitLowerFetch(std::atomic & address, UInt64 & value) - { - futexWait(lowerAddress(&address), lowerValue(value)); - value = address.load(); - } - - inline bool cancellableWaitLowerFetch(std::atomic & address, UInt64 & value) - { - bool res = CancelToken::local().wait(lowerAddress(&address), lowerValue(value)); - value = address.load(); - return res; - } - - inline void wakeLowerOne(std::atomic & address) - { - syscall(SYS_futex, lowerAddress(&address), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); - } - - // inline void wakeLowerAll(std::atomic & address) - // { - // syscall(SYS_futex, lowerAddress(&address), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0); - // } - - inline void waitUpperFetch(std::atomic & address, UInt64 & value) - { - futexWait(upperAddress(&address), upperValue(value)); - value = address.load(); - } - - inline bool cancellableWaitUpperFetch(std::atomic & address, UInt64 & value) - { - bool res = CancelToken::local().wait(upperAddress(&address), upperValue(value)); - value = address.load(); - return res; - } - - // inline void wakeUpperOne(std::atomic & address) - // { - // syscall(SYS_futex, upperAddress(&address), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); - // } - - inline void wakeUpperAll(std::atomic & address) - { - syscall(SYS_futex, upperAddress(&address), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0); - } -} - -void CancelToken::Registry::insert(CancelToken * token) -{ - std::lock_guard lock(mutex); - threads[token->thread_id] = token; -} - -void CancelToken::Registry::remove(CancelToken * token) -{ - std::lock_guard lock(mutex); - threads.erase(token->thread_id); -} - -void CancelToken::Registry::signal(UInt64 tid) -{ - std::lock_guard lock(mutex); - if (auto it = threads.find(tid); it != threads.end()) - it->second->signalImpl(); -} - -void CancelToken::Registry::signal(UInt64 tid, int code, const String & message) -{ - std::lock_guard lock(mutex); - if (auto it = threads.find(tid); it != threads.end()) - it->second->signalImpl(code, message); -} - -const std::shared_ptr & CancelToken::Registry::instance() -{ - static std::shared_ptr registry{new Registry()}; // shared_ptr is used to enforce correct destruction order of tokens and registry - return registry; -} - -CancelToken::CancelToken() - : state(disabled) - , thread_id(getThreadId()) - , registry(Registry::instance()) -{ - registry->insert(this); -} - -CancelToken::~CancelToken() -{ - registry->remove(this); -} - -void CancelToken::signal(UInt64 tid) -{ - Registry::instance()->signal(tid); -} - -void CancelToken::signal(UInt64 tid, int code, const String & message) -{ - Registry::instance()->signal(tid, code, message); -} - -bool CancelToken::wait(UInt32 * address, UInt32 value) -{ - chassert((reinterpret_cast(address) & canceled) == 0); // An `address` must be 2-byte aligned - if (value & signaled) // Can happen after spurious wake-up due to cancel of other thread - return true; // Spin-wait unless signal is handled - - UInt64 s = state.load(); - while (true) - { - if (s & disabled) - { - // Start non-cancellable wait on futex. Spurious wake-up is possible. - futexWait(address, value); - return true; // Disabled - true is forced - } - if (s & canceled) - return false; // Has already been canceled - if (state.compare_exchange_strong(s, reinterpret_cast(address))) - break; // This futex has been "acquired" by this token - } - - // Start cancellable wait. Spurious wake-up is possible. - futexWait(address, value); - - // "Release" futex and check for cancellation - s = state.load(); - while (true) - { - chassert((s & disabled) != disabled); // `disable()` must not be called from another thread - if (s & canceled) - { - if (s == canceled) - break; // Signaled; futex "release" has been done by the signaling thread - else - { - s = state.load(); - continue; // To avoid race (may lead to futex destruction) we have to wait for signaling thread to finish - } - } - if (state.compare_exchange_strong(s, 0)) - return true; // There was no cancellation; futex "released" - } - - // Reset signaled bit - reinterpret_cast *>(address)->fetch_and(~signaled); - return false; -} - -void CancelToken::raise() -{ - std::unique_lock lock(signal_mutex); - if (exception_code != 0) - throw DB::Exception( - std::exchange(exception_code, 0), - std::exchange(exception_message, {})); - else - throw DB::Exception(ErrorCodes::THREAD_WAS_CANCELLED, "Thread was cancelled"); -} - -void CancelToken::notifyOne(UInt32 * address) -{ - futexWake(address, 1); -} - -void CancelToken::notifyAll(UInt32 * address) -{ - futexWake(address, INT_MAX); -} - -void CancelToken::signalImpl() -{ - signalImpl(0, {}); -} - -std::mutex CancelToken::signal_mutex; - -void CancelToken::signalImpl(int code, const String & message) -{ - // Serialize all signaling threads to avoid races due to concurrent signal()/raise() calls - std::unique_lock lock(signal_mutex); - - UInt64 s = state.load(); - while (true) - { - if (s & canceled) - return; // Already cancelled - don't signal twice - if (state.compare_exchange_strong(s, s | canceled)) - break; // It is the cancelling thread - should deliver signal if necessary - } - - exception_code = code; - exception_message = message; - - if ((s & disabled) == disabled) - return; // Cancellation is disabled - just signal token for later, but don't wake - std::atomic * address = reinterpret_cast *>(s & disabled); - if (address == nullptr) - return; // Thread is currently not waiting on futex - wake-up not required - - // Set signaled bit - UInt32 value = address->load(); - while (true) - { - if (value & signaled) // Already signaled, just spin-wait until previous signal is handled by waiter - value = address->load(); - else if (address->compare_exchange_strong(value, value | signaled)) - break; - } - - // Wake all threads waiting on `address`, one of them will be cancelled and others will get spurious wake-ups - // Woken canceled thread will reset signaled bit - futexWake(address, INT_MAX); - - // Signaling thread must remove address from state to notify canceled thread that `futexWake()` is done, thus `wake()` can return. - // Otherwise we may have race condition: signaling thread may try to wake futex that has been already destructed. - state.store(canceled); -} - -Cancellable::Cancellable() -{ - CancelToken::local().reset(); -} - -Cancellable::~Cancellable() -{ - CancelToken::local().disable(); -} - -NonCancellable::NonCancellable() -{ - CancelToken::local().disable(); -} - -NonCancellable::~NonCancellable() -{ - CancelToken::local().enable(); -} - -CancellableSharedMutex::CancellableSharedMutex() - : state(0) - , waiters(0) -{} - -void CancellableSharedMutex::lock() -{ - UInt64 value = state.load(); - while (true) - { - if (value & writers) - { - waiters++; - if (!cancellableWaitUpperFetch(state, value)) - { - waiters--; - CancelToken::local().raise(); - } - else - waiters--; - } - else if (state.compare_exchange_strong(value, value | writers)) - break; - } - - value |= writers; - while (value & readers) - { - if (!cancellableWaitLowerFetch(state, value)) - { - state.fetch_and(~writers); - wakeUpperAll(state); - CancelToken::local().raise(); - } - } -} - -bool CancellableSharedMutex::try_lock() -{ - UInt64 value = state.load(); - if ((value & (readers | writers)) == 0 && state.compare_exchange_strong(value, value | writers)) - return true; - return false; -} - -void CancellableSharedMutex::unlock() -{ - state.fetch_and(~writers); - if (waiters) - wakeUpperAll(state); -} - -void CancellableSharedMutex::lock_shared() -{ - UInt64 value = state.load(); - while (true) - { - if (value & writers) - { - waiters++; - if (!cancellableWaitUpperFetch(state, value)) - { - waiters--; - CancelToken::local().raise(); - } - else - waiters--; - } - else if (state.compare_exchange_strong(value, value + 1)) // overflow is not realistic - break; - } -} - -bool CancellableSharedMutex::try_lock_shared() -{ - UInt64 value = state.load(); - if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) // overflow is not realistic - return true; - return false; -} - -void CancellableSharedMutex::unlock_shared() -{ - UInt64 value = state.fetch_sub(1) - 1; - if ((value & (writers | readers)) == writers) // If writer is waiting and no more readers - wakeLowerOne(state); // Wake writer -} - -FastSharedMutex::FastSharedMutex() - : state(0) - , waiters(0) -{} - -void FastSharedMutex::lock() -{ - UInt64 value = state.load(); - while (true) - { - if (value & writers) - { - waiters++; - waitUpperFetch(state, value); - waiters--; - } - else if (state.compare_exchange_strong(value, value | writers)) - break; - } - - value |= writers; - while (value & readers) - waitLowerFetch(state, value); -} - -bool FastSharedMutex::try_lock() -{ - UInt64 value = 0; - if (state.compare_exchange_strong(value, writers)) - return true; - return false; -} - -void FastSharedMutex::unlock() -{ - state.store(0); - if (waiters) - wakeUpperAll(state); -} - -void FastSharedMutex::lock_shared() -{ - UInt64 value = state.load(); - while (true) - { - if (value & writers) - { - waiters++; - waitUpperFetch(state, value); - waiters--; - } - else if (state.compare_exchange_strong(value, value + 1)) - break; - } -} - -bool FastSharedMutex::try_lock_shared() -{ - UInt64 value = state.load(); - if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) - return true; - return false; -} - -void FastSharedMutex::unlock_shared() -{ - UInt64 value = state.fetch_sub(1) - 1; - if (value == writers) - wakeLowerOne(state); // Wake writer -} - -} - -#else - -namespace DB -{ - -void CancelToken::raise() -{ - throw DB::Exception(ErrorCodes::THREAD_WAS_CANCELLED, "Thread was cancelled"); -} - -} - -#endif diff --git a/src/Common/Threading.h b/src/Common/Threading.h deleted file mode 100644 index 1fa7b3acfd7..00000000000 --- a/src/Common/Threading.h +++ /dev/null @@ -1,270 +0,0 @@ -#pragma once - -#include -#include - -#include // for chassert to work - -#include // for std::shared_lock and std::unique_lock - -#ifdef OS_LINUX /// Because of futex - -#include -#include -#include -#include - -namespace DB -{ - -// Scoped object, enabling thread cancellation (cannot be nested) -struct Cancellable -{ - Cancellable(); - ~Cancellable(); -}; - -// Scoped object, disabling thread cancellation (cannot be nested; must be inside `Cancellable` region) -struct NonCancellable -{ - NonCancellable(); - ~NonCancellable(); -}; - -// Responsible for synchronization needed to deliver thread cancellation signal. -// Basic building block for cancellable synchronization primitives. -// Allows to perform cancellable wait on memory addresses (think futex) -class CancelToken -{ -public: - CancelToken(); - CancelToken(const CancelToken &) = delete; - CancelToken(CancelToken &&) = delete; - CancelToken & operator=(const CancelToken &) = delete; - ~CancelToken(); - - // Returns token for the current thread - static CancelToken & local() - { - static thread_local CancelToken token; - return token; - } - - // Cancellable wait on memory address (futex word). - // Thread will do atomic compare-and-sleep `*address == value`. Waiting will continue until `notify_one()` - // or `notify_all()` will be called with the same `address` or calling thread will be canceled using `signal()`. - // Note that spurious wake-ups are also possible due to cancellation of other waiters on the same `address`. - // WARNING: `address` must be 2-byte aligned and `value` highest bit must be zero. - // Return value: - // true - woken by either notify or spurious wakeup; - // false - iff cancellation signal has been received. - // Implementation details: - // It registers `address` inside token's `state` to allow other threads to wake this thread and deliver cancellation signal. - // Highest bit of `*address` is used for guaranteed delivery of the signal, but is guaranteed to be zero on return due to cancellation. - // Intended to be called only by thread associated with this token. - bool wait(UInt32 * address, UInt32 value); - - // Throws `DB::Exception` received from `signal()`. Call it if `wait()` returned false. - // Intended to be called only by thread associated with this token. - [[noreturn]] void raise(); - - // Regular wake by address (futex word). It does not interact with token in any way. We have it here to complement `wait()`. - // Can be called from any thread. - static void notifyOne(UInt32 * address); - static void notifyAll(UInt32 * address); - - // Send cancel signal to thread with specified `tid`. - // If thread was waiting using `wait()` it will be woken up (unless cancellation is disabled). - // Can be called from any thread. - static void signal(UInt64 tid); - static void signal(UInt64 tid, int code, const String & message); - - // Flag used to deliver cancellation into memory address to wake a thread. - // Note that most significant bit at `addresses` to be used with `wait()` is reserved. - static constexpr UInt32 signaled = 1u << 31u; - -private: - friend struct Cancellable; - friend struct NonCancellable; - - // Restores initial state for token to be reused. See `Cancellable` struct. - // Intended to be called only by thread associated with this token. - void reset() - { - state.store(0); - } - - // Enable thread cancellation. See `NonCancellable` struct. - // Intended to be called only by thread associated with this token. - void enable() - { - chassert((state.load() & disabled) == disabled); - state.fetch_and(~disabled); - } - - // Disable thread cancellation. See `NonCancellable` struct. - // Intended to be called only by thread associated with this token. - void disable() - { - chassert((state.load() & disabled) == 0); - state.fetch_or(disabled); - } - - // Singleton. Maps thread IDs to tokens. - struct Registry; - friend struct Registry; - struct Registry - { - std::mutex mutex; - std::unordered_map threads; // By thread ID - - void insert(CancelToken * token); - void remove(CancelToken * token); - void signal(UInt64 tid); - void signal(UInt64 tid, int code, const String & message); - - static const std::shared_ptr & instance(); - }; - - // Cancels this token and wakes thread if necessary. - // Can be called from any thread. - void signalImpl(); - void signalImpl(int code, const String & message); - - // Lower bit: cancel signal received flag - static constexpr UInt64 canceled = 1; - - // Upper bits - possible values: - // 1) all zeros: token is enabed, i.e. wait() call can return false, thread is not waiting on any address; - // 2) all ones: token is disabled, i.e. wait() call cannot be cancelled; - // 3) specific `address`: token is enabled and thread is currently waiting on this `address`. - static constexpr UInt64 disabled = ~canceled; - static_assert(sizeof(UInt32 *) == sizeof(UInt64)); // State must be able to hold an address - - // All signal handling logic should be globally serialized using this mutex - static std::mutex signal_mutex; - - // Cancellation state - alignas(64) std::atomic state; - [[maybe_unused]] char padding[64 - sizeof(state)]; - - // Cancellation exception - int exception_code; - String exception_message; - - // Token is permanently attached to a single thread. There is one-to-one mapping between threads and tokens. - const UInt64 thread_id; - - // To avoid `Registry` destruction before last `Token` destruction - const std::shared_ptr registry; -}; - -class CancellableSharedMutex -{ -public: - CancellableSharedMutex(); - ~CancellableSharedMutex() = default; - CancellableSharedMutex(const CancellableSharedMutex &) = delete; - CancellableSharedMutex & operator=(const CancellableSharedMutex &) = delete; - - // Exclusive ownership - void lock(); - bool try_lock(); - void unlock(); - - // Shared ownership - void lock_shared(); - bool try_lock_shared(); - void unlock_shared(); - -private: - // State 64-bits layout: - // 1b - 31b - 1b - 31b - // signaled - writers - signaled - readers - // 63------------------------------------0 - // Two 32-bit words are used for cancellable waiting, so each has its own separate signaled bit - static constexpr UInt64 readers = (1ull << 32ull) - 1ull - CancelToken::signaled; - static constexpr UInt64 readers_signaled = CancelToken::signaled; - static constexpr UInt64 writers = readers << 32ull; - static constexpr UInt64 writers_signaled = readers_signaled << 32ull; - - alignas(64) std::atomic state; - std::atomic waiters; -}; - -class FastSharedMutex -{ -public: - FastSharedMutex(); - ~FastSharedMutex() = default; - FastSharedMutex(const FastSharedMutex &) = delete; - FastSharedMutex & operator=(const FastSharedMutex &) = delete; - - // Exclusive ownership - void lock(); - bool try_lock(); - void unlock(); - - // Shared ownership - void lock_shared(); - bool try_lock_shared(); - void unlock_shared(); - -private: - static constexpr UInt64 readers = (1ull << 32ull) - 1ull; // Lower 32 bits of state - static constexpr UInt64 writers = ~readers; // Upper 32 bits of state - - alignas(64) std::atomic state; - std::atomic waiters; -}; - -} - -#else - -// WARNING: We support cancellable synchronization primitives only on linux for now - -namespace DB -{ - -struct Cancellable -{ - Cancellable() = default; - ~Cancellable() = default; -}; - -struct NonCancellable -{ - NonCancellable() = default; - ~NonCancellable() = default; -}; - -class CancelToken -{ -public: - CancelToken() = default; - CancelToken(const CancelToken &) = delete; - CancelToken(CancelToken &&) = delete; - CancelToken & operator=(const CancelToken &) = delete; - ~CancelToken() = default; - - static CancelToken & local() - { - static CancelToken token; - return token; - } - - bool wait(UInt32 *, UInt32) { return true; } - [[noreturn]] void raise(); - static void notifyOne(UInt32 *) {} - static void notifyAll(UInt32 *) {} - static void signal(UInt64) {} - static void signal(UInt64, int, const String &) {} -}; - -using CancellableSharedMutex = std::shared_mutex; -using FastSharedMutex = std::shared_mutex; - -} - -#endif diff --git a/src/Dictionaries/CacheDictionary.h b/src/Dictionaries/CacheDictionary.h index fe9e585fe3e..e19c4a66b1f 100644 --- a/src/Dictionaries/CacheDictionary.h +++ b/src/Dictionaries/CacheDictionary.h @@ -13,7 +13,7 @@ #include #include -#include +#include #include #include @@ -206,7 +206,7 @@ private: /// This lock is used for the inner cache state update function lock it for /// write, when it need to update cache state all other functions just /// readers. Surprisingly this lock is also used for last_exception pointer. - mutable DB::FastSharedMutex rw_lock; + mutable SharedMutex rw_lock; mutable std::exception_ptr last_exception; mutable std::atomic error_count {0}; diff --git a/src/Dictionaries/CacheDictionaryUpdateQueue.h b/src/Dictionaries/CacheDictionaryUpdateQueue.h index 3b19de71d85..8d0581d2052 100644 --- a/src/Dictionaries/CacheDictionaryUpdateQueue.h +++ b/src/Dictionaries/CacheDictionaryUpdateQueue.h @@ -7,7 +7,6 @@ #include #include -#include #include #include #include diff --git a/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h b/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h index 7abcbabcc49..246d2aebfaa 100644 --- a/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/FakeMetadataStorageFromDisk.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -15,7 +16,7 @@ class FakeMetadataStorageFromDisk final : public IMetadataStorage private: friend class FakeMetadataStorageFromDiskTransaction; - mutable DB::FastSharedMutex metadata_mutex; + mutable SharedMutex metadata_mutex; DiskPtr disk; ObjectStoragePtr object_storage; diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp index 69fb4b8888c..f89928fefe0 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp @@ -83,7 +83,7 @@ std::string MetadataStorageFromDisk::readInlineDataToString(const std::string & return readMetadata(path)->getInlineData(); } -DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock &) const +DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock &) const { auto metadata = std::make_unique(disk->getPath(), object_storage_root_path, path); auto str = readFileToString(path); @@ -91,7 +91,7 @@ DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const return metadata; } -DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock &) const +DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock &) const { auto metadata = std::make_unique(disk->getPath(), object_storage_root_path, path); auto str = readFileToString(path); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h index f3a662fc566..5273f0b041e 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDisk.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -17,7 +18,7 @@ class MetadataStorageFromDisk final : public IMetadataStorage private: friend class MetadataStorageFromDiskTransaction; - mutable DB::FastSharedMutex metadata_mutex; + mutable SharedMutex metadata_mutex; DiskPtr disk; std::string object_storage_root_path; @@ -67,8 +68,8 @@ public: DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const; - DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; - DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; + DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock & lock) const; + DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock & lock) const; }; class MetadataStorageFromDiskTransaction final : public IMetadataTransaction diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp index fb5ae2668e6..7463622cb06 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.cpp @@ -26,7 +26,7 @@ SetLastModifiedOperation::SetLastModifiedOperation(const std::string & path_, Po { } -void SetLastModifiedOperation::execute(std::unique_lock &) +void SetLastModifiedOperation::execute(std::unique_lock &) { old_timestamp = disk.getLastModified(path); disk.setLastModified(path, new_timestamp); @@ -44,7 +44,7 @@ ChmodOperation::ChmodOperation(const std::string & path_, mode_t mode_, IDisk & { } -void ChmodOperation::execute(std::unique_lock &) +void ChmodOperation::execute(std::unique_lock &) { old_mode = disk.stat(path).st_mode; disk.chmod(path, mode); @@ -61,7 +61,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk { } -void UnlinkFileOperation::execute(std::unique_lock &) +void UnlinkFileOperation::execute(std::unique_lock &) { auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path)); readStringUntilEOF(prev_data, *buf); @@ -81,7 +81,7 @@ CreateDirectoryOperation::CreateDirectoryOperation(const std::string & path_, ID { } -void CreateDirectoryOperation::execute(std::unique_lock &) +void CreateDirectoryOperation::execute(std::unique_lock &) { disk.createDirectory(path); } @@ -97,7 +97,7 @@ CreateDirectoryRecursiveOperation::CreateDirectoryRecursiveOperation(const std:: { } -void CreateDirectoryRecursiveOperation::execute(std::unique_lock &) +void CreateDirectoryRecursiveOperation::execute(std::unique_lock &) { namespace fs = std::filesystem; fs::path p(path); @@ -124,7 +124,7 @@ RemoveDirectoryOperation::RemoveDirectoryOperation(const std::string & path_, ID { } -void RemoveDirectoryOperation::execute(std::unique_lock &) +void RemoveDirectoryOperation::execute(std::unique_lock &) { disk.removeDirectory(path); } @@ -141,7 +141,7 @@ RemoveRecursiveOperation::RemoveRecursiveOperation(const std::string & path_, ID { } -void RemoveRecursiveOperation::execute(std::unique_lock &) +void RemoveRecursiveOperation::execute(std::unique_lock &) { if (disk.isFile(path)) disk.moveFile(path, temp_path); @@ -174,7 +174,7 @@ CreateHardlinkOperation::CreateHardlinkOperation(const std::string & path_from_, { } -void CreateHardlinkOperation::execute(std::unique_lock & lock) +void CreateHardlinkOperation::execute(std::unique_lock & lock) { auto metadata = metadata_storage.readMetadataUnlocked(path_from, lock); @@ -201,7 +201,7 @@ MoveFileOperation::MoveFileOperation(const std::string & path_from_, const std:: { } -void MoveFileOperation::execute(std::unique_lock &) +void MoveFileOperation::execute(std::unique_lock &) { disk.moveFile(path_from, path_to); } @@ -218,7 +218,7 @@ MoveDirectoryOperation::MoveDirectoryOperation(const std::string & path_from_, c { } -void MoveDirectoryOperation::execute(std::unique_lock &) +void MoveDirectoryOperation::execute(std::unique_lock &) { disk.moveDirectory(path_from, path_to); } @@ -236,7 +236,7 @@ ReplaceFileOperation::ReplaceFileOperation(const std::string & path_from_, const { } -void ReplaceFileOperation::execute(std::unique_lock &) +void ReplaceFileOperation::execute(std::unique_lock &) { if (disk.exists(path_to)) disk.moveFile(path_to, temp_path_to); @@ -262,7 +262,7 @@ WriteFileOperation::WriteFileOperation(const std::string & path_, IDisk & disk_, { } -void WriteFileOperation::execute(std::unique_lock &) +void WriteFileOperation::execute(std::unique_lock &) { if (disk.exists(path)) { @@ -288,7 +288,7 @@ void WriteFileOperation::undo() } } -void AddBlobOperation::execute(std::unique_lock & metadata_lock) +void AddBlobOperation::execute(std::unique_lock & metadata_lock) { DiskObjectStorageMetadataPtr metadata; if (metadata_storage.exists(path)) @@ -309,7 +309,7 @@ void AddBlobOperation::undo() write_operation->undo(); } -void UnlinkMetadataFileOperation::execute(std::unique_lock & metadata_lock) +void UnlinkMetadataFileOperation::execute(std::unique_lock & metadata_lock) { auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); uint32_t ref_count = metadata->getRefCount(); @@ -336,7 +336,7 @@ void UnlinkMetadataFileOperation::undo() write_operation->undo(); } -void SetReadonlyFileOperation::execute(std::unique_lock & metadata_lock) +void SetReadonlyFileOperation::execute(std::unique_lock & metadata_lock) { auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock); metadata->setReadOnly(); diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h index ad026880c22..d8e4892a0a5 100644 --- a/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDiskTransactionOperations.h @@ -1,5 +1,6 @@ #pragma once +#include #include namespace DB @@ -13,7 +14,7 @@ class IDisk; struct IMetadataOperation { - virtual void execute(std::unique_lock & metadata_lock) = 0; + virtual void execute(std::unique_lock & metadata_lock) = 0; virtual void undo() = 0; virtual void finalize() {} virtual ~IMetadataOperation() = default; @@ -26,7 +27,7 @@ struct SetLastModifiedOperation final : public IMetadataOperation { SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -41,7 +42,7 @@ struct ChmodOperation final : public IMetadataOperation { ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -57,7 +58,7 @@ struct UnlinkFileOperation final : public IMetadataOperation { UnlinkFileOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -72,7 +73,7 @@ struct CreateDirectoryOperation final : public IMetadataOperation { CreateDirectoryOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -86,7 +87,7 @@ struct CreateDirectoryRecursiveOperation final : public IMetadataOperation { CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -101,7 +102,7 @@ struct RemoveDirectoryOperation final : public IMetadataOperation { RemoveDirectoryOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -114,7 +115,7 @@ struct RemoveRecursiveOperation final : public IMetadataOperation { RemoveRecursiveOperation(const std::string & path_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -130,7 +131,7 @@ struct WriteFileOperation final : public IMetadataOperation { WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; private: @@ -149,7 +150,7 @@ struct CreateHardlinkOperation final : public IMetadataOperation IDisk & disk_, const MetadataStorageFromDisk & metadata_storage_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -166,7 +167,7 @@ struct MoveFileOperation final : public IMetadataOperation { MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -181,7 +182,7 @@ struct MoveDirectoryOperation final : public IMetadataOperation { MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -196,7 +197,7 @@ struct ReplaceFileOperation final : public IMetadataOperation { ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_); - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -226,7 +227,7 @@ struct AddBlobOperation final : public IMetadataOperation , metadata_storage(metadata_storage_) {} - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -254,7 +255,7 @@ struct UnlinkMetadataFileOperation final : public IMetadataOperation { } - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; @@ -279,7 +280,7 @@ struct SetReadonlyFileOperation final : public IMetadataOperation { } - void execute(std::unique_lock & metadata_lock) override; + void execute(std::unique_lock & metadata_lock) override; void undo() override; diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 0ed9a8dc2ce..da6fdf7b578 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -729,7 +729,7 @@ DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & return std::make_unique(db_guard.first, db_guard.second, std::move(lock), table, database); } -std::unique_lock DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database) +std::unique_lock DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database) { DDLGuards::iterator db_guard_iter; { @@ -1282,7 +1282,7 @@ TemporaryLockForUUIDDirectory & TemporaryLockForUUIDDirectory::operator = (Tempo } -DDLGuard::DDLGuard(Map & map_, DB::FastSharedMutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name) +DDLGuard::DDLGuard(Map & map_, SharedMutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name) : map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_)) { it = map.emplace(elem, Entry{std::make_unique(), 0}).first; diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index cac8541b437..5dc3f90b7f4 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include #include #include @@ -58,7 +58,7 @@ public: DDLGuard( Map & map_, - DB::FastSharedMutex & db_mutex_, + SharedMutex & db_mutex_, std::unique_lock guards_lock_, const String & elem, const String & database_name); @@ -69,7 +69,7 @@ public: private: Map & map; - DB::FastSharedMutex & db_mutex; + SharedMutex & db_mutex; Map::iterator it; std::unique_lock guards_lock; std::unique_lock table_lock; @@ -142,7 +142,7 @@ public: /// Get an object that protects the table from concurrently executing multiple DDL operations. DDLGuardPtr getDDLGuard(const String & database, const String & table); /// Get an object that protects the database from concurrent DDL queries all tables in the database - std::unique_lock getExclusiveDDLGuardForDatabase(const String & database); + std::unique_lock getExclusiveDDLGuardForDatabase(const String & database); void assertDatabaseExists(const String & database_name) const; @@ -298,7 +298,7 @@ private: /// For the duration of the operation, an element is placed here, and an object is returned, /// which deletes the element in the destructor when counter becomes zero. /// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below. - using DatabaseGuard = std::pair; + using DatabaseGuard = std::pair; using DDLGuards = std::map; DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex); /// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order. diff --git a/src/Interpreters/GraceHashJoin.h b/src/Interpreters/GraceHashJoin.h index 26506dbc93b..56ed3fc387d 100644 --- a/src/Interpreters/GraceHashJoin.h +++ b/src/Interpreters/GraceHashJoin.h @@ -7,7 +7,7 @@ #include #include -#include +#include #include @@ -131,7 +131,7 @@ private: TemporaryDataOnDiskPtr tmp_data; Buckets buckets; - mutable DB::FastSharedMutex rehash_mutex; + mutable SharedMutex rehash_mutex; FileBucket * current_bucket = nullptr; mutable std::mutex current_bucket_mutex; diff --git a/src/Interpreters/InterserverIOHandler.h b/src/Interpreters/InterserverIOHandler.h index 3f21f23766a..e9d658ad251 100644 --- a/src/Interpreters/InterserverIOHandler.h +++ b/src/Interpreters/InterserverIOHandler.h @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include @@ -43,7 +43,7 @@ public: /// You need to stop the data transfer if blocker is activated. ActionBlocker blocker; - DB::FastSharedMutex rwlock; + SharedMutex rwlock; }; using InterserverIOEndpointPtr = std::shared_ptr; diff --git a/src/Interpreters/MergeJoin.h b/src/Interpreters/MergeJoin.h index 5be637c1feb..8b5d884a0e6 100644 --- a/src/Interpreters/MergeJoin.h +++ b/src/Interpreters/MergeJoin.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -71,7 +71,7 @@ private: using Cache = CacheBase, BlockByteWeight>; - mutable DB::FastSharedMutex rwlock; + mutable SharedMutex rwlock; std::shared_ptr table_join; SizeLimits size_limits; SortDescription left_sort_description; diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 8edae5fa866..eae8b15c695 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h index d11ab109df5..00eff614c7c 100644 --- a/src/Interpreters/Set.h +++ b/src/Interpreters/Set.h @@ -7,7 +7,7 @@ #include #include -#include +#include #include @@ -131,7 +131,7 @@ private: /** Protects work with the set in the functions `insertFromBlock` and `execute`. * These functions can be called simultaneously from different threads only when using StorageSet, */ - mutable DB::FastSharedMutex rwlock; + mutable SharedMutex rwlock; template void insertFromBlockImpl( diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 8ffdc42d4f5..107e43dedcf 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -17,7 +17,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index bf8ded9bbaa..68d5147362b 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index c31b8869daf..8edc374e842 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -1088,7 +1089,7 @@ protected: MultiVersion storage_settings; /// Used to determine which UUIDs to send to root query executor for deduplication. - mutable DB::FastSharedMutex pinned_part_uuids_mutex; + mutable SharedMutex pinned_part_uuids_mutex; PinnedPartUUIDsPtr pinned_part_uuids; /// True if at least one part was created/removed with transaction. diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index 4f9f1f21931..5a764c56123 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -25,7 +25,6 @@ #include #include #include -#include #include #include diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h index bac303d5c18..7f6fc49fb18 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.h +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -86,7 +86,7 @@ private: const String primary_key; using RocksDBPtr = std::unique_ptr; RocksDBPtr rocksdb_ptr; - mutable DB::FastSharedMutex rocksdb_ptr_mx; + mutable SharedMutex rocksdb_ptr_mx; String rocksdb_dir; Int32 ttl; bool read_only; diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index c293530db46..381df75ce20 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -49,6 +49,7 @@ #include #include #include +#include namespace ProfileEvents diff --git a/src/Storages/StorageFile.h b/src/Storages/StorageFile.h index 3eff78510f1..03b3aacb67f 100644 --- a/src/Storages/StorageFile.h +++ b/src/Storages/StorageFile.h @@ -3,10 +3,10 @@ #include #include -#include #include #include +#include namespace DB diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index 27edad8da2f..a2b1356f240 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -1,14 +1,15 @@ #pragma once #include +#include #include #include #include -#include #include #include + namespace DB { diff --git a/src/Storages/StorageStripeLog.h b/src/Storages/StorageStripeLog.h index 8682c16328a..3f1b4ed0ad5 100644 --- a/src/Storages/StorageStripeLog.h +++ b/src/Storages/StorageStripeLog.h @@ -1,11 +1,11 @@ #pragma once #include +#include #include #include #include -#include #include #include #include diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index baf0d517fb8..d6b70acea66 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -1443,7 +1443,7 @@ void StorageWindowView::writeIntoWindowView( }); } - std::shared_lock fire_signal_lock; + std::shared_lock fire_signal_lock; QueryPipelineBuilder builder; if (window_view.is_proctime) { diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 77d48b98e7b..b313e466211 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -213,7 +214,7 @@ private: /// Mutex for the blocks and ready condition std::mutex mutex; - DB::FastSharedMutex fire_signal_mutex; + SharedMutex fire_signal_mutex; mutable std::mutex sample_block_lock; /// Mutex to protect access to sample block IntervalKind::Kind window_kind;