Merge pull request #38068 from ClickHouse/clang-tsa

Support for Clang Thread Safety Analysis (TSA)
This commit is contained in:
Robert Schulze 2022-06-21 20:19:33 +02:00 committed by GitHub
commit 0d80874d40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 127 additions and 75 deletions

View File

@ -7,6 +7,7 @@
#include <replxx.hxx>
#include <base/types.h>
#include <base/defines.h>
class LineReader
{
@ -20,8 +21,8 @@ public:
void addWords(Words && new_words);
private:
Words words;
Words words_no_case;
Words words TSA_GUARDED_BY(mutex);
Words words_no_case TSA_GUARDED_BY(mutex);
std::mutex mutex;
};
@ -29,7 +30,7 @@ public:
using Patterns = std::vector<const char *>;
LineReader(const String & history_file_path, bool multiline, Patterns extenders, Patterns delimiters);
virtual ~LineReader() {}
virtual ~LineReader() = default;
/// Reads the whole line until delimiter (in multiline mode) or until the last line without extender.
/// If resulting line is empty, it means the user interrupted the input.

View File

@ -124,6 +124,23 @@
#endif
#endif
// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
#if defined(__clang__)
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) // data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) // pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) // thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) // thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) // annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) // disable TSA for a function
#else
# define TSA_GUARDED_BY(...)
# define TSA_PT_GUARDED_BY(...)
# define TSA_REQUIRES(...)
# define TSA_REQUIRES_SHARED(...)
# define TSA_NO_THREAD_SAFETY_ANALYSIS
#endif
/// A template function for suppressing warnings about unused variables or function results.
template <typename... Args>
constexpr void UNUSED(Args &&... args [[maybe_unused]])

View File

@ -19,7 +19,6 @@ if (COMPILER_CLANG)
# Add some warnings that are not available even with -Wall -Wextra -Wpedantic.
# We want to get everything out of the compiler for code quality.
add_warning(everything)
add_warning(pedantic)
no_warning(vla-extension)
no_warning(zero-length-array)
@ -51,6 +50,7 @@ if (COMPILER_CLANG)
no_warning(vla)
no_warning(weak-template-vtables)
no_warning(weak-vtables)
no_warning(thread-safety-negative) # experimental flag, too many false positives
# TODO Enable conversion, sign-conversion, double-promotion warnings.
elseif (COMPILER_GCC)
# Add compiler options only to c++ compiler

View File

@ -78,6 +78,9 @@ target_compile_options(cxx PUBLIC $<$<COMPILE_LANGUAGE:CXX>:-nostdinc++>)
# Third party library may have substandard code.
target_compile_options(cxx PRIVATE -w)
# Enable support for Clang-Thread-Safety-Analysis in libcxx
target_compile_definitions(cxx PUBLIC -D_LIBCPP_ENABLE_THREAD_SAFETY_ANNOTATIONS)
target_link_libraries(cxx PUBLIC cxxabi)
# For __udivmodti4, __divmodti4.

View File

@ -3,6 +3,7 @@
#include <mutex>
#include <Poco/Util/Application.h>
#include <base/defines.h>
namespace DB
{
@ -24,9 +25,9 @@ public:
private:
mutable std::mutex keeper_dispatcher_mutex;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
mutable std::shared_ptr<KeeperDispatcher> keeper_dispatcher TSA_GUARDED_BY(keeper_dispatcher_mutex);
ConfigurationPtr config;
ConfigurationPtr config TSA_GUARDED_BY(keeper_dispatcher_mutex);
};
}

View File

@ -1,6 +1,8 @@
#pragma once
#include "SharedLibraryHandler.h"
#include <base/defines.h>
#include <unordered_map>
#include <mutex>
@ -30,7 +32,7 @@ public:
private:
/// map: dict_id -> sharedLibraryHandler
std::unordered_map<std::string, SharedLibraryHandlerPtr> library_handlers;
std::unordered_map<std::string, SharedLibraryHandlerPtr> library_handlers TSA_GUARDED_BY(mutex);
std::mutex mutex;
};

View File

@ -4,6 +4,7 @@
#include <nanodbc/nanodbc.h>
#include <mutex>
#include <base/BorrowedObjectPool.h>
#include <base/defines.h>
#include <unordered_map>
@ -165,7 +166,7 @@ public:
private:
/// [connection_settings_string] -> [connection_pool]
using PoolFactory = std::unordered_map<std::string, nanodbc::PoolPtr>;
PoolFactory factory;
PoolFactory factory TSA_GUARDED_BY(mutex);
std::mutex mutex;
};

