review fixes

This commit is contained in:
serxa 2023-01-12 15:51:04 +00:00
parent 12d8543578
commit 693489a8ad
29 changed files with 74 additions and 829 deletions

View File

@ -1,6 +1,6 @@
#pragma once
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
@ -12,7 +12,7 @@ class ProfilingScopedWriteRWLock
{
public:
ProfilingScopedWriteRWLock(DB::FastSharedMutex & rwl_, ProfileEvents::Event event) :
ProfilingScopedWriteRWLock(SharedMutex & rwl_, ProfileEvents::Event event) :
scoped_write_lock(rwl_)
{
ProfileEvents::increment(event, watch.elapsed());
@ -20,14 +20,14 @@ public:
private:
Stopwatch watch;
std::unique_lock<DB::FastSharedMutex> scoped_write_lock;
std::unique_lock<SharedMutex> scoped_write_lock;
};
class ProfilingScopedReadRWLock
{
public:
ProfilingScopedReadRWLock(DB::FastSharedMutex & rwl, ProfileEvents::Event event) :
ProfilingScopedReadRWLock(SharedMutex & rwl, ProfileEvents::Event event) :
scoped_read_lock(rwl)
{
ProfileEvents::increment(event, watch.elapsed());
@ -35,7 +35,7 @@ public:
private:
Stopwatch watch;
std::shared_lock<DB::FastSharedMutex> scoped_read_lock;
std::shared_lock<SharedMutex> scoped_read_lock;
};
}

View File

@ -5,7 +5,6 @@
#include <IO/Progress.h>
#include <Common/MemoryTracker.h>
#include <Common/ProfileEvents.h>
#include <Common/Threading.h>
#include <base/StringRef.h>
#include <Common/ConcurrentBoundedQueue.h>

View File

@ -1,486 +0,0 @@
#include <Common/Threading.h>
namespace DB
{
namespace ErrorCodes
{
extern const int THREAD_WAS_CANCELLED;
}
}
#ifdef OS_LINUX /// Because of futex
#include <base/getThreadId.h>
#include <bit>
#include <linux/futex.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <unistd.h>
namespace DB
{
namespace
{
inline Int64 futexWait(void * address, UInt32 value)
{
return syscall(SYS_futex, address, FUTEX_WAIT_PRIVATE, value, nullptr, nullptr, 0);
}
inline Int64 futexWake(void * address, int count)
{
return syscall(SYS_futex, address, FUTEX_WAKE_PRIVATE, count, nullptr, nullptr, 0);
}
// inline void waitFetch(std::atomic<UInt32> & address, UInt32 & value)
// {
// futexWait(&address, value);
// value = address.load();
// }
// inline void wakeOne(std::atomic<UInt32> & address)
// {
// futexWake(&address, 1);
// }
// inline void wakeAll(std::atomic<UInt32> & address)
// {
// futexWake(&address, INT_MAX);
// }
inline constexpr UInt32 lowerValue(UInt64 value)
{
return static_cast<UInt32>(value & 0xffffffffull);
}
inline constexpr UInt32 upperValue(UInt64 value)
{
return static_cast<UInt32>(value >> 32ull);
}
inline UInt32 * lowerAddress(void * address)
{
return reinterpret_cast<UInt32 *>(address) + (std::endian::native == std::endian::big);
}
inline UInt32 * upperAddress(void * address)
{
return reinterpret_cast<UInt32 *>(address) + (std::endian::native == std::endian::little);
}
inline void waitLowerFetch(std::atomic<UInt64> & address, UInt64 & value)
{
futexWait(lowerAddress(&address), lowerValue(value));
value = address.load();
}
inline bool cancellableWaitLowerFetch(std::atomic<UInt64> & address, UInt64 & value)
{
bool res = CancelToken::local().wait(lowerAddress(&address), lowerValue(value));
value = address.load();
return res;
}
inline void wakeLowerOne(std::atomic<UInt64> & address)
{
syscall(SYS_futex, lowerAddress(&address), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
}
// inline void wakeLowerAll(std::atomic<UInt64> & address)
// {
// syscall(SYS_futex, lowerAddress(&address), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
// }
inline void waitUpperFetch(std::atomic<UInt64> & address, UInt64 & value)
{
futexWait(upperAddress(&address), upperValue(value));
value = address.load();
}
inline bool cancellableWaitUpperFetch(std::atomic<UInt64> & address, UInt64 & value)
{
bool res = CancelToken::local().wait(upperAddress(&address), upperValue(value));
value = address.load();
return res;
}
// inline void wakeUpperOne(std::atomic<UInt64> & address)
// {
// syscall(SYS_futex, upperAddress(&address), FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0);
// }
inline void wakeUpperAll(std::atomic<UInt64> & address)
{
syscall(SYS_futex, upperAddress(&address), FUTEX_WAKE_PRIVATE, INT_MAX, nullptr, nullptr, 0);
}
}
void CancelToken::Registry::insert(CancelToken * token)
{
std::lock_guard<std::mutex> lock(mutex);
threads[token->thread_id] = token;
}
void CancelToken::Registry::remove(CancelToken * token)
{
std::lock_guard<std::mutex> lock(mutex);
threads.erase(token->thread_id);
}
void CancelToken::Registry::signal(UInt64 tid)
{
std::lock_guard<std::mutex> lock(mutex);
if (auto it = threads.find(tid); it != threads.end())
it->second->signalImpl();
}
void CancelToken::Registry::signal(UInt64 tid, int code, const String & message)
{
std::lock_guard<std::mutex> lock(mutex);
if (auto it = threads.find(tid); it != threads.end())
it->second->signalImpl(code, message);
}
const std::shared_ptr<CancelToken::Registry> & CancelToken::Registry::instance()
{
static std::shared_ptr<Registry> registry{new Registry()}; // shared_ptr is used to enforce correct destruction order of tokens and registry
return registry;
}
CancelToken::CancelToken()
: state(disabled)
, thread_id(getThreadId())
, registry(Registry::instance())
{
registry->insert(this);
}
CancelToken::~CancelToken()
{
registry->remove(this);
}
void CancelToken::signal(UInt64 tid)
{
Registry::instance()->signal(tid);
}
void CancelToken::signal(UInt64 tid, int code, const String & message)
{
Registry::instance()->signal(tid, code, message);
}
bool CancelToken::wait(UInt32 * address, UInt32 value)
{
chassert((reinterpret_cast<UInt64>(address) & canceled) == 0); // An `address` must be 2-byte aligned
if (value & signaled) // Can happen after spurious wake-up due to cancel of other thread
return true; // Spin-wait unless signal is handled
UInt64 s = state.load();
while (true)
{
if (s & disabled)
{
// Start non-cancellable wait on futex. Spurious wake-up is possible.
futexWait(address, value);
return true; // Disabled - true is forced
}
if (s & canceled)
return false; // Has already been canceled
if (state.compare_exchange_strong(s, reinterpret_cast<UInt64>(address)))
break; // This futex has been "acquired" by this token
}
// Start cancellable wait. Spurious wake-up is possible.
futexWait(address, value);
// "Release" futex and check for cancellation
s = state.load();
while (true)
{
chassert((s & disabled) != disabled); // `disable()` must not be called from another thread
if (s & canceled)
{
if (s == canceled)
break; // Signaled; futex "release" has been done by the signaling thread
else
{
s = state.load();
continue; // To avoid race (may lead to futex destruction) we have to wait for signaling thread to finish
}
}
if (state.compare_exchange_strong(s, 0))
return true; // There was no cancellation; futex "released"
}
// Reset signaled bit
reinterpret_cast<std::atomic<UInt32> *>(address)->fetch_and(~signaled);
return false;
}
void CancelToken::raise()
{
std::unique_lock<std::mutex> lock(signal_mutex);
if (exception_code != 0)
throw DB::Exception(
std::exchange(exception_code, 0),
std::exchange(exception_message, {}));
else
throw DB::Exception(ErrorCodes::THREAD_WAS_CANCELLED, "Thread was cancelled");
}
void CancelToken::notifyOne(UInt32 * address)
{
futexWake(address, 1);
}
void CancelToken::notifyAll(UInt32 * address)
{
futexWake(address, INT_MAX);
}
void CancelToken::signalImpl()
{
signalImpl(0, {});
}
std::mutex CancelToken::signal_mutex;
void CancelToken::signalImpl(int code, const String & message)
{
// Serialize all signaling threads to avoid races due to concurrent signal()/raise() calls
std::unique_lock<std::mutex> lock(signal_mutex);
UInt64 s = state.load();
while (true)
{
if (s & canceled)
return; // Already cancelled - don't signal twice
if (state.compare_exchange_strong(s, s | canceled))
break; // It is the cancelling thread - should deliver signal if necessary
}
exception_code = code;
exception_message = message;
if ((s & disabled) == disabled)
return; // Cancellation is disabled - just signal token for later, but don't wake
std::atomic<UInt32> * address = reinterpret_cast<std::atomic<UInt32> *>(s & disabled);
if (address == nullptr)
return; // Thread is currently not waiting on futex - wake-up not required
// Set signaled bit
UInt32 value = address->load();
while (true)
{
if (value & signaled) // Already signaled, just spin-wait until previous signal is handled by waiter
value = address->load();
else if (address->compare_exchange_strong(value, value | signaled))
break;
}
// Wake all threads waiting on `address`, one of them will be cancelled and others will get spurious wake-ups
// Woken canceled thread will reset signaled bit
futexWake(address, INT_MAX);
// Signaling thread must remove address from state to notify canceled thread that `futexWake()` is done, thus `wake()` can return.
// Otherwise we may have race condition: signaling thread may try to wake futex that has been already destructed.
state.store(canceled);
}
Cancellable::Cancellable()
{
CancelToken::local().reset();
}
Cancellable::~Cancellable()
{
CancelToken::local().disable();
}
NonCancellable::NonCancellable()
{
CancelToken::local().disable();
}
NonCancellable::~NonCancellable()
{
CancelToken::local().enable();
}
CancellableSharedMutex::CancellableSharedMutex()
: state(0)
, waiters(0)
{}
void CancellableSharedMutex::lock()
{
UInt64 value = state.load();
while (true)
{
if (value & writers)
{
waiters++;
if (!cancellableWaitUpperFetch(state, value))
{
waiters--;
CancelToken::local().raise();
}
else
waiters--;
}
else if (state.compare_exchange_strong(value, value | writers))
break;
}
value |= writers;
while (value & readers)
{
if (!cancellableWaitLowerFetch(state, value))
{
state.fetch_and(~writers);
wakeUpperAll(state);
CancelToken::local().raise();
}
}
}
bool CancellableSharedMutex::try_lock()
{
UInt64 value = state.load();
if ((value & (readers | writers)) == 0 && state.compare_exchange_strong(value, value | writers))
return true;
return false;
}
void CancellableSharedMutex::unlock()
{
state.fetch_and(~writers);
if (waiters)
wakeUpperAll(state);
}
void CancellableSharedMutex::lock_shared()
{
UInt64 value = state.load();
while (true)
{
if (value & writers)
{
waiters++;
if (!cancellableWaitUpperFetch(state, value))
{
waiters--;
CancelToken::local().raise();
}
else
waiters--;
}
else if (state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
break;
}
}
bool CancellableSharedMutex::try_lock_shared()
{
UInt64 value = state.load();
if (!(value & writers) && state.compare_exchange_strong(value, value + 1)) // overflow is not realistic
return true;
return false;
}
void CancellableSharedMutex::unlock_shared()
{
UInt64 value = state.fetch_sub(1) - 1;
if ((value & (writers | readers)) == writers) // If writer is waiting and no more readers
wakeLowerOne(state); // Wake writer
}
FastSharedMutex::FastSharedMutex()
: state(0)
, waiters(0)
{}
void FastSharedMutex::lock()
{
UInt64 value = state.load();
while (true)
{
if (value & writers)
{
waiters++;
waitUpperFetch(state, value);
waiters--;
}
else if (state.compare_exchange_strong(value, value | writers))
break;
}
value |= writers;
while (value & readers)
waitLowerFetch(state, value);
}
bool FastSharedMutex::try_lock()
{
UInt64 value = 0;
if (state.compare_exchange_strong(value, writers))
return true;
return false;
}
void FastSharedMutex::unlock()
{
state.store(0);
if (waiters)
wakeUpperAll(state);
}
void FastSharedMutex::lock_shared()
{
UInt64 value = state.load();
while (true)
{
if (value & writers)
{
waiters++;
waitUpperFetch(state, value);
waiters--;
}
else if (state.compare_exchange_strong(value, value + 1))
break;
}
}
bool FastSharedMutex::try_lock_shared()
{
UInt64 value = state.load();
if (!(value & writers) && state.compare_exchange_strong(value, value + 1))
return true;
return false;
}
void FastSharedMutex::unlock_shared()
{
UInt64 value = state.fetch_sub(1) - 1;
if (value == writers)
wakeLowerOne(state); // Wake writer
}
}
#else
namespace DB
{
void CancelToken::raise()
{
throw DB::Exception(ErrorCodes::THREAD_WAS_CANCELLED, "Thread was cancelled");
}
}
#endif

View File

@ -1,270 +0,0 @@
#pragma once
#include <base/types.h>
#include <base/defines.h>
#include <Common/Exception.h> // for chassert to work
#include <shared_mutex> // for std::shared_lock and std::unique_lock
#ifdef OS_LINUX /// Because of futex
#include <atomic>
#include <mutex>
#include <unordered_map>
#include <memory>
namespace DB
{
// Scoped object, enabling thread cancellation (cannot be nested)
struct Cancellable
{
Cancellable();
~Cancellable();
};
// Scoped object, disabling thread cancellation (cannot be nested; must be inside `Cancellable` region)
struct NonCancellable
{
NonCancellable();
~NonCancellable();
};
// Responsible for synchronization needed to deliver thread cancellation signal.
// Basic building block for cancellable synchronization primitives.
// Allows to perform cancellable wait on memory addresses (think futex)
class CancelToken
{
public:
CancelToken();
CancelToken(const CancelToken &) = delete;
CancelToken(CancelToken &&) = delete;
CancelToken & operator=(const CancelToken &) = delete;
~CancelToken();
// Returns token for the current thread
static CancelToken & local()
{
static thread_local CancelToken token;
return token;
}
// Cancellable wait on memory address (futex word).
// Thread will do atomic compare-and-sleep `*address == value`. Waiting will continue until `notify_one()`
// or `notify_all()` will be called with the same `address` or calling thread will be canceled using `signal()`.
// Note that spurious wake-ups are also possible due to cancellation of other waiters on the same `address`.
// WARNING: `address` must be 2-byte aligned and `value` highest bit must be zero.
// Return value:
// true - woken by either notify or spurious wakeup;
// false - iff cancellation signal has been received.
// Implementation details:
// It registers `address` inside token's `state` to allow other threads to wake this thread and deliver cancellation signal.
// Highest bit of `*address` is used for guaranteed delivery of the signal, but is guaranteed to be zero on return due to cancellation.
// Intended to be called only by thread associated with this token.
bool wait(UInt32 * address, UInt32 value);
// Throws `DB::Exception` received from `signal()`. Call it if `wait()` returned false.
// Intended to be called only by thread associated with this token.
[[noreturn]] void raise();
// Regular wake by address (futex word). It does not interact with token in any way. We have it here to complement `wait()`.
// Can be called from any thread.
static void notifyOne(UInt32 * address);
static void notifyAll(UInt32 * address);
// Send cancel signal to thread with specified `tid`.
// If thread was waiting using `wait()` it will be woken up (unless cancellation is disabled).
// Can be called from any thread.
static void signal(UInt64 tid);
static void signal(UInt64 tid, int code, const String & message);
// Flag used to deliver cancellation into memory address to wake a thread.
// Note that most significant bit at `addresses` to be used with `wait()` is reserved.
static constexpr UInt32 signaled = 1u << 31u;
private:
friend struct Cancellable;
friend struct NonCancellable;
// Restores initial state for token to be reused. See `Cancellable` struct.
// Intended to be called only by thread associated with this token.
void reset()
{
state.store(0);
}
// Enable thread cancellation. See `NonCancellable` struct.
// Intended to be called only by thread associated with this token.
void enable()
{
chassert((state.load() & disabled) == disabled);
state.fetch_and(~disabled);
}
// Disable thread cancellation. See `NonCancellable` struct.
// Intended to be called only by thread associated with this token.
void disable()
{
chassert((state.load() & disabled) == 0);
state.fetch_or(disabled);
}
// Singleton. Maps thread IDs to tokens.
struct Registry;
friend struct Registry;
struct Registry
{
std::mutex mutex;
std::unordered_map<UInt64, CancelToken*> threads; // By thread ID
void insert(CancelToken * token);
void remove(CancelToken * token);
void signal(UInt64 tid);
void signal(UInt64 tid, int code, const String & message);
static const std::shared_ptr<Registry> & instance();
};
// Cancels this token and wakes thread if necessary.
// Can be called from any thread.
void signalImpl();
void signalImpl(int code, const String & message);
// Lower bit: cancel signal received flag
static constexpr UInt64 canceled = 1;
// Upper bits - possible values:
// 1) all zeros: token is enabed, i.e. wait() call can return false, thread is not waiting on any address;
// 2) all ones: token is disabled, i.e. wait() call cannot be cancelled;
// 3) specific `address`: token is enabled and thread is currently waiting on this `address`.
static constexpr UInt64 disabled = ~canceled;
static_assert(sizeof(UInt32 *) == sizeof(UInt64)); // State must be able to hold an address
// All signal handling logic should be globally serialized using this mutex
static std::mutex signal_mutex;
// Cancellation state
alignas(64) std::atomic<UInt64> state;
[[maybe_unused]] char padding[64 - sizeof(state)];
// Cancellation exception
int exception_code;
String exception_message;
// Token is permanently attached to a single thread. There is one-to-one mapping between threads and tokens.
const UInt64 thread_id;
// To avoid `Registry` destruction before last `Token` destruction
const std::shared_ptr<Registry> registry;
};
class CancellableSharedMutex
{
public:
CancellableSharedMutex();
~CancellableSharedMutex() = default;
CancellableSharedMutex(const CancellableSharedMutex &) = delete;
CancellableSharedMutex & operator=(const CancellableSharedMutex &) = delete;
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
private:
// State 64-bits layout:
// 1b - 31b - 1b - 31b
// signaled - writers - signaled - readers
// 63------------------------------------0
// Two 32-bit words are used for cancellable waiting, so each has its own separate signaled bit
static constexpr UInt64 readers = (1ull << 32ull) - 1ull - CancelToken::signaled;
static constexpr UInt64 readers_signaled = CancelToken::signaled;
static constexpr UInt64 writers = readers << 32ull;
static constexpr UInt64 writers_signaled = readers_signaled << 32ull;
alignas(64) std::atomic<UInt64> state;
std::atomic<UInt32> waiters;
};
class FastSharedMutex
{
public:
FastSharedMutex();
~FastSharedMutex() = default;
FastSharedMutex(const FastSharedMutex &) = delete;
FastSharedMutex & operator=(const FastSharedMutex &) = delete;
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
private:
static constexpr UInt64 readers = (1ull << 32ull) - 1ull; // Lower 32 bits of state
static constexpr UInt64 writers = ~readers; // Upper 32 bits of state
alignas(64) std::atomic<UInt64> state;
std::atomic<UInt32> waiters;
};
}
#else
// WARNING: We support cancellable synchronization primitives only on linux for now
namespace DB
{
struct Cancellable
{
Cancellable() = default;
~Cancellable() = default;
};
struct NonCancellable
{
NonCancellable() = default;
~NonCancellable() = default;
};
class CancelToken
{
public:
CancelToken() = default;
CancelToken(const CancelToken &) = delete;
CancelToken(CancelToken &&) = delete;
CancelToken & operator=(const CancelToken &) = delete;
~CancelToken() = default;
static CancelToken & local()
{
static CancelToken token;
return token;
}
bool wait(UInt32 *, UInt32) { return true; }
[[noreturn]] void raise();
static void notifyOne(UInt32 *) {}
static void notifyAll(UInt32 *) {}
static void signal(UInt64) {}
static void signal(UInt64, int, const String &) {}
};
using CancellableSharedMutex = std::shared_mutex;
using FastSharedMutex = std::shared_mutex;
}
#endif

View File

@ -13,7 +13,7 @@
#include <Common/randomSeed.h>
#include <Common/ThreadPool.h>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <Common/CurrentMetrics.h>
#include <Dictionaries/IDictionary.h>
@ -206,7 +206,7 @@ private:
/// This lock is used for the inner cache state update function lock it for
/// write, when it need to update cache state all other functions just
/// readers. Surprisingly this lock is also used for last_exception pointer.
mutable DB::FastSharedMutex rw_lock;
mutable SharedMutex rw_lock;
mutable std::exception_ptr last_exception;
mutable std::atomic<size_t> error_count {0};

View File

@ -7,7 +7,6 @@
#include <functional>
#include <Common/ThreadPool.h>
#include <Common/Threading.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/PODArray.h>

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/ObjectStorages/MetadataFromDiskTransactionState.h>
@ -15,7 +16,7 @@ class FakeMetadataStorageFromDisk final : public IMetadataStorage
private:
friend class FakeMetadataStorageFromDiskTransaction;
mutable DB::FastSharedMutex metadata_mutex;
mutable SharedMutex metadata_mutex;
DiskPtr disk;
ObjectStoragePtr object_storage;

View File

@ -83,7 +83,7 @@ std::string MetadataStorageFromDisk::readInlineDataToString(const std::string &
return readMetadata(path)->getInlineData();
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<DB::FastSharedMutex> &) const
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
auto str = readFileToString(path);
@ -91,7 +91,7 @@ DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const
return metadata;
}
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<DB::FastSharedMutex> &) const
DiskObjectStorageMetadataPtr MetadataStorageFromDisk::readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
auto str = readFileToString(path);

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/IDisk.h>
@ -17,7 +18,7 @@ class MetadataStorageFromDisk final : public IMetadataStorage
private:
friend class MetadataStorageFromDiskTransaction;
mutable DB::FastSharedMutex metadata_mutex;
mutable SharedMutex metadata_mutex;
DiskPtr disk;
std::string object_storage_root_path;
@ -67,8 +68,8 @@ public:
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<DB::FastSharedMutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<DB::FastSharedMutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::unique_lock<SharedMutex> & lock) const;
DiskObjectStorageMetadataPtr readMetadataUnlocked(const std::string & path, std::shared_lock<SharedMutex> & lock) const;
};
class MetadataStorageFromDiskTransaction final : public IMetadataTransaction

