add some TSA aanotations

This commit is contained in:
Alexander Tokmakov 2022-06-27 22:48:27 +02:00
parent bed8ee556c
commit f4883f1f7e
31 changed files with 206 additions and 183 deletions

View File

@ -124,21 +124,37 @@
#endif
#endif
// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
/// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.
/// Feel free to extend, but please stay close to https://clang.llvm.org/docs/ThreadSafetyAnalysis.html#mutexheader
#if defined(__clang__)
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) // data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) // pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) // thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) // thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) // annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) // disable TSA for a function
# define TSA_GUARDED_BY(...) __attribute__((guarded_by(__VA_ARGS__))) /// data is protected by given capability
# define TSA_PT_GUARDED_BY(...) __attribute__((pt_guarded_by(__VA_ARGS__))) /// pointed-to data is protected by the given capability
# define TSA_REQUIRES(...) __attribute__((requires_capability(__VA_ARGS__))) /// thread needs exclusive possession of given capability
# define TSA_REQUIRES_SHARED(...) __attribute__((requires_shared_capability(__VA_ARGS__))) /// thread needs shared possession of given capability
# define TSA_ACQUIRED_AFTER(...) __attribute__((acquired_after(__VA_ARGS__))) /// annotated lock must be locked after given lock
# define TSA_NO_THREAD_SAFETY_ANALYSIS __attribute__((no_thread_safety_analysis)) /// disable TSA for a function
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// Consider adding a comment before using these macros.
# define READ_NO_TSA(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> const auto & { return (x); }()
# define WRITE_NO_TSA(x) [&]() TSA_NO_THREAD_SAFETY_ANALYSIS -> auto & { return (x); }()
/// This macro is useful when only one thread writes to a member
/// and you want to read this member from the same thread without locking a mutex.
/// It's safe (because no concurrent writes are possible), but TSA generates a waring.
/// (Seems like there's no way to verify it, but it makes sense to distinguish it from READ_NO_TSA for readability)
# define READ_ONE_THREAD(x) READ_NO_TSA(x)
#else
# define TSA_GUARDED_BY(...)
# define TSA_PT_GUARDED_BY(...)
# define TSA_REQUIRES(...)
# define TSA_REQUIRES_SHARED(...)
# define TSA_NO_THREAD_SAFETY_ANALYSIS
# define READ_NO_TSA(x)
# define WRITE_NO_TSA(x)
# define TSA_READ_UNSAFE(x)
#endif
/// A template function for suppressing warnings about unused variables or function results.

View File