View File

@ -19,6 +19,7 @@
#include <Backups/BackupEntriesCollector.h>
#include <Backups/RestorerFromBackup.h>
#include <Core/Settings.h>
#include <base/defines.h>
#include <base/find_symbols.h>
#include <Poco/AccessExpireCache.h>
#include <boost/algorithm/string/join.hpp>
@ -133,7 +134,7 @@ public:
}
private:
Strings registered_prefixes;
Strings registered_prefixes TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <Access/SettingsProfileElement.h>
#include <base/defines.h>
#include <Core/UUID.h>
#include <boost/container/flat_set.hpp>
#include <mutex>
@ -42,7 +43,7 @@ private:
void setInfo(const std::shared_ptr<const SettingsProfilesInfo> & info_);
const Params params;
std::shared_ptr<const SettingsProfilesInfo> info;
std::shared_ptr<const SettingsProfilesInfo> info TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
}

View File

@ -231,18 +231,23 @@ void parseLDAPRoleSearchParams(LDAPClient::RoleSearchParams & params, const Poco
params.prefix = config.getString(prefix + ".prefix");
}
void ExternalAuthenticators::reset()
void ExternalAuthenticators::resetImpl()
{
std::scoped_lock lock(mutex);
ldap_client_params_blueprint.clear();
ldap_caches.clear();
kerberos_params.reset();
}
void ExternalAuthenticators::reset()
{
std::scoped_lock lock(mutex);
resetImpl();
}
void ExternalAuthenticators::setConfiguration(const Poco::Util::AbstractConfiguration & config, Poco::Logger * log)
{
std::scoped_lock lock(mutex);
reset();
resetImpl();
Poco::Util::AbstractConfiguration::Keys all_keys;
config.keys("", all_keys);

View File

@ -3,6 +3,7 @@
#include <Access/LDAPClient.h>
#include <Access/Credentials.h>
#include <Access/GSSAcceptor.h>
#include <base/defines.h>
#include <base/types.h>
#include <chrono>
@ -22,7 +23,6 @@ namespace Poco
}
}
namespace DB
{
@ -51,10 +51,12 @@ private:
using LDAPCaches = std::map<String, LDAPCache>; // server name -> cache
using LDAPParams = std::map<String, LDAPClient::Params>; // server name -> params
mutable std::recursive_mutex mutex;
LDAPParams ldap_client_params_blueprint;
mutable LDAPCaches ldap_caches;
std::optional<GSSAcceptorContext::Params> kerberos_params;
mutable std::mutex mutex;
LDAPParams ldap_client_params_blueprint TSA_GUARDED_BY(mutex) ;
mutable LDAPCaches ldap_caches TSA_GUARDED_BY(mutex) ;
std::optional<GSSAcceptorContext::Params> kerberos_params TSA_GUARDED_BY(mutex) ;
void resetImpl() TSA_REQUIRES(mutex);
};
void parseLDAPRoleSearchParams(LDAPClient::RoleSearchParams & params, const Poco::Util::AbstractConfiguration & config, const String & prefix);

View File