View File

@ -26,7 +26,7 @@ SetLastModifiedOperation::SetLastModifiedOperation(const std::string & path_, Po
{
}
void SetLastModifiedOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void SetLastModifiedOperation::execute(std::unique_lock<SharedMutex> &)
{
old_timestamp = disk.getLastModified(path);
disk.setLastModified(path, new_timestamp);
@ -44,7 +44,7 @@ ChmodOperation::ChmodOperation(const std::string & path_, mode_t mode_, IDisk &
{
}
void ChmodOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void ChmodOperation::execute(std::unique_lock<SharedMutex> &)
{
old_mode = disk.stat(path).st_mode;
disk.chmod(path, mode);
@ -61,7 +61,7 @@ UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk
{
}
void UnlinkFileOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void UnlinkFileOperation::execute(std::unique_lock<SharedMutex> &)
{
auto buf = disk.readFile(path, ReadSettings{}, std::nullopt, disk.getFileSize(path));
readStringUntilEOF(prev_data, *buf);
@ -81,7 +81,7 @@ CreateDirectoryOperation::CreateDirectoryOperation(const std::string & path_, ID
{
}
void CreateDirectoryOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void CreateDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.createDirectory(path);
}
@ -97,7 +97,7 @@ CreateDirectoryRecursiveOperation::CreateDirectoryRecursiveOperation(const std::
{
}
void CreateDirectoryRecursiveOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void CreateDirectoryRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
{
namespace fs = std::filesystem;
fs::path p(path);
@ -124,7 +124,7 @@ RemoveDirectoryOperation::RemoveDirectoryOperation(const std::string & path_, ID
{
}
void RemoveDirectoryOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void RemoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.removeDirectory(path);
}
@ -141,7 +141,7 @@ RemoveRecursiveOperation::RemoveRecursiveOperation(const std::string & path_, ID
{
}
void RemoveRecursiveOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void RemoveRecursiveOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.isFile(path))
disk.moveFile(path, temp_path);
@ -174,7 +174,7 @@ CreateHardlinkOperation::CreateHardlinkOperation(const std::string & path_from_,
{
}
void CreateHardlinkOperation::execute(std::unique_lock<DB::FastSharedMutex> & lock)
void CreateHardlinkOperation::execute(std::unique_lock<SharedMutex> & lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path_from, lock);
@ -201,7 +201,7 @@ MoveFileOperation::MoveFileOperation(const std::string & path_from_, const std::
{
}
void MoveFileOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void MoveFileOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.moveFile(path_from, path_to);
}
@ -218,7 +218,7 @@ MoveDirectoryOperation::MoveDirectoryOperation(const std::string & path_from_, c
{
}
void MoveDirectoryOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void MoveDirectoryOperation::execute(std::unique_lock<SharedMutex> &)
{
disk.moveDirectory(path_from, path_to);
}
@ -236,7 +236,7 @@ ReplaceFileOperation::ReplaceFileOperation(const std::string & path_from_, const
{
}
void ReplaceFileOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void ReplaceFileOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.exists(path_to))
disk.moveFile(path_to, temp_path_to);
@ -262,7 +262,7 @@ WriteFileOperation::WriteFileOperation(const std::string & path_, IDisk & disk_,
{
}
void WriteFileOperation::execute(std::unique_lock<DB::FastSharedMutex> &)
void WriteFileOperation::execute(std::unique_lock<SharedMutex> &)
{
if (disk.exists(path))
{
@ -288,7 +288,7 @@ void WriteFileOperation::undo()
}
}
void AddBlobOperation::execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock)
void AddBlobOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
DiskObjectStorageMetadataPtr metadata;
if (metadata_storage.exists(path))
@ -309,7 +309,7 @@ void AddBlobOperation::undo()
write_operation->undo();
}
void UnlinkMetadataFileOperation::execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock)
void UnlinkMetadataFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
uint32_t ref_count = metadata->getRefCount();
@ -336,7 +336,7 @@ void UnlinkMetadataFileOperation::undo()
write_operation->undo();
}
void SetReadonlyFileOperation::execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock)
void SetReadonlyFileOperation::execute(std::unique_lock<SharedMutex> & metadata_lock)
{
auto metadata = metadata_storage.readMetadataUnlocked(path, metadata_lock);
metadata->setReadOnly();

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
namespace DB
@ -13,7 +14,7 @@ class IDisk;
struct IMetadataOperation
{
virtual void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) = 0;
virtual void execute(std::unique_lock<SharedMutex> & metadata_lock) = 0;
virtual void undo() = 0;
virtual void finalize() {}
virtual ~IMetadataOperation() = default;
@ -26,7 +27,7 @@ struct SetLastModifiedOperation final : public IMetadataOperation
{
SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -41,7 +42,7 @@ struct ChmodOperation final : public IMetadataOperation
{
ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -57,7 +58,7 @@ struct UnlinkFileOperation final : public IMetadataOperation
{
UnlinkFileOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -72,7 +73,7 @@ struct CreateDirectoryOperation final : public IMetadataOperation
{
CreateDirectoryOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -86,7 +87,7 @@ struct CreateDirectoryRecursiveOperation final : public IMetadataOperation
{
CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -101,7 +102,7 @@ struct RemoveDirectoryOperation final : public IMetadataOperation
{
RemoveDirectoryOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -114,7 +115,7 @@ struct RemoveRecursiveOperation final : public IMetadataOperation
{
RemoveRecursiveOperation(const std::string & path_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -130,7 +131,7 @@ struct WriteFileOperation final : public IMetadataOperation
{
WriteFileOperation(const std::string & path_, IDisk & disk_, const std::string & data_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
private:
@ -149,7 +150,7 @@ struct CreateHardlinkOperation final : public IMetadataOperation
IDisk & disk_,
const MetadataStorageFromDisk & metadata_storage_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -166,7 +167,7 @@ struct MoveFileOperation final : public IMetadataOperation
{
MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -181,7 +182,7 @@ struct MoveDirectoryOperation final : public IMetadataOperation
{
MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -196,7 +197,7 @@ struct ReplaceFileOperation final : public IMetadataOperation
{
ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_);
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -226,7 +227,7 @@ struct AddBlobOperation final : public IMetadataOperation
, metadata_storage(metadata_storage_)
{}
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -254,7 +255,7 @@ struct UnlinkMetadataFileOperation final : public IMetadataOperation
{
}
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;
@ -279,7 +280,7 @@ struct SetReadonlyFileOperation final : public IMetadataOperation
{
}
void execute(std::unique_lock<DB::FastSharedMutex> & metadata_lock) override;
void execute(std::unique_lock<SharedMutex> & metadata_lock) override;
void undo() override;

View File

@ -729,7 +729,7 @@ DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String &
return std::make_unique<DDLGuard>(db_guard.first, db_guard.second, std::move(lock), table, database);
}
std::unique_lock<DB::FastSharedMutex> DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database)
std::unique_lock<SharedMutex> DatabaseCatalog::getExclusiveDDLGuardForDatabase(const String & database)
{
DDLGuards::iterator db_guard_iter;
{
@ -1282,7 +1282,7 @@ TemporaryLockForUUIDDirectory & TemporaryLockForUUIDDirectory::operator = (Tempo
}
DDLGuard::DDLGuard(Map & map_, DB::FastSharedMutex & db_mutex_, std::unique_lock<std::mutex> guards_lock_, const String & elem, const String & database_name)
DDLGuard::DDLGuard(Map & map_, SharedMutex & db_mutex_, std::unique_lock<std::mutex> guards_lock_, const String & elem, const String & database_name)
: map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_))
{
it = map.emplace(elem, Entry{std::make_unique<std::mutex>(), 0}).first;

View File

@ -6,7 +6,7 @@
#include <Databases/TablesDependencyGraph.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>
@ -58,7 +58,7 @@ public:
DDLGuard(
Map & map_,
DB::FastSharedMutex & db_mutex_,
SharedMutex & db_mutex_,
std::unique_lock<std::mutex> guards_lock_,
const String & elem,
const String & database_name);
@ -69,7 +69,7 @@ public:
private:
Map & map;
DB::FastSharedMutex & db_mutex;
SharedMutex & db_mutex;
Map::iterator it;
std::unique_lock<std::mutex> guards_lock;
std::unique_lock<std::mutex> table_lock;
@ -142,7 +142,7 @@ public:
/// Get an object that protects the table from concurrently executing multiple DDL operations.
DDLGuardPtr getDDLGuard(const String & database, const String & table);
/// Get an object that protects the database from concurrent DDL queries all tables in the database
std::unique_lock<DB::FastSharedMutex> getExclusiveDDLGuardForDatabase(const String & database);
std::unique_lock<SharedMutex> getExclusiveDDLGuardForDatabase(const String & database);
void assertDatabaseExists(const String & database_name) const;
@ -298,7 +298,7 @@ private:
/// For the duration of the operation, an element is placed here, and an object is returned,
/// which deletes the element in the destructor when counter becomes zero.
/// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below.
using DatabaseGuard = std::pair<DDLGuard::Map, DB::FastSharedMutex>;
using DatabaseGuard = std::pair<DDLGuard::Map, SharedMutex>;
using DDLGuards = std::map<String, DatabaseGuard>;
DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex);
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.

View File

@ -7,7 +7,7 @@
#include <Core/Block.h>
#include <Common/MultiVersion.h>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <mutex>
@ -131,7 +131,7 @@ private:
TemporaryDataOnDiskPtr tmp_data;
Buckets buckets;
mutable DB::FastSharedMutex rehash_mutex;
mutable SharedMutex rehash_mutex;
FileBucket * current_bucket = nullptr;
mutable std::mutex current_bucket_mutex;

View File

@ -7,7 +7,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Common/ActionBlocker.h>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <base/types.h>
#include <atomic>
@ -43,7 +43,7 @@ public:
/// You need to stop the data transfer if blocker is activated.
ActionBlocker blocker;
DB::FastSharedMutex rwlock;
SharedMutex rwlock;
};
using InterserverIOEndpointPtr = std::shared_ptr<InterserverIOEndpoint>;

View File

@ -1,6 +1,6 @@
#pragma once
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <Common/CacheBase.h>
#include <Core/Block.h>
#include <Core/SortDescription.h>
@ -71,7 +71,7 @@ private:
using Cache = CacheBase<size_t, Block, std::hash<size_t>, BlockByteWeight>;
mutable DB::FastSharedMutex rwlock;
mutable SharedMutex rwlock;
std::shared_ptr<TableJoin> table_join;
SizeLimits size_limits;
SortDescription left_sort_description;

View File

@ -19,7 +19,6 @@
#include <Common/Stopwatch.h>
#include <Common/Throttler.h>
#include <Common/OvercommitTracker.h>
#include <Common/Threading.h>
#include <condition_variable>
#include <list>

View File

@ -7,7 +7,7 @@
#include <Parsers/IAST.h>
#include <Storages/MergeTree/BoolMask.h>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <Common/logger_useful.h>
@ -131,7 +131,7 @@ private:
/** Protects work with the set in the functions `insertFromBlock` and `execute`.
* These functions can be called simultaneously from different threads only when using StorageSet,
*/
mutable DB::FastSharedMutex rwlock;
mutable SharedMutex rwlock;
template <typename Method>
void insertFromBlockImpl(

View File

@ -17,7 +17,6 @@
#include <Common/Exception.h>
#include <Common/RWLock.h>
#include <Common/TypePromotion.h>
#include <Common/Threading.h>
#include <optional>
#include <compare>

View File

@ -1,6 +1,5 @@
#pragma once
#include <Common/Threading.h>
#include <IO/WriteSettings.h>
#include <Core/Block.h>
#include <base/types.h>

View File

@ -2,6 +2,7 @@
#include <base/defines.h>
#include <Common/SimpleIncrement.h>
#include <Common/SharedMutex.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <IO/ReadBufferFromString.h>
@ -1088,7 +1089,7 @@ protected:
MultiVersion<MergeTreeSettings> storage_settings;
/// Used to determine which UUIDs to send to root query executor for deduplication.
mutable DB::FastSharedMutex pinned_part_uuids_mutex;
mutable SharedMutex pinned_part_uuids_mutex;
PinnedPartUUIDsPtr pinned_part_uuids;
/// True if at least one part was created/removed with transaction.

View File

@ -25,7 +25,6 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/Threading.h>
#include <base/sort.h>
#include <rocksdb/table.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <memory>
#include <Common/Threading.h>
#include <Common/SharedMutex.h>
#include <Storages/IStorage.h>
#include <Interpreters/IKeyValueEntity.h>
#include <rocksdb/status.h>
@ -86,7 +86,7 @@ private:
const String primary_key;
using RocksDBPtr = std::unique_ptr<rocksdb::DB>;
RocksDBPtr rocksdb_ptr;
mutable DB::FastSharedMutex rocksdb_ptr_mx;
mutable SharedMutex rocksdb_ptr_mx;
String rocksdb_dir;
Int32 ttl;
bool read_only;

View File

@ -49,6 +49,7 @@
#include <unistd.h>
#include <re2/re2.h>
#include <filesystem>
#include <shared_mutex>
namespace ProfileEvents

View File

@ -3,10 +3,10 @@
#include <Storages/IStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <Common/Threading.h>
#include <Common/logger_useful.h>
#include <atomic>
#include <shared_mutex>
namespace DB

View File

@ -1,14 +1,15 @@
#pragma once
#include <map>
#include <shared_mutex>
#include <Disks/IDisk.h>
#include <Storages/IStorage.h>
#include <Common/FileChecker.h>
#include <Common/Threading.h>
#include <Common/escapeForFileName.h>
#include <Core/NamesAndTypes.h>
namespace DB
{

View File

@ -1,11 +1,11 @@
#pragma once
#include <map>
#include <shared_mutex>
#include <Core/Defines.h>
#include <Storages/IStorage.h>
#include <Formats/IndexForNativeFormat.h>
#include <Common/Threading.h>
#include <Common/FileChecker.h>
#include <Common/escapeForFileName.h>
#include <Disks/IDisk.h>

View File

@ -1443,7 +1443,7 @@ void StorageWindowView::writeIntoWindowView(
});
}
std::shared_lock<DB::FastSharedMutex> fire_signal_lock;
std::shared_lock<SharedMutex> fire_signal_lock;
QueryPipelineBuilder builder;
if (window_view.is_proctime)
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/SharedMutex.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataTypes/DataTypeInterval.h>
#include <Interpreters/InterpreterSelectQuery.h>
@ -213,7 +214,7 @@ private:
/// Mutex for the blocks and ready condition
std::mutex mutex;
DB::FastSharedMutex fire_signal_mutex;
SharedMutex fire_signal_mutex;
mutable std::mutex sample_block_lock; /// Mutex to protect access to sample block
IntervalKind::Kind window_kind;