@ -73,7 +73,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(ContextPtr)
{
assert(tables.empty());
assert(READ_NO_TSA(tables).empty());
try
{
fs::remove(path_to_metadata_symlink);
@ -90,19 +90,19 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
{
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseOrdinary::attachTableUnlocked(name, table, lock);
DatabaseOrdinary::attachTableUnlocked(name, table);
table_name_to_path.emplace(std::make_pair(name, relative_table_path));
}
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{
DetachedTables not_in_use;
std::unique_lock lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name, lock);
std::lock_guard lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleanupDetachedTables(); //-V1001
@ -118,12 +118,12 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
table->dropInnerTableIfAny(no_delay, local_context);
else
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist",
backQuote(database_name), backQuote(table_name));
backQuote(getDatabaseName()), backQuote(table_name));
String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop;
{
std::unique_lock lock(mutex);
std::lock_guard lock(mutex);
table_metadata_path_drop = DatabaseCatalog::instance().getPathForDroppedMetadata(table->getStorageID());
auto txn = local_context->getZooKeeperMetadataTransaction();
if (txn && !local_context->isInternalSubquery())
@ -136,7 +136,7 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
/// TODO better detection and recovery
fs::rename(table_metadata_path, table_metadata_path_drop); /// Mark table as dropped
DatabaseOrdinary::detachTableUnlocked(table_name, lock); /// Should never throw
DatabaseOrdinary::detachTableUnlocked(table_name); /// Should never throw
table_name_to_path.erase(table_name);
}
@ -150,6 +150,7 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_name, IDatabase & to_database,
const String & to_table_name, bool exchange, bool dictionary)
TSA_NO_THREAD_SAFETY_ANALYSIS /// TSA does not support conditional locking
{
if (typeid(*this) != typeid(to_database))
{
@ -173,7 +174,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink)
auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink) TSA_REQUIRES(db.mutex)
{
auto it = db.table_name_to_path.find(table_name_);
String table_data_path_saved;
@ -188,7 +189,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
return table_data_path_saved;
};
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_)
auto attach = [](DatabaseAtomic & db, const String & table_name_, const String & table_data_path_, const StoragePtr & table_) TSA_REQUIRES(db.mutex)
{
db.tables.emplace(table_name_, table_);
if (table_data_path_.empty())
@ -229,9 +230,9 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
}
if (!exchange)
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name, inside_database ? db_lock : other_db_lock);
other_db.checkMetadataFilenameAvailabilityUnlocked(to_table_name);
StoragePtr table = getTableUnlocked(table_name, db_lock);
StoragePtr table = getTableUnlocked(table_name);
if (dictionary && !table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
@ -244,7 +245,7 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
StorageID other_table_new_id = StorageID::createEmpty();
if (exchange)
{
other_table = other_db.getTableUnlocked(to_table_name, other_db_lock);
other_table = other_db.getTableUnlocked(to_table_name);
if (dictionary && !other_table->isDictionary())
throw Exception(ErrorCodes::INCORRECT_QUERY, "Use RENAME/EXCHANGE TABLE (instead of RENAME/EXCHANGE DICTIONARY) for tables");
other_table_new_id = {database_name, table_name, other_table->getStorageID().uuid};
@ -294,7 +295,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
auto table_data_path = getTableDataPath(query);
try
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
if (query.getDatabase() != database_name)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`",
database_name, query.getDatabase());
@ -312,7 +313,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
/// It throws if `table_metadata_path` already exists (it's possible if table was detached)
renameNoReplace(table_metadata_tmp_path, table_metadata_path); /// Commit point (a sort of)
attachTableUnlocked(query.getTable(), table, lock); /// Should never throw
attachTableUnlocked(query.getTable(), table); /// Should never throw
table_name_to_path.emplace(query.getTable(), table_data_path);
}
catch (...)
@ -330,8 +331,8 @@ void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String &
bool check_file_exists = true;
SCOPE_EXIT({ std::error_code code; if (check_file_exists) std::filesystem::remove(table_metadata_tmp_path, code); });
std::unique_lock lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name, lock)->getStorageID();
std::lock_guard lock{mutex};
auto actual_table_id = getTableUnlocked(table_id.table_name)->getStorageID();
if (table_id.uuid != actual_table_id.uuid)
throw Exception("Cannot alter table because it was renamed", ErrorCodes::CANNOT_ASSIGN_ALTER);
@ -363,7 +364,7 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
void DatabaseAtomic::setDetachedTableNotInUseForce(const UUID & uuid)
{
std::unique_lock lock{mutex};
std::lock_guard lock{mutex};
detached_tables.erase(uuid);
}

View File

@ -70,9 +70,9 @@ protected:
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
const String & table_metadata_tmp_path, const String & table_metadata_path, ContextPtr query_context) override;
void assertDetachedTableNotInUse(const UUID & uuid);
void assertDetachedTableNotInUse(const UUID & uuid) TSA_REQUIRES(mutex);
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables();
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
void tryCreateMetadataSymlink();
@ -80,9 +80,9 @@ protected:
//TODO store path in DatabaseWithOwnTables::tables
using NameToPathMap = std::unordered_map<String, String>;
NameToPathMap table_name_to_path;
NameToPathMap table_name_to_path TSA_GUARDED_BY(mutex);
DetachedTables detached_tables;
DetachedTables detached_tables TSA_GUARDED_BY(mutex);
String path_to_table_symlinks;
String path_to_metadata_symlink;
const UUID db_uuid;

View File

@ -158,6 +158,7 @@ DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const Filt
bool DatabaseLazy::empty() const
{
std::lock_guard lock(mutex);
return tables_cache.empty();
}

View File

@ -102,8 +102,8 @@ private:
const time_t expiration_time;
/// TODO use DatabaseWithOwnTablesBase::tables
mutable TablesCache tables_cache;
mutable CacheExpirationQueue cache_expiration_queue;
mutable TablesCache tables_cache TSA_GUARDED_BY(mutex);
mutable CacheExpirationQueue cache_expiration_queue TSA_GUARDED_BY(mutex);
StoragePtr loadTable(const String & table_name) const;

View File

@ -32,8 +32,8 @@ void DatabaseMemory::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
std::unique_lock lock{mutex};
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock{mutex};
attachTableUnlocked(table_name, table);
/// Clean the query from temporary flags.
ASTPtr query_to_store = query;
@ -54,21 +54,22 @@ void DatabaseMemory::dropTable(
const String & table_name,
bool /*no_delay*/)
{
std::unique_lock lock{mutex};
auto table = detachTableUnlocked(table_name, lock);
StoragePtr table;
{
std::lock_guard lock{mutex};
table = detachTableUnlocked(table_name);
}
try
{
/// Remove table without lock since:
/// - it does not require it
/// - it may cause lock-order-inversion if underlying storage need to
/// resolve tables (like StorageLiveView)
SCOPE_EXIT(lock.lock());
lock.unlock();
table->drop();
if (table->storesDataOnDisk())
{
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
assert(getDatabaseName() != DatabaseCatalog::TEMPORARY_DATABASE);
fs::path table_data_dir{getTableDataPath(table_name)};
if (fs::exists(table_data_dir))
fs::remove_all(table_data_dir);
@ -76,10 +77,13 @@ void DatabaseMemory::dropTable(
}
catch (...)
{
std::lock_guard lock{mutex};
assert(database_name != DatabaseCatalog::TEMPORARY_DATABASE);
attachTableUnlocked(table_name, table, lock);
attachTableUnlocked(table_name, table);
throw;
}
std::lock_guard lock{mutex};
table->is_dropped = true;
create_queries.erase(table_name);
UUID table_uuid = table->getStorageID().uuid;

View File

@ -51,9 +51,9 @@ public:
void alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) override;
private:
String data_path;
const String data_path;
using NameToASTCreate = std::unordered_map<String, ASTPtr>;
NameToASTCreate create_queries;
NameToASTCreate create_queries TSA_GUARDED_BY(mutex);
};
}

View File

@ -321,11 +321,11 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_na
void DatabaseOnDisk::checkMetadataFilenameAvailability(const String & to_table_name) const
{
std::unique_lock lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name, lock);
std::lock_guard lock(mutex);
checkMetadataFilenameAvailabilityUnlocked(to_table_name);
}
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const
void DatabaseOnDisk::checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const
{
String table_metadata_path = getObjectMetadataPath(to_table_name);
@ -503,7 +503,7 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
void DatabaseOnDisk::drop(ContextPtr local_context)
{
assert(tables.empty());
assert(READ_NO_TSA(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{
fs::remove_all(local_context->getPath() + getDataPath());
@ -725,8 +725,6 @@ ASTPtr DatabaseOnDisk::getCreateQueryFromStorage(const String & table_name, cons
void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context)
{
std::lock_guard lock(modify_settings_mutex);
auto create_query = getCreateDatabaseQuery()->clone();
auto * create = create_query->as<ASTCreateQuery>();
auto * settings = create->storage->settings;
@ -759,7 +757,7 @@ void DatabaseOnDisk::modifySettingsMetadata(const SettingsChanges & settings_cha
writeChar('\n', statement_buf);
String statement = statement_buf.str();
String database_name_escaped = escapeForFileName(database_name);
String database_name_escaped = escapeForFileName(READ_NO_TSA(database_name)); /// FIXME
fs::path metadata_root_path = fs::canonical(query_context->getGlobalContext()->getPath());
fs::path metadata_file_tmp_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql.tmp");
fs::path metadata_file_path = fs::path(metadata_root_path) / "metadata" / (database_name_escaped + ".sql");

View File

@ -70,7 +70,7 @@ public:
/// will throw when the table we want to attach already exists (in active / detached / detached permanently form)
void checkMetadataFilenameAvailability(const String & to_table_name) const override;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name, std::unique_lock<std::mutex> &) const;
void checkMetadataFilenameAvailabilityUnlocked(const String & to_table_name) const TSA_REQUIRES(mutex);
void modifySettingsMetadata(const SettingsChanges & settings_changes, ContextPtr query_context);
@ -99,9 +99,6 @@ protected:
const String metadata_path;
const String data_path;
/// For alter settings.
std::mutex modify_settings_mutex;
};
}

View File

@ -174,7 +174,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
if (ast)
{
auto * create_query = ast->as<ASTCreateQuery>();
create_query->setDatabase(database_name);
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(READ_NO_TSA(database_name));
/// Even if we don't load the table we can still mark the uuid of it as taken.
if (create_query->uuid != UUIDHelpers::Nil)
@ -201,7 +202,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
return;
}
QualifiedTableName qualified_name{database_name, create_query->getTable()};
QualifiedTableName qualified_name{READ_NO_TSA(database_name), create_query->getTable()};
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext(), qualified_name, ast);
std::lock_guard lock{metadata.mutex};
@ -234,12 +235,12 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
size_t tables_in_database = objects_in_database - dictionaries_in_database;
LOG_INFO(log, "Metadata processed, database {} has {} tables and {} dictionaries in total.",
database_name, tables_in_database, dictionaries_in_database);
READ_NO_TSA(database_name), tables_in_database, dictionaries_in_database);
}
void DatabaseOrdinary::loadTableFromMetadata(ContextMutablePtr local_context, const String & file_path, const QualifiedTableName & name, const ASTPtr & ast, bool force_restore)
{
assert(name.database == database_name);
assert(name.database == READ_NO_TSA(database_name));
const auto & create_query = ast->as<const ASTCreateQuery &>();
tryAttachTable(
@ -255,7 +256,8 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
{
LOG_INFO(log, "Starting up tables.");
const size_t total_tables = tables.size();
/// NOTE No concurrent writes are possible during database loading
const size_t total_tables = READ_NO_TSA(tables).size();
if (!total_tables)
return;
@ -271,7 +273,7 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool, bool /*force_rest
try
{
for (const auto & table : tables)
for (const auto & table : READ_NO_TSA(tables))
thread_pool.scheduleOrThrowOnError([&]() { startup_one_table(table.second); });
}
catch (...)

View File

@ -148,7 +148,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
if (hosts.empty())
throw Exception(ErrorCodes::NO_ACTIVE_REPLICAS, "No replicas of database {} found. "
"It's possible if the first replica is not fully created yet "
"or if the last replica was just dropped or due to logical error", database_name);
"or if the last replica was just dropped or due to logical error", zookeeper_path);
Int32 cversion = stat.cversion;
::sort(hosts.begin(), hosts.end());
@ -213,7 +213,7 @@ ClusterPtr DatabaseReplicated::getClusterImpl() const
treat_local_port_as_remote,
cluster_auth_info.cluster_secure_connection,
/*priority=*/1,
database_name,
READ_NO_TSA(database_name), /// FIXME
cluster_auth_info.cluster_secret);
}
@ -588,7 +588,7 @@ void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeep
query_context->makeQueryContext();
query_context->getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
query_context->getClientInfo().is_replicated_database_internal = true;
query_context->setCurrentDatabase(database_name);
query_context->setCurrentDatabase(getDatabaseName());
query_context->setCurrentQueryId("");
auto txn = std::make_shared<ZooKeeperMetadataTransaction>(current_zookeeper, zookeeper_path, false, "");
query_context->initZooKeeperMetadataTransaction(txn);

View File

@ -218,11 +218,11 @@ bool DatabaseWithOwnTablesBase::empty() const
StoragePtr DatabaseWithOwnTablesBase::detachTable(ContextPtr /* context_ */, const String & table_name)
{
std::unique_lock lock(mutex);
return detachTableUnlocked(table_name, lock);
std::lock_guard lock(mutex);
return detachTableUnlocked(table_name);
}
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &)
StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_name)
{
StoragePtr res;
@ -245,11 +245,11 @@ StoragePtr DatabaseWithOwnTablesBase::detachTableUnlocked(const String & table_n
void DatabaseWithOwnTablesBase::attachTable(ContextPtr /* context_ */, const String & table_name, const StoragePtr & table, const String &)
{
std::unique_lock lock(mutex);
attachTableUnlocked(table_name, table, lock);
std::lock_guard lock(mutex);
attachTableUnlocked(table_name, table);
}
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> &)
void DatabaseWithOwnTablesBase::attachTableUnlocked(const String & table_name, const StoragePtr & table)
{
auto table_id = table->getStorageID();
if (table_id.database_name != database_name)
@ -313,7 +313,7 @@ DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
}
}
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> &) const
StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name) const
{
auto it = tables.find(table_name);
if (it != tables.end())

View File

@ -45,14 +45,14 @@ public:
~DatabaseWithOwnTablesBase() override;
protected:
Tables tables;
Tables tables TSA_GUARDED_BY(mutex);
Poco::Logger * log;
DatabaseWithOwnTablesBase(const String & name_, const String & logger, ContextPtr context);
void attachTableUnlocked(const String & table_name, const StoragePtr & table, std::unique_lock<std::mutex> & lock);
StoragePtr detachTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock);
StoragePtr getTableUnlocked(const String & table_name, std::unique_lock<std::mutex> & lock) const;
void attachTableUnlocked(const String & table_name, const StoragePtr & table) TSA_REQUIRES(mutex);
StoragePtr detachTableUnlocked(const String & table_name) TSA_REQUIRES(mutex);
StoragePtr getTableUnlocked(const String & table_name) const TSA_REQUIRES(mutex);
};
}

View File

@ -19,7 +19,7 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
if (auto storage = tryGetTable(name, context))
return storage;
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(database_name), backQuoteIfNeed(name));
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
}
ASTPtr IDatabase::getCreateDatabaseQueryForBackup() const

View File

@ -356,8 +356,8 @@ protected:
}
mutable std::mutex mutex;
String database_name;
String comment;
String database_name TSA_GUARDED_BY(mutex);
String comment TSA_GUARDED_BY(mutex);
};
using DatabasePtr = std::shared_ptr<IDatabase>;

View File

@ -109,15 +109,15 @@ private:
void cleanOutdatedTables();
void fetchTablesIntoLocalCache(ContextPtr context) const;
void fetchTablesIntoLocalCache(ContextPtr context) const TSA_REQUIRES(mutex);
std::map<String, UInt64> fetchTablesWithModificationTime(ContextPtr local_context) const;
std::map<String, ColumnsDescription> fetchTablesColumnsList(const std::vector<String> & tables_name, ContextPtr context) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const TSA_REQUIRES(mutex);
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, ContextPtr context) const TSA_REQUIRES(mutex);
ThreadFromGlobalPool thread;
};

View File

@ -63,9 +63,9 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
return;
replication_handler = std::make_unique<PostgreSQLReplicationHandler>(
/* replication_identifier */database_name,
/* replication_identifier */ READ_NO_TSA(database_name), /// FIXME
remote_database_name,
database_name,
READ_NO_TSA(database_name), /// FIXME
connection_info,
getContext(),
is_attach,
@ -99,7 +99,8 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
else
{
/// Nested table does not exist and will be created by replication thread.
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(READ_NO_TSA(database_name), table_name), getContext(), remote_database_name, table_name);
}
/// Cache MaterializedPostgreSQL wrapper over nested table.
@ -210,7 +211,8 @@ ASTPtr DatabaseMaterializedPostgreSQL::getCreateTableQueryImpl(const String & ta
std::lock_guard lock(handler_mutex);
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(database_name, table_name), getContext(), remote_database_name, table_name);
/// FIXME TSA
auto storage = std::make_shared<StorageMaterializedPostgreSQL>(StorageID(READ_NO_TSA(database_name), table_name), getContext(), remote_database_name, table_name);
auto ast_storage = replication_handler->getCreateNestedTableQuery(storage.get(), table_name);
assert_cast<ASTCreateQuery *>(ast_storage.get())->uuid = UUIDHelpers::generateV4();
return ast_storage;
@ -234,7 +236,7 @@ ASTPtr DatabaseMaterializedPostgreSQL::createAlterSettingsQuery(const SettingCha
auto * alter = query->as<ASTAlterQuery>();
alter->alter_object = ASTAlterQuery::AlterObjectType::DATABASE;
alter->setDatabase(database_name);
alter->setDatabase(READ_NO_TSA(database_name)); /// FIXME
alter->set(alter->command_list, command_list);
return query;

View File

@ -369,7 +369,11 @@ ASTPtr DatabasePostgreSQL::getCreateDatabaseQuery() const
ASTPtr DatabasePostgreSQL::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard lock{mutex};
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)

View File

@ -81,7 +81,7 @@ private:
bool checkPostgresTable(const String & table_name) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
void removeOutdatedTables();

View File

@ -173,12 +173,16 @@ ASTPtr DatabaseSQLite::getCreateDatabaseQuery() const
ASTPtr DatabaseSQLite::getCreateTableQueryImpl(const String & table_name, ContextPtr local_context, bool throw_on_error) const
{
auto storage = fetchTable(table_name, local_context, false);
StoragePtr storage;
{
std::lock_guard<std::mutex> lock(mutex);
storage = fetchTable(table_name, local_context, false);
}
if (!storage)
{
if (throw_on_error)
throw Exception(ErrorCodes::UNKNOWN_TABLE, "SQLite table {}.{} does not exist",
database_name, table_name);
getDatabaseName(), table_name);
return nullptr;
}
auto table_storage_define = database_engine_define->clone();

View File

@ -54,9 +54,9 @@ private:
bool checkSQLiteTable(const String & table_name) const;
NameSet fetchTablesList() const;
NameSet fetchTablesList() const TSA_REQUIRES(mutex);
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const;
StoragePtr fetchTable(const String & table_name, ContextPtr context, bool table_checked) const TSA_REQUIRES(mutex);
};

View File

@ -124,7 +124,7 @@ protected:
std::string queue_dir; /// dir with queue of queries
mutable std::mutex zookeeper_mutex;
ZooKeeperPtr current_zookeeper;
ZooKeeperPtr current_zookeeper TSA_GUARDED_BY(zookeeper_mutex);
/// Save state of executed task to avoid duplicate execution on ZK error
std::optional<String> last_skipped_entry_name;

View File

@ -205,7 +205,10 @@ void DatabaseCatalog::shutdownImpl()
for (auto & database : current_databases)
database.second->shutdown();
tables_marked_dropped.clear();
{
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.clear();
}
std::lock_guard lock(databases_mutex);
for (const auto & db : databases)
@ -223,6 +226,7 @@ void DatabaseCatalog::shutdownImpl()
auto & table = mapping.second.second;
return db || table;
};
std::lock_guard map_lock{elem.mutex};
auto it = std::find_if(elem.map.begin(), elem.map.end(), not_empty_mapping);
return it != elem.map.end();
}) == uuid_map.end());
@ -689,7 +693,8 @@ DatabaseCatalog::updateDependency(const StorageID & old_from, const StorageID &
DDLGuardPtr DatabaseCatalog::getDDLGuard(const String & database, const String & table)
{
std::unique_lock lock(ddl_guards_mutex);
auto db_guard_iter = ddl_guards.try_emplace(database).first;
/// TSA does not support unique_lock
auto db_guard_iter = WRITE_NO_TSA(ddl_guards).try_emplace(database).first;
DatabaseGuard & db_guard = db_guard_iter->second;
return std::make_unique<DDLGuard>(db_guard.first, db_guard.second, std::move(lock), table, database);
}
@ -698,7 +703,7 @@ std::unique_lock<std::shared_mutex> DatabaseCatalog::getExclusiveDDLGuardForData
{
DDLGuards::iterator db_guard_iter;
{
std::unique_lock lock(ddl_guards_mutex);
std::lock_guard lock(ddl_guards_mutex);
db_guard_iter = ddl_guards.try_emplace(database).first;
assert(db_guard_iter->second.first.contains(""));
}
@ -999,7 +1004,7 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
LOG_DEBUG(log, "Waiting for table {} to be finally dropped", toString(uuid));
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&]()
wait_table_finally_dropped.wait(lock, [&]() TSA_REQUIRES(tables_marked_dropped_mutex) -> bool
{
return !tables_marked_dropped_ids.contains(uuid);
});

View File

@ -221,7 +221,7 @@ public:
DependenciesInfo getLoadingDependenciesInfo(const StorageID & table_id) const;
TableNamesSet tryRemoveLoadingDependencies(const StorageID & table_id, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false);
TableNamesSet tryRemoveLoadingDependenciesUnlocked(const QualifiedTableName & removing_table, bool check_dependencies, bool is_drop_database = false) TSA_REQUIRES(databases_mutex);
void checkTableCanBeRemovedOrRenamed(const StorageID & table_id) const;
void updateLoadingDependencies(const StorageID & table_id, TableNamesSet && new_dependencies);
@ -233,15 +233,15 @@ private:
static std::unique_ptr<DatabaseCatalog> database_catalog;
explicit DatabaseCatalog(ContextMutablePtr global_context_);
void assertDatabaseExistsUnlocked(const String & database_name) const;
void assertDatabaseDoesntExistUnlocked(const String & database_name) const;
void assertDatabaseExistsUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex) TSA_REQUIRES(databases_mutex);
void assertDatabaseDoesntExistUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex) TSA_REQUIRES(databases_mutex);
void shutdownImpl();
struct UUIDToStorageMapPart
{
std::unordered_map<UUID, DatabaseAndTable> map;
std::unordered_map<UUID, DatabaseAndTable> map TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
};
@ -273,12 +273,12 @@ private:
mutable std::mutex databases_mutex;
ViewDependencies view_dependencies;
ViewDependencies view_dependencies TSA_GUARDED_BY(databases_mutex);
Databases databases;
Databases databases TSA_GUARDED_BY(databases_mutex);
UUIDToStorageMap uuid_map;
DependenciesInfos loading_dependencies;
DependenciesInfos loading_dependencies TSA_GUARDED_BY(databases_mutex);
Poco::Logger * log;
@ -290,12 +290,12 @@ private:
/// In case the element already exists, waits when query will be executed in other thread. See class DDLGuard below.
using DatabaseGuard = std::pair<DDLGuard::Map, std::shared_mutex>;
using DDLGuards = std::map<String, DatabaseGuard>;
DDLGuards ddl_guards;
DDLGuards ddl_guards TSA_GUARDED_BY(ddl_guards_mutex);
/// If you capture mutex and ddl_guards_mutex, then you need to grab them strictly in this order.
mutable std::mutex ddl_guards_mutex;
TablesMarkedAsDropped tables_marked_dropped;
std::unordered_set<UUID> tables_marked_dropped_ids;
TablesMarkedAsDropped tables_marked_dropped TSA_GUARDED_BY(tables_marked_dropped_mutex);
std::unordered_set<UUID> tables_marked_dropped_ids TSA_GUARDED_BY(tables_marked_dropped_mutex);
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;

View File

@ -24,16 +24,17 @@ static TableLockHolder getLockForOrdinary(const StoragePtr & storage)
return storage->lockForShare(RWLockImpl::NO_QUERY, default_timeout);
}
MergeTreeTransaction::MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id)
MergeTreeTransaction::MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id, std::list<CSN>::iterator snapshot_it_)
: tid({snapshot_, local_tid_, host_id})
, snapshot(snapshot_)
, snapshot_in_use_it(snapshot_it_)
, csn(Tx::UnknownCSN)
{
}
void MergeTreeTransaction::setSnapshot(CSN new_snapshot)
{
snapshot = new_snapshot;
snapshot.store(new_snapshot, std::memory_order_relaxed);
}
MergeTreeTransaction::State MergeTreeTransaction::getState() const
@ -219,19 +220,31 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
/// It's not a problem if server crash before CSN is written, because we already have TID in data part and entry in the log.
[[maybe_unused]] CSN prev_value = csn.exchange(assigned_csn);
chassert(prev_value == Tx::CommittingCSN);
for (const auto & part : creating_parts)
DataPartsVector created_parts;
DataPartsVector removed_parts;
RunningMutationsList committed_mutations;
{
/// We don't really need mutex here, because no concurrent modifications of transaction object may happen after comit.
std::lock_guard lock{mutex};
created_parts = creating_parts;
removed_parts = removing_parts;
committed_mutations = mutations;
}
for (const auto & part : created_parts)
{
part->version.creation_csn.store(csn);
part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::CREATION);
}
for (const auto & part : removing_parts)
for (const auto & part : removed_parts)
{
part->version.removal_csn.store(csn);
part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::REMOVAL);
}
for (const auto & storage_and_mutation : mutations)
for (const auto & storage_and_mutation : committed_mutations)
storage_and_mutation.first->setMutationCSN(storage_and_mutation.second, csn);
}
@ -313,7 +326,7 @@ void MergeTreeTransaction::onException()
String MergeTreeTransaction::dumpDescription() const
{
String res = fmt::format("{} state: {}, snapshot: {}", tid, getState(), snapshot);
String res = fmt::format("{} state: {}, snapshot: {}", tid, getState(), getSnapshot());
if (isReadOnly())
{
@ -335,7 +348,7 @@ String MergeTreeTransaction::dumpDescription() const
{
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
chassert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
chassert(!part->version.creation_csn || part->version.creation_csn <= getSnapshot());
}
for (const auto & mutation : mutations)

View File

@ -31,13 +31,13 @@ public:
ROLLED_BACK,
};
CSN getSnapshot() const { return snapshot; }
CSN getSnapshot() const { return snapshot.load(std::memory_order_relaxed); }
void setSnapshot(CSN new_snapshot);
State getState() const;
const TransactionID tid;
MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id);
MergeTreeTransaction(CSN snapshot_, LocalTID local_tid_, UUID host_id, std::list<CSN>::iterator snapshot_it_);
void addNewPart(const StoragePtr & storage, const DataPartPtr & new_part);
void removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, const TransactionInfoContext & context);
@ -71,16 +71,16 @@ private:
Stopwatch elapsed;
/// Usually it's equal to tid.start_csn, but can be changed by SET SNAPSHOT query (for introspection purposes and time-traveling)
CSN snapshot;
std::list<CSN>::iterator snapshot_in_use_it;
std::atomic<CSN> snapshot;
const std::list<CSN>::iterator snapshot_in_use_it;
/// Lists of changes made by transaction
std::unordered_set<StoragePtr> storages;
std::vector<TableLockHolder> table_read_locks_for_ordinary_db;
DataPartsVector creating_parts;
DataPartsVector removing_parts;
std::unordered_set<StoragePtr> storages TSA_GUARDED_BY(mutex);
std::vector<TableLockHolder> table_read_locks_for_ordinary_db TSA_GUARDED_BY(mutex);
DataPartsVector creating_parts TSA_GUARDED_BY(mutex);
DataPartsVector removing_parts TSA_GUARDED_BY(mutex);
using RunningMutationsList = std::vector<std::pair<StoragePtr, String>>;
RunningMutationsList mutations;
RunningMutationsList mutations TSA_GUARDED_BY(mutex);
std::atomic<CSN> csn;
};

View File

@ -43,16 +43,13 @@ catch (...)
TransactionLog::TransactionLog()
: log(&Poco::Logger::get("TransactionLog"))
: global_context(Context::getGlobalContextInstance())
, log(&Poco::Logger::get("TransactionLog"))
, zookeeper_path(global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn"))
, zookeeper_path_log(zookeeper_path + "/log")
, fault_probability_before_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0))
, fault_probability_after_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0))
{
global_context = Context::getGlobalContextInstance();
global_context->checkTransactionsAreAllowed();
zookeeper_path = global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn");
zookeeper_path_log = zookeeper_path + "/log";
fault_probability_before_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0);
fault_probability_after_commit = global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0);
loadLogFromZooKeeper();
updating_thread = ThreadFromGlobalPool(&TransactionLog::runUpdatingThread, this);
@ -128,7 +125,7 @@ void TransactionLog::loadEntries(Strings::const_iterator beg, Strings::const_ite
LOG_TRACE(log, "Loading {} entries from {}: {}..{}", entries_count, zookeeper_path_log, *beg, last_entry);
futures.reserve(entries_count);
for (auto it = beg; it != end; ++it)
futures.emplace_back(zookeeper->asyncGet(fs::path(zookeeper_path_log) / *it));
futures.emplace_back(READ_ONE_THREAD(zookeeper)->asyncGet(fs::path(zookeeper_path_log) / *it));
std::vector<std::pair<TIDHash, CSNEntry>> loaded;
loaded.reserve(entries_count);
@ -213,7 +210,7 @@ void TransactionLog::runUpdatingThread()
try
{
/// Do not wait if we have some transactions to finalize
if (unknown_state_list_loaded.empty())
if (READ_ONE_THREAD(unknown_state_list_loaded).empty())
log_updated_event->wait();
if (stop_flag.load())
@ -230,7 +227,7 @@ void TransactionLog::runUpdatingThread()
/// It's possible that we connected to different [Zoo]Keeper instance
/// so we may read a bit stale state.
zookeeper->sync(zookeeper_path_log);
READ_ONE_THREAD(zookeeper)->sync(zookeeper_path_log);
}
loadNewEntries();
@ -255,13 +252,13 @@ void TransactionLog::runUpdatingThread()
void TransactionLog::loadNewEntries()
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log, nullptr, log_updated_event);
Strings entries_list = READ_ONE_THREAD(zookeeper)->getChildren(zookeeper_path_log, nullptr, log_updated_event);
chassert(!entries_list.empty());
::sort(entries_list.begin(), entries_list.end());
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), last_loaded_entry);
auto it = std::upper_bound(entries_list.begin(), entries_list.end(), READ_ONE_THREAD(last_loaded_entry));
loadEntries(it, entries_list.end());
chassert(last_loaded_entry == entries_list.back());
chassert(latest_snapshot == deserializeCSN(last_loaded_entry));
chassert(READ_ONE_THREAD(last_loaded_entry) == entries_list.back());
chassert(latest_snapshot == deserializeCSN(READ_ONE_THREAD(last_loaded_entry)));
latest_snapshot.notify_all();
}
@ -281,7 +278,7 @@ void TransactionLog::removeOldEntries()
/// TODO we will need a bit more complex logic for multiple hosts
Coordination::Stat stat;
CSN old_tail_ptr = deserializeCSN(zookeeper->get(zookeeper_path + "/tail_ptr", &stat));
CSN old_tail_ptr = deserializeCSN(READ_ONE_THREAD(zookeeper)->get(zookeeper_path + "/tail_ptr", &stat));
CSN new_tail_ptr = getOldestSnapshot();
if (new_tail_ptr < old_tail_ptr)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug", old_tail_ptr, new_tail_ptr);
@ -290,7 +287,7 @@ void TransactionLog::removeOldEntries()
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE(log, "Updating tail_ptr from {} to {}", old_tail_ptr, new_tail_ptr);
zookeeper->set(zookeeper_path + "/tail_ptr", serializeCSN(new_tail_ptr), stat.version);
READ_ONE_THREAD(zookeeper)->set(zookeeper_path + "/tail_ptr", serializeCSN(new_tail_ptr), stat.version);
tail_ptr.store(new_tail_ptr);
/// Now we can find and remove old entries
@ -314,7 +311,7 @@ void TransactionLog::removeOldEntries()
continue;
LOG_TEST(log, "Removing entry {} -> {}", elem.second.tid, elem.second.csn);
auto code = zookeeper->tryRemove(zookeeper_path_log + "/" + serializeCSN(elem.second.csn));
auto code = READ_ONE_THREAD(zookeeper)->tryRemove(zookeeper_path_log + "/" + serializeCSN(elem.second.csn));
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNONODE)
removed_entries.push_back(elem.first);
}
@ -376,11 +373,11 @@ MergeTreeTransactionPtr TransactionLog::beginTransaction()
std::lock_guard lock{running_list_mutex};
CSN snapshot = latest_snapshot.load();
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get());
auto snapshot_lock = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get(), snapshot_lock);
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
}
LOG_TEST(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
@ -595,7 +592,7 @@ TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
void TransactionLog::sync() const
{
Strings entries_list = zookeeper->getChildren(zookeeper_path_log);
Strings entries_list = getZooKeeper()->getChildren(zookeeper_path_log);
chassert(!entries_list.empty());
::sort(entries_list.begin(), entries_list.end());
CSN newest_csn = deserializeCSN(entries_list.back());

View File

@ -129,7 +129,7 @@ public:
void sync() const;
private:
void loadLogFromZooKeeper();
void loadLogFromZooKeeper() TSA_REQUIRES(mutex);
void runUpdatingThread();
void loadEntries(Strings::const_iterator beg, Strings::const_iterator end);
@ -149,8 +149,8 @@ private:
CSN getCSNImpl(const TIDHash & tid_hash) const;
ContextPtr global_context;
Poco::Logger * log;
const ContextPtr global_context;
Poco::Logger * const log;
/// The newest snapshot available for reading
std::atomic<CSN> latest_snapshot;
@ -167,24 +167,24 @@ private:
TransactionID tid;
};
using TIDMap = std::unordered_map<TIDHash, CSNEntry>;
TIDMap tid_to_csn;
TIDMap tid_to_csn TSA_GUARDED_BY(mutex);
mutable std::mutex running_list_mutex;
/// Transactions that are currently processed
TransactionsList running_list;
TransactionsList running_list TSA_GUARDED_BY(running_list_mutex);
/// If we lost connection on attempt to create csn- node then we don't know transaction's state.
using UnknownStateList = std::vector<std::pair<MergeTreeTransaction *, scope_guard>>;
UnknownStateList unknown_state_list;
UnknownStateList unknown_state_list_loaded;
UnknownStateList unknown_state_list TSA_GUARDED_BY(running_list_mutex);
UnknownStateList unknown_state_list_loaded TSA_GUARDED_BY(running_list_mutex);
/// Ordered list of snapshots that are currently used by some transactions. Needed for background cleanup.
std::list<CSN> snapshots_in_use;
std::list<CSN> snapshots_in_use TSA_GUARDED_BY(running_list_mutex);
ZooKeeperPtr zookeeper;
String zookeeper_path;
ZooKeeperPtr zookeeper TSA_GUARDED_BY(mutex);
const String zookeeper_path;
String zookeeper_path_log;
const String zookeeper_path_log;
/// Name of the newest entry that was loaded from log in ZK
String last_loaded_entry;
String last_loaded_entry TSA_GUARDED_BY(mutex);
/// The oldest CSN such that we store in log entries with TransactionIDs containing this CSN.
std::atomic<CSN> tail_ptr = Tx::UnknownCSN;
@ -193,8 +193,8 @@ private:
std::atomic_bool stop_flag = false;
ThreadFromGlobalPool updating_thread;
Float64 fault_probability_before_commit = 0;
Float64 fault_probability_after_commit = 0;
const Float64 fault_probability_before_commit = 0;
const Float64 fault_probability_after_commit = 0;
};
template <typename Derived>

View File

@ -79,12 +79,7 @@ function thread_partition_dst_to_src()
SET throw_on_unsupported_query_inside_transaction=0;
SYSTEM START MERGES dst;
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=4) != (toUInt8($i/2 + 1), (select sum(number) from numbers(1, $i) where number % 2 or number=$i))) FORMAT Null;
$action;" || $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select $i, 'src', type, n, _part from src order by type, n;
select $i, 'dst', type, n, _part from dst order by type, n;
rollback" ||:
$action;"
done
}
@ -102,12 +97,7 @@ function thread_select()
SELECT _table, throwIf(arraySort(groupArrayIf(n, type=1)) != arraySort(groupArrayIf(n, type=2))) FROM merge(currentDatabase(), '') GROUP BY _table FORMAT Null;
-- all rows are inserted in insert_thread
SELECT type, throwIf(count(n) != max(n)), throwIf(sum(n) != max(n)*(max(n)+1)/2) FROM merge(currentDatabase(), '') WHERE type IN (1, 2) GROUP BY type ORDER BY type FORMAT Null;
COMMIT;" || $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select $i, 'src', type, n, _part from src order by type, n;
select $i, 'dst', type, n, _part from dst order by type, n;
rollback" ||:
COMMIT;"
done
}

View File

@ -88,12 +88,7 @@ function thread_select()
SELECT throwIf((SELECT (sum(nm), count() % 2) FROM dst) != (0, 1)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(nm)) FROM dst)) FORMAT Null;
SELECT throwIf((SELECT arraySort(groupArray(nm)) FROM mv) != (SELECT arraySort(groupArray(n*m)) FROM src)) FORMAT Null;
COMMIT;" || $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select 'src', n, m, _part from src order by n, m;
select 'dst', nm, _part from dst order by nm;
rollback" ||:
COMMIT;"
done
}
@ -113,12 +108,7 @@ function thread_select_insert()
-- now check that all results are the same
SELECT throwIf(1 != (SELECT countDistinct(arr) FROM (SELECT x, arraySort(groupArray(nm)) AS arr FROM tmp WHERE x!=4 GROUP BY x))) FORMAT Null;
SELECT throwIf((SELECT count(), sum(nm) FROM tmp WHERE x=4) != (SELECT count(), sum(nm) FROM tmp WHERE x!=4)) FORMAT Null;
ROLLBACK;" || $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select 'src', n, m, _part from src order by n, m;
select 'dst', nm, _part from dst order by nm;
rollback" ||:
ROLLBACK;"
done
}

View File

@ -43,8 +43,7 @@ function thread_select()
SELECT throwIf((SELECT sum(n) FROM mt) != 0) FORMAT Null;
SELECT throwIf((SELECT count() FROM mt) % 2 != 0) FORMAT Null;
SELECT arraySort(groupArray(n)), arraySort(groupArray(m)), arraySort(groupArray(_part)) FROM mt;
COMMIT;" | uniq | wc -l | grep -v "^1$" && $CLICKHOUSE_CLIENT -q "SELECT * FROM system.parts
WHERE database='$CLICKHOUSE_DATABASE' AND table='mt'" ||:;
COMMIT;"
done
}