@ -36,6 +36,7 @@ LDAPAccessStorage::LDAPAccessStorage(const String & storage_name_, AccessControl
String LDAPAccessStorage::getLDAPServerName() const
{
std::scoped_lock lock(mutex);
return ldap_server_name;
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Access/IAccessStorage.h>
#include <base/defines.h>
#include <list>
#include <memory>
#include <mutex>
@ -39,9 +40,9 @@ private:
bool updateImpl(const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists) override;
bool insertWithID(const UUID & id, const AccessEntityPtr & new_entity, bool replace_if_exists, bool throw_if_exists);
bool insertNoLock(const UUID & id, const AccessEntityPtr & entity, bool replace_if_exists, bool throw_if_exists);
bool removeNoLock(const UUID & id, bool throw_if_not_exists);
bool updateNoLock(const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists);
bool insertNoLock(const UUID & id, const AccessEntityPtr & entity, bool replace_if_exists, bool throw_if_exists) TSA_REQUIRES(mutex);
bool removeNoLock(const UUID & id, bool throw_if_not_exists) TSA_REQUIRES(mutex);
bool updateNoLock(const UUID & id, const UpdateFunc & update_func, bool throw_if_not_exists) TSA_REQUIRES(mutex);
struct Entry
{
@ -50,8 +51,8 @@ private:
};
mutable std::mutex mutex;
std::unordered_map<UUID, Entry> entries_by_id; /// We want to search entries both by ID and by the pair of name and type.
std::unordered_map<String, Entry *> entries_by_name_and_type[static_cast<size_t>(AccessEntityType::MAX)];
std::unordered_map<UUID, Entry> entries_by_id TSA_GUARDED_BY(mutex); /// We want to search entries both by ID and by the pair of name and type.
std::unordered_map<String, Entry *> entries_by_name_and_type[static_cast<size_t>(AccessEntityType::MAX)] TSA_GUARDED_BY(mutex);
AccessChangesNotifier & changes_notifier;
bool backup_allowed = false;
};

View File

@ -43,14 +43,14 @@ MultipleAccessStorage::~MultipleAccessStorage()
void MultipleAccessStorage::setStorages(const std::vector<StoragePtr> & storages)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
nested_storages = std::make_shared<const Storages>(storages);
ids_cache.reset();
}
void MultipleAccessStorage::addStorage(const StoragePtr & new_storage)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
if (boost::range::find(*nested_storages, new_storage) != nested_storages->end())
return;
auto new_storages = std::make_shared<Storages>(*nested_storages);
@ -60,7 +60,7 @@ void MultipleAccessStorage::addStorage(const StoragePtr & new_storage)
void MultipleAccessStorage::removeStorage(const StoragePtr & storage_to_remove)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
auto it = boost::range::find(*nested_storages, storage_to_remove);
if (it == nested_storages->end())
return;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Access/IAccessStorage.h>
#include <base/defines.h>
#include <Common/LRUCache.h>
#include <mutex>
@ -61,8 +62,8 @@ private:
using Storages = std::vector<StoragePtr>;
std::shared_ptr<const Storages> getStoragesInternal() const;
std::shared_ptr<const Storages> nested_storages;
mutable LRUCache<UUID, Storage> ids_cache;
std::shared_ptr<const Storages> nested_storages TSA_GUARDED_BY(mutex);
mutable LRUCache<UUID, Storage> ids_cache TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <unordered_map>
#include <base/defines.h>
#include <base/scope_guard.h>
#include <Common/ThreadPool.h>
@ -70,10 +71,10 @@ private:
bool refresh();
void refreshEntities(const zkutil::ZooKeeperPtr & zookeeper);
void refreshEntity(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
void refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id);
void refreshEntityNoLock(const zkutil::ZooKeeperPtr & zookeeper, const UUID & id) TSA_REQUIRES(mutex);
void setEntityNoLock(const UUID & id, const AccessEntityPtr & entity);
void removeEntityNoLock(const UUID & id);
void setEntityNoLock(const UUID & id, const AccessEntityPtr & entity) TSA_REQUIRES(mutex);
void removeEntityNoLock(const UUID & id) TSA_REQUIRES(mutex);
struct Entry
{
@ -86,8 +87,8 @@ private:
AccessEntityPtr readImpl(const UUID & id, bool throw_if_not_exists) const override;
mutable std::mutex mutex;
std::unordered_map<UUID, Entry> entries_by_id;
std::unordered_map<String, Entry *> entries_by_name_and_type[static_cast<size_t>(AccessEntityType::MAX)];
std::unordered_map<UUID, Entry> entries_by_id TSA_GUARDED_BY(mutex);
std::unordered_map<String, Entry *> entries_by_name_and_type[static_cast<size_t>(AccessEntityType::MAX)] TSA_GUARDED_BY(mutex);
AccessChangesNotifier & changes_notifier;
bool backup_allowed = false;
};

View File

@ -2,6 +2,7 @@
#include <Backups/IBackupCoordination.h>
#include <Backups/BackupCoordinationHelpers.h>
#include <base/defines.h>
#include <map>
#include <mutex>
@ -44,12 +45,12 @@ public:
private:
mutable std::mutex mutex;
BackupCoordinationReplicatedPartNames replicated_part_names;
std::unordered_map<String, Strings> replicated_data_paths;
std::map<String /* file_name */, SizeAndChecksum> file_names; /// Should be ordered alphabetically, see listFiles(). For empty files we assume checksum = 0.
std::map<SizeAndChecksum, FileInfo> file_infos; /// Information about files. Without empty files.
Strings archive_suffixes;
size_t current_archive_suffix = 0;
BackupCoordinationReplicatedPartNames replicated_part_names TSA_GUARDED_BY(mutex);
std::unordered_map<String, Strings> replicated_data_paths TSA_GUARDED_BY(mutex);
std::map<String /* file_name */, SizeAndChecksum> file_names TSA_GUARDED_BY(mutex); /// Should be ordered alphabetically, see listFiles(). For empty files we assume checksum = 0.
std::map<SizeAndChecksum, FileInfo> file_infos TSA_GUARDED_BY(mutex); /// Information about files. Without empty files.
Strings archive_suffixes TSA_GUARDED_BY(mutex);
size_t current_archive_suffix TSA_GUARDED_BY(mutex) = 0;
};

View File

@ -1,6 +1,7 @@
#pragma once
#include <Backups/IBackupEntry.h>
#include <base/defines.h>
#include <mutex>
namespace Poco { class TemporaryFile; }
@ -41,7 +42,7 @@ public:
private:
const DiskPtr disk;
const String file_path;
mutable std::optional<UInt64> file_size;
mutable std::optional<UInt64> file_size TSA_GUARDED_BY(get_file_size_mutex);
mutable std::mutex get_file_size_mutex;
const std::optional<UInt128> checksum;
const std::shared_ptr<Poco::TemporaryFile> temporary_file;

View File

@ -22,7 +22,7 @@ ConnectionPoolPtr ConnectionPoolFactory::get(
Key key{
max_connections, host, port, default_database, user, password, cluster, cluster_secret, client_name, compression, secure, priority};
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
auto [it, inserted] = pools.emplace(key, ConnectionPoolPtr{});
if (!inserted)
if (auto res = it->second.lock())

View File

@ -4,6 +4,7 @@
#include <Client/Connection.h>
#include <IO/ConnectionTimeouts.h>
#include <Core/Settings.h>
#include <base/defines.h>
namespace DB
{
@ -179,7 +180,7 @@ public:
private:
mutable std::mutex mutex;
using ConnectionPoolWeakPtr = std::weak_ptr<IConnectionPool>;
std::unordered_map<Key, ConnectionPoolWeakPtr, KeyHash> pools;
std::unordered_map<Key, ConnectionPoolWeakPtr, KeyHash> pools TSA_GUARDED_BY(mutex);
};
inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionPoolFactory::Key & rhs)

View File

@ -6,6 +6,7 @@
#include <mutex>
#include <string_view>
#include <vector>
#include <base/defines.h>
#include <base/types.h>
/** Allows to count number of simultaneously happening error codes.
@ -57,7 +58,7 @@ namespace ErrorCodes
void increment(bool remote, const std::string & message, const FramePointers & trace);
private:
ErrorPair value;
ErrorPair value TSA_GUARDED_BY(mutex);
std::mutex mutex;
};

View File

@ -8,6 +8,7 @@
#include <atomic>
#include <Common/logger_useful.h>
#include <base/defines.h>
namespace DB
@ -48,7 +49,7 @@ public:
{
std::lock_guard lock(mutex);
auto res = getImpl(key, lock);
auto res = getImpl(key);
if (res)
++hits;
else
@ -61,7 +62,7 @@ public:
{
std::lock_guard lock(mutex);
setImpl(key, mapped, lock);
setImpl(key, mapped);
}
void remove(const Key & key)
@ -91,7 +92,7 @@ public:
{
std::lock_guard cache_lock(mutex);
auto val = getImpl(key, cache_lock);
auto val = getImpl(key);
if (val)
{
++hits;
@ -129,7 +130,7 @@ public:
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
setImpl(key, token->value, cache_lock);
setImpl(key, token->value);
result = true;
}
@ -189,7 +190,7 @@ protected:
using Cells = std::unordered_map<Key, Cell, HashFunction>;
Cells cells;
Cells cells TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
private:
@ -200,8 +201,8 @@ private:
explicit InsertToken(LRUCache & cache_) : cache(cache_) {}
std::mutex mutex;
bool cleaned_up = false; /// Protected by the token mutex
MappedPtr value; /// Protected by the token mutex
bool cleaned_up TSA_GUARDED_BY(mutex) = false;
MappedPtr value TSA_GUARDED_BY(mutex);
LRUCache & cache;
size_t refcount = 0; /// Protected by the cache mutex
@ -221,6 +222,7 @@ private:
InsertTokenHolder() = default;
void acquire(const Key * key_, const std::shared_ptr<InsertToken> & token_, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled only because we can't reference the parent-level cache mutex from here
{
key = key_;
token = token_;
@ -228,6 +230,7 @@ private:
}
void cleanup([[maybe_unused]] std::lock_guard<std::mutex> & token_lock, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled only because we can't reference the parent-level cache mutex from here
{
token->cache.insert_tokens.erase(*key);
token->cleaned_up = true;
@ -258,21 +261,21 @@ private:
friend struct InsertTokenHolder;
InsertTokenById insert_tokens;
InsertTokenById insert_tokens TSA_GUARDED_BY(mutex);
LRUQueue queue;
LRUQueue queue TSA_GUARDED_BY(mutex);
/// Total weight of values.
size_t current_size = 0;
size_t current_size TSA_GUARDED_BY(mutex) = 0;
const size_t max_size;
const size_t max_elements_size;
std::atomic<size_t> hits {0};
std::atomic<size_t> misses {0};
WeightFunction weight_function;
const WeightFunction weight_function;
MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
MappedPtr getImpl(const Key & key) TSA_REQUIRES(mutex)
{
auto it = cells.find(key);
if (it == cells.end())
@ -288,7 +291,7 @@ private:
return cell.value;
}
void setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
void setImpl(const Key & key, const MappedPtr & mapped) TSA_REQUIRES(mutex)
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
@ -321,7 +324,7 @@ private:
removeOverflow();
}
void removeOverflow()
void removeOverflow() TSA_REQUIRES(mutex)
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();

View File

@ -2,6 +2,7 @@
#include <mutex>
#include <memory>
#include <base/defines.h>
/** Allow to store and read-only usage of an object in several threads,
@ -51,6 +52,6 @@ public:
}
private:
Version current_version;
Version current_version TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};

View File

@ -4,6 +4,7 @@
#include <vector>
#include <mutex>
#include <unordered_set>
#include <base/defines.h>
namespace Poco { class URI; }
@ -28,8 +29,8 @@ private:
std::atomic_bool is_initialized = false;
mutable std::mutex hosts_mutex;
std::unordered_set<std::string> primary_hosts; /// Allowed primary (<host>) URL from config.xml
std::vector<std::string> regexp_hosts; /// Allowed regexp (<hots_regexp>) URL from config.xml
std::unordered_set<std::string> primary_hosts TSA_GUARDED_BY(hosts_mutex); /// Allowed primary (<host>) URL from config.xml
std::vector<std::string> regexp_hosts TSA_GUARDED_BY(hosts_mutex); /// Allowed regexp (<hots_regexp>) URL from config.xml
/// Checks if the primary_hosts and regexp_hosts contain str. If primary_hosts and regexp_hosts are empty return true.
bool checkForDirectEntry(const std::string & str) const;

View File

@ -59,7 +59,7 @@ protected:
TLDListsHolder();
std::mutex tld_lists_map_mutex;
Map tld_lists_map;
Map tld_lists_map TSA_GUARDED_BY(tld_lists_map_mutex);
};
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <base/defines.h>
#include <base/types.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
@ -228,13 +229,13 @@ private:
using Operations = std::map<XID, RequestInfo>;
Operations operations;
Operations operations TSA_GUARDED_BY(operations_mutex);
std::mutex operations_mutex;
using WatchCallbacks = std::vector<WatchCallback>;
using Watches = std::map<String /* path, relative of root_path */, WatchCallbacks>;
Watches watches;
Watches watches TSA_GUARDED_BY(watches_mutex);
std::mutex watches_mutex;
ThreadFromGlobalPool send_thread;

View File

@ -4,6 +4,7 @@
#include <map>
#include <mutex>
#include <Core/Types.h>
#include <base/defines.h>
#include <libnuraft/log_store.hxx>
namespace DB
@ -39,7 +40,7 @@ public:
bool flush() override { return true; }
private:
std::map<uint64_t, nuraft::ptr<nuraft::log_entry>> logs;
std::map<uint64_t, nuraft::ptr<nuraft::log_entry>> logs TSA_GUARDED_BY(logs_lock);
mutable std::mutex logs_lock;
std::atomic<uint64_t> start_idx;
};

View File

@ -5,6 +5,7 @@
#include <Core/Types.h>
#include <Coordination/Changelog.h>
#include <Common/logger_useful.h>
#include <base/defines.h>
namespace DB
{
@ -70,7 +71,7 @@ public:
private:
mutable std::mutex changelog_lock;
Poco::Logger * log;
Changelog changelog;
Changelog changelog TSA_GUARDED_BY(changelog_lock);
};
}

View File

@ -1267,7 +1267,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
disk_parts.emplace_back(std::make_pair(it->name(), disk_ptr));
else if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME && settings->in_memory_parts_enable_wal)
{
std::unique_lock lock(wal_init_lock);
std::lock_guard lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(
"There are multiple WAL files appeared in current storage policy. You need to resolve this manually",
@ -6602,10 +6602,10 @@ void MergeTreeData::setDataVolume(size_t bytes, size_t rows, size_t parts)
bool MergeTreeData::insertQueryIdOrThrow(const String & query_id, size_t max_queries) const
{
std::lock_guard lock(query_id_set_mutex);
return insertQueryIdOrThrowNoLock(query_id, max_queries, lock);
return insertQueryIdOrThrowNoLock(query_id, max_queries);
}
bool MergeTreeData::insertQueryIdOrThrowNoLock(const String & query_id, size_t max_queries, const std::lock_guard<std::mutex> &) const
bool MergeTreeData::insertQueryIdOrThrowNoLock(const String & query_id, size_t max_queries) const
{
if (query_id_set.find(query_id) != query_id_set.end())
return false;
@ -6619,10 +6619,10 @@ bool MergeTreeData::insertQueryIdOrThrowNoLock(const String & query_id, size_t m
void MergeTreeData::removeQueryId(const String & query_id) const
{
std::lock_guard lock(query_id_set_mutex);
removeQueryIdNoLock(query_id, lock);
removeQueryIdNoLock(query_id);
}
void MergeTreeData::removeQueryIdNoLock(const String & query_id, const std::lock_guard<std::mutex> &) const
void MergeTreeData::removeQueryIdNoLock(const String & query_id) const
{
if (query_id_set.find(query_id) == query_id_set.end())
LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");

View File

@ -1,5 +1,6 @@
#pragma once
#include <base/defines.h>
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
@ -921,11 +922,11 @@ public:
/// Record current query id where querying the table. Throw if there are already `max_queries` queries accessing the same table.
/// Returns false if the `query_id` already exists in the running set, otherwise return true.
bool insertQueryIdOrThrow(const String & query_id, size_t max_queries) const;
bool insertQueryIdOrThrowNoLock(const String & query_id, size_t max_queries, const std::lock_guard<std::mutex> &) const;
bool insertQueryIdOrThrowNoLock(const String & query_id, size_t max_queries) const TSA_REQUIRES(query_id_set_mutex);
/// Remove current query id after query finished.
void removeQueryId(const String & query_id) const;
void removeQueryIdNoLock(const String & query_id, const std::lock_guard<std::mutex> &) const;
void removeQueryIdNoLock(const String & query_id) const TSA_REQUIRES(query_id_set_mutex);
/// Return the partition expression types as a Tuple type. Return DataTypeUInt8 if partition expression is empty.
DataTypePtr getPartitionValueType() const;
@ -1290,7 +1291,7 @@ private:
std::atomic<size_t> total_active_size_parts = 0;
// Record all query ids which access the table. It's guarded by `query_id_set_mutex` and is always mutable.
mutable std::set<String> query_id_set;
mutable std::set<String> query_id_set TSA_GUARDED_BY(query_id_set_mutex);
mutable std::mutex query_id_set_mutex;
// Get partition matcher for FREEZE / UNFREEZE queries.

View File

@ -1155,6 +1155,7 @@ std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
const MergeTreeData & data,
const ReadFromMergeTree::AnalysisResult & result,
const ContextPtr & context)
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled because TSA is confused by guaranteed copy elision in data.getQueryIdSetLock()
{
const auto & settings = context->getSettingsRef();
const auto data_settings = data.getSettings();
@ -1180,7 +1181,7 @@ std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
if (!query_id.empty())
{
auto lock = data.getQueryIdSetLock();
if (data.insertQueryIdOrThrowNoLock(query_id, data_settings->max_concurrent_queries, lock))
if (data.insertQueryIdOrThrowNoLock(query_id, data_settings->max_concurrent_queries))
{
try
{
@ -1189,7 +1190,7 @@ std::shared_ptr<QueryIdHolder> MergeTreeDataSelectExecutor::checkLimits(
catch (...)
{
/// If we fail to construct the holder, remove query_id explicitly to avoid leak.
data.removeQueryIdNoLock(query_id, lock);
data.removeQueryIdNoLock(query_id);
throw;
}
}