add startup waits in all DDL queries and global DDL worker

This commit is contained in:
serxa 2023-10-24 10:09:10 +00:00
parent e214b73844
commit d3f54a29e3
16 changed files with 142 additions and 44 deletions

View File

@ -1657,6 +1657,7 @@ try
LOG_INFO(log, "Loading metadata from {}", path_str); LOG_INFO(log, "Loading metadata from {}", path_str);
LoadTasksPtrs load_metadata_tasks;
try try
{ {
auto & database_catalog = DatabaseCatalog::instance(); auto & database_catalog = DatabaseCatalog::instance();
@ -1684,9 +1685,9 @@ try
database_catalog.loadMarkedAsDroppedTables(); database_catalog.loadMarkedAsDroppedTables();
database_catalog.createBackgroundTasks(); database_catalog.createBackgroundTasks();
/// Then, load remaining databases (some of them maybe be loaded asynchronously) /// Then, load remaining databases (some of them maybe be loaded asynchronously)
auto load_metadata = loadMetadata(global_context, default_database, server_settings.async_load_databases); load_metadata_tasks = loadMetadata(global_context, default_database, server_settings.async_load_databases);
/// If we need to convert database engines, disable async tables loading /// If we need to convert database engines, disable async tables loading
convertDatabasesEnginesIfNeed(load_metadata, global_context); convertDatabasesEnginesIfNeed(load_metadata_tasks, global_context);
database_catalog.startupBackgroundCleanup(); database_catalog.startupBackgroundCleanup();
/// After loading validate that default database exists /// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database); database_catalog.assertDatabaseExists(default_database);
@ -1851,9 +1852,14 @@ try
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater then 0"); throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "distributed_ddl.pool_size should be greater then 0");
global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(), global_context->setDDLWorker(std::make_unique<DDLWorker>(pool_size, ddl_zookeeper_path, global_context, &config(),
"distributed_ddl", "DDLWorker", "distributed_ddl", "DDLWorker",
&CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID)); &CurrentMetrics::MaxDDLEntryID, &CurrentMetrics::MaxPushedDDLEntryID),
load_metadata_tasks);
} }
/// Do not keep tasks in server, they should be kept inside databases. Used here to make dependent tasks only.
load_metadata_tasks.clear();
load_metadata_tasks.shrink_to_fit();
{ {
std::lock_guard lock(servers_lock); std::lock_guard lock(servers_lock);
for (auto & server : servers) for (auto & server : servers)

View File

@ -75,6 +75,7 @@ String DatabaseAtomic::getTableDataPath(const ASTCreateQuery & query) const
void DatabaseAtomic::drop(ContextPtr) void DatabaseAtomic::drop(ContextPtr)
{ {
waitDatabaseStarted();
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty()); assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
try try
{ {
@ -90,6 +91,7 @@ void DatabaseAtomic::drop(ContextPtr)
void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path) void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name, const StoragePtr & table, const String & relative_table_path)
{ {
waitDatabaseStarted();
assert(relative_table_path != data_path && !relative_table_path.empty()); assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use; DetachedTables not_in_use;
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -102,6 +104,7 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name) StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String & name)
{ {
waitDatabaseStarted();
DetachedTables not_in_use; DetachedTables not_in_use;
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto table = DatabaseOrdinary::detachTableUnlocked(name); auto table = DatabaseOrdinary::detachTableUnlocked(name);
@ -113,9 +116,7 @@ StoragePtr DatabaseAtomic::detachTable(ContextPtr /* context */, const String &
void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync) void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{ {
// To DROP tables we need the database to be started up (including all the tables) waitDatabaseStarted();
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), getStartupTask());
auto table = tryGetTable(table_name, local_context); auto table = tryGetTable(table_name, local_context);
/// Remove the inner table (if any) to avoid deadlock /// Remove the inner table (if any) to avoid deadlock
/// (due to attempt to execute DROP from the worker thread) /// (due to attempt to execute DROP from the worker thread)
@ -127,12 +128,6 @@ void DatabaseAtomic::dropTable(ContextPtr local_context, const String & table_na
dropTableImpl(local_context, table_name, sync); dropTableImpl(local_context, table_name, sync);
} }
LoadTaskPtr DatabaseAtomic::getStartupTask()
{
std::scoped_lock lock(mutex);
return startup_atomic_database_task;
}
void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & table_name, bool sync) void DatabaseAtomic::dropTableImpl(ContextPtr local_context, const String & table_name, bool sync)
{ {
String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path = getObjectMetadataPath(table_name);
@ -185,6 +180,8 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange && !supportsAtomicRename()) if (exchange && !supportsAtomicRename())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported");
waitDatabaseStarted();
auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database); auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
bool inside_database = this == &other_db; bool inside_database = this == &other_db;
@ -454,7 +451,6 @@ void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, Loadin
LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{ {
auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode); auto base = DatabaseOrdinary::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob( auto job = makeLoadJob(
base->goals(), base->goals(),
AsyncLoaderPoolId::BackgroundStartup, AsyncLoaderPoolId::BackgroundStartup,
@ -465,7 +461,7 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
return; return;
NameToPathMap table_names; NameToPathMap table_names;
{ {
std::lock_guard lock2{mutex}; std::lock_guard lock{mutex};
table_names = table_name_to_path; table_names = table_name_to_path;
} }
@ -476,6 +472,12 @@ LoadTaskPtr DatabaseAtomic::startupDatabaseAsync(AsyncLoader & async_loader, Loa
return startup_atomic_database_task = makeLoadTask(async_loader, {job}); return startup_atomic_database_task = makeLoadTask(async_loader, {job});
} }
void DatabaseAtomic::waitDatabaseStarted() const
{
assert(startup_atomic_database_task);
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_atomic_database_task);
}
void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist) void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
{ {
try try
@ -543,6 +545,8 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
{ {
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard /// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard
waitDatabaseStarted();
bool check_ref_deps = query_context->getSettingsRef().check_referential_table_dependencies; bool check_ref_deps = query_context->getSettingsRef().check_referential_table_dependencies;
bool check_loading_deps = !check_ref_deps && query_context->getSettingsRef().check_table_dependencies; bool check_loading_deps = !check_ref_deps && query_context->getSettingsRef().check_table_dependencies;
if (check_ref_deps || check_loading_deps) if (check_ref_deps || check_loading_deps)

View File

@ -51,6 +51,7 @@ public:
void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override; void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override;
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override; LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
void waitDatabaseStarted() const override;
/// Atomic database cannot be detached if there is detached table which still in use /// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override; void assertCanBeDetached(bool cleanup) override;
@ -73,8 +74,6 @@ protected:
using DetachedTables = std::unordered_map<UUID, StoragePtr>; using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex); [[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
LoadTaskPtr getStartupTask();
void tryCreateMetadataSymlink(); void tryCreateMetadataSymlink();
virtual bool allowMoveTableToOtherDatabaseEngine(IDatabase & /*to_database*/) const { return false; } virtual bool allowMoveTableToOtherDatabaseEngine(IDatabase & /*to_database*/) const { return false; }
@ -88,7 +87,7 @@ protected:
String path_to_metadata_symlink; String path_to_metadata_symlink;
const UUID db_uuid; const UUID db_uuid;
LoadTaskPtr startup_atomic_database_task TSA_GUARDED_BY(mutex); std::atomic<LoadTaskPtr> startup_atomic_database_task;
}; };
} }

View File

@ -188,6 +188,8 @@ void DatabaseOnDisk::createTable(
throw Exception( throw Exception(
ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists", backQuote(getDatabaseName()), backQuote(table_name)); ErrorCodes::TABLE_ALREADY_EXISTS, "Table {}.{} already exists", backQuote(getDatabaseName()), backQuote(table_name));
waitDatabaseStarted();
String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path = getObjectMetadataPath(table_name);
if (create.attach_short_syntax) if (create.attach_short_syntax)
@ -277,6 +279,8 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const String & table_name) void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const String & table_name)
{ {
waitDatabaseStarted();
auto table = detachTable(query_context, table_name); auto table = detachTable(query_context, table_name);
fs::path detached_permanently_flag(getObjectMetadataPath(table_name) + detached_suffix); fs::path detached_permanently_flag(getObjectMetadataPath(table_name) + detached_suffix);
@ -293,6 +297,8 @@ void DatabaseOnDisk::detachTablePermanently(ContextPtr query_context, const Stri
void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/) void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name, bool /*sync*/)
{ {
waitDatabaseStarted();
String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_path_drop = table_metadata_path + drop_suffix; String table_metadata_path_drop = table_metadata_path + drop_suffix;
String table_data_path_relative = getTableDataPath(table_name); String table_data_path_relative = getTableDataPath(table_name);
@ -377,6 +383,8 @@ void DatabaseOnDisk::renameTable(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported");
} }
waitDatabaseStarted();
auto table_data_relative_path = getTableDataPath(table_name); auto table_data_relative_path = getTableDataPath(table_name);
TableExclusiveLockHolder table_lock; TableExclusiveLockHolder table_lock;
String table_metadata_path; String table_metadata_path;
@ -518,6 +526,8 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery() const
void DatabaseOnDisk::drop(ContextPtr local_context) void DatabaseOnDisk::drop(ContextPtr local_context)
{ {
waitDatabaseStarted();
assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty()); assert(TSA_SUPPRESS_WARNING_FOR_READ(tables).empty());
if (local_context->getSettingsRef().force_remove_data_recursively_on_drop) if (local_context->getSettingsRef().force_remove_data_recursively_on_drop)
{ {

View File

@ -224,38 +224,45 @@ LoadTaskPtr DatabaseOrdinary::startupDatabaseAsync(
LoadJobSet startup_after, LoadJobSet startup_after,
LoadingStrictnessLevel /*mode*/) LoadingStrictnessLevel /*mode*/)
{ {
std::scoped_lock lock(mutex);
// NOTE: this task is empty, but it is required for correct dependency handling (startup should be done after tables loading) // NOTE: this task is empty, but it is required for correct dependency handling (startup should be done after tables loading)
auto job = makeLoadJob( auto job = makeLoadJob(
std::move(startup_after), std::move(startup_after),
AsyncLoaderPoolId::BackgroundStartup, AsyncLoaderPoolId::BackgroundStartup,
fmt::format("startup Ordinary database {}", database_name)); fmt::format("startup Ordinary database {}", database_name));
return makeLoadTask(async_loader, {job}); return startup_database_task = makeLoadTask(async_loader, {job});
}
void DatabaseOrdinary::waitTableStarted(const String & name) const
{
/// Prioritize jobs (load and startup the table) to be executed in foreground pool and wait for them synchronously
LoadTaskPtr task;
{
std::scoped_lock lock(mutex);
if (auto it = startup_table.find(name); it != startup_table.end())
task = it->second;
}
if (task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), task);
}
void DatabaseOrdinary::waitDatabaseStarted() const
{
/// Prioritize load and startup of all tables and database itself and wait for them synchronously
assert(startup_database_task);
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_database_task);
} }
// TODO(serxa): implement // TODO(serxa): implement
// DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const // DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
// } // }
StoragePtr DatabaseOrdinary::tryGetTable(const String & name, ContextPtr local_context) const
{
const LoadTaskPtr * startup_task = nullptr;
{
std::scoped_lock lock(mutex);
if (auto it = startup_table.find(name); it != startup_table.end())
startup_task = &it->second;
}
// Prioritize jobs (load and startup the table) to be executed in foreground pool and wait for them synchronously
if (startup_task)
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), *startup_task);
return DatabaseOnDisk::tryGetTable(name, local_context);
}
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata) void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)
{ {
waitDatabaseStarted();
String table_name = table_id.table_name; String table_name = table_id.table_name;
/// Read the definition of the table and replace the necessary parts with new ones. /// Read the definition of the table and replace the necessary parts with new ones.
String table_metadata_path = getObjectMetadataPath(table_name); String table_metadata_path = getObjectMetadataPath(table_name);
String table_metadata_tmp_path = table_metadata_path + ".tmp"; String table_metadata_tmp_path = table_metadata_path + ".tmp";

View File

@ -49,13 +49,15 @@ public:
const QualifiedTableName & name, const QualifiedTableName & name,
LoadingStrictnessLevel mode) override; LoadingStrictnessLevel mode) override;
void waitTableStarted(const String & name) const override;
void waitDatabaseStarted() const override;
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override; LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
// TODO(serxa): implement // TODO(serxa): implement
// DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override; // DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
StoragePtr tryGetTable(const String & name, ContextPtr local_context) const override;
void alterTable( void alterTable(
ContextPtr context, ContextPtr context,
const StorageID & table_id, const StorageID & table_id,
@ -75,6 +77,7 @@ protected:
std::unordered_map<String, LoadTaskPtr> load_table TSA_GUARDED_BY(mutex); std::unordered_map<String, LoadTaskPtr> load_table TSA_GUARDED_BY(mutex);
std::unordered_map<String, LoadTaskPtr> startup_table TSA_GUARDED_BY(mutex); std::unordered_map<String, LoadTaskPtr> startup_table TSA_GUARDED_BY(mutex);
std::atomic<LoadTaskPtr> startup_database_task;
std::atomic<size_t> total_tables_to_startup{0}; std::atomic<size_t> total_tables_to_startup{0};
std::atomic<size_t> tables_started{0}; std::atomic<size_t> tables_started{0};
AtomicStopwatch startup_watch; AtomicStopwatch startup_watch;

View File

@ -505,7 +505,6 @@ UInt64 DatabaseReplicated::getMetadataHash(const String & table_name) const
LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{ {
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode); auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob( auto job = makeLoadJob(
base->goals(), base->goals(),
AsyncLoaderPoolId::BackgroundStartup, AsyncLoaderPoolId::BackgroundStartup,
@ -531,6 +530,12 @@ LoadTaskPtr DatabaseReplicated::startupDatabaseAsync(AsyncLoader & async_loader,
return startup_replicated_database_task = makeLoadTask(async_loader, {job}); return startup_replicated_database_task = makeLoadTask(async_loader, {job});
} }
void DatabaseReplicated::waitDatabaseStarted() const
{
assert(startup_replicated_database_task);
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_replicated_database_task);
}
bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const bool DatabaseReplicated::checkDigestValid(const ContextPtr & local_context, bool debug_check /* = true */) const
{ {
if (debug_check) if (debug_check)
@ -688,6 +693,7 @@ void DatabaseReplicated::checkQueryValid(const ASTPtr & query, ContextPtr query_
BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal) BlockIO DatabaseReplicated::tryEnqueueReplicatedDDL(const ASTPtr & query, ContextPtr query_context, bool internal)
{ {
waitDatabaseStarted();
if (query_context->getCurrentTransaction() && query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction) if (query_context->getCurrentTransaction() && query_context->getSettingsRef().throw_on_unsupported_query_inside_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed DDL queries inside transactions are not supported"); throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Distributed DDL queries inside transactions are not supported");
@ -734,6 +740,8 @@ static UUID getTableUUIDIfReplicated(const String & metadata, ContextPtr context
void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr) void DatabaseReplicated::recoverLostReplica(const ZooKeeperPtr & current_zookeeper, UInt32 our_log_ptr, UInt32 & max_log_ptr)
{ {
waitDatabaseStarted();
is_recovering = true; is_recovering = true;
SCOPE_EXIT({ is_recovering = false; }); SCOPE_EXIT({ is_recovering = false; });
@ -1162,6 +1170,8 @@ void DatabaseReplicated::drop(ContextPtr context_)
return; return;
} }
waitDatabaseStarted();
auto current_zookeeper = getZooKeeper(); auto current_zookeeper = getZooKeeper();
current_zookeeper->set(replica_path, DROPPED_MARK, -1); current_zookeeper->set(replica_path, DROPPED_MARK, -1);
createEmptyLogEntry(current_zookeeper); createEmptyLogEntry(current_zookeeper);
@ -1179,6 +1189,7 @@ void DatabaseReplicated::drop(ContextPtr context_)
void DatabaseReplicated::stopReplication() void DatabaseReplicated::stopReplication()
{ {
waitDatabaseStarted();
if (ddl_worker) if (ddl_worker)
ddl_worker->shutdown(); ddl_worker->shutdown();
} }
@ -1194,6 +1205,8 @@ void DatabaseReplicated::shutdown()
void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync) void DatabaseReplicated::dropTable(ContextPtr local_context, const String & table_name, bool sync)
{ {
waitDatabaseStarted();
auto txn = local_context->getZooKeeperMetadataTransaction(); auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id.")); assert(!ddl_worker || !ddl_worker->isCurrentlyActive() || txn || startsWith(table_name, ".inner_id."));
if (txn && txn->isInitialQuery() && !txn->isCreateOrReplaceQuery()) if (txn && txn->isInitialQuery() && !txn->isCreateOrReplaceQuery())
@ -1236,6 +1249,8 @@ void DatabaseReplicated::renameTable(ContextPtr local_context, const String & ta
if (exchange && !to_database.isTableExist(to_table_name, local_context)) if (exchange && !to_database.isTableExist(to_table_name, local_context))
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name); throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", to_table_name);
waitDatabaseStarted();
String statement = readMetadataFile(table_name); String statement = readMetadataFile(table_name);
String statement_to; String statement_to;
if (exchange) if (exchange)
@ -1336,6 +1351,8 @@ bool DatabaseReplicated::canExecuteReplicatedMetadataAlter() const
void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name) void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const String & table_name)
{ {
waitDatabaseStarted();
auto txn = local_context->getZooKeeperMetadataTransaction(); auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn); assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->isInitialQuery()) if (txn && txn->isInitialQuery())
@ -1359,6 +1376,8 @@ void DatabaseReplicated::detachTablePermanently(ContextPtr local_context, const
void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context, const String & table_name, const String & table_metadata_path, bool attach) void DatabaseReplicated::removeDetachedPermanentlyFlag(ContextPtr local_context, const String & table_name, const String & table_metadata_path, bool attach)
{ {
waitDatabaseStarted();
auto txn = local_context->getZooKeeperMetadataTransaction(); auto txn = local_context->getZooKeeperMetadataTransaction();
assert(!ddl_worker->isCurrentlyActive() || txn); assert(!ddl_worker->isCurrentlyActive() || txn);
if (txn && txn->isInitialQuery() && attach) if (txn && txn->isInitialQuery() && attach)
@ -1395,6 +1414,8 @@ String DatabaseReplicated::readMetadataFile(const String & table_name) const
std::vector<std::pair<ASTPtr, StoragePtr>> std::vector<std::pair<ASTPtr, StoragePtr>>
DatabaseReplicated::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr &) const DatabaseReplicated::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr &) const
{ {
waitDatabaseStarted();
/// Here we read metadata from ZooKeeper. We could do that by simple call of DatabaseAtomic::getTablesForBackup() however /// Here we read metadata from ZooKeeper. We could do that by simple call of DatabaseAtomic::getTablesForBackup() however
/// reading from ZooKeeper is better because thus we won't be dependent on how fast the replication queue of this database is. /// reading from ZooKeeper is better because thus we won't be dependent on how fast the replication queue of this database is.
std::vector<std::pair<ASTPtr, StoragePtr>> res; std::vector<std::pair<ASTPtr, StoragePtr>> res;
@ -1442,6 +1463,8 @@ void DatabaseReplicated::createTableRestoredFromBackup(
std::shared_ptr<IRestoreCoordination> restore_coordination, std::shared_ptr<IRestoreCoordination> restore_coordination,
UInt64 timeout_ms) UInt64 timeout_ms)
{ {
waitDatabaseStarted();
/// Because of the replication multiple nodes can try to restore the same tables again and failed with "Table already exists" /// Because of the replication multiple nodes can try to restore the same tables again and failed with "Table already exists"
/// because of some table could be restored already on other node and then replicated to this node. /// because of some table could be restored already on other node and then replicated to this node.
/// To solve this problem we use the restore coordination: the first node calls /// To solve this problem we use the restore coordination: the first node calls

View File

@ -121,6 +121,8 @@ private:
UInt64 getMetadataHash(const String & table_name) const; UInt64 getMetadataHash(const String & table_name) const;
bool checkDigestValid(const ContextPtr & local_context, bool debug_check = true) const TSA_REQUIRES(metadata_mutex); bool checkDigestValid(const ContextPtr & local_context, bool debug_check = true) const TSA_REQUIRES(metadata_mutex);
void waitDatabaseStarted() const override;
String zookeeper_path; String zookeeper_path;
String shard_name; String shard_name;
String replica_name; String replica_name;
@ -147,7 +149,7 @@ private:
mutable ClusterPtr cluster; mutable ClusterPtr cluster;
LoadTaskPtr startup_replicated_database_task TSA_GUARDED_BY(mutex); std::atomic<LoadTaskPtr> startup_replicated_database_task;
}; };
} }

View File

@ -199,6 +199,7 @@ bool DatabaseWithOwnTablesBase::isTableExist(const String & table_name, ContextP
StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, ContextPtr) const StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, ContextPtr) const
{ {
waitTableStarted(table_name);
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto it = tables.find(table_name); auto it = tables.find(table_name);
if (it != tables.end()) if (it != tables.end())

View File

@ -198,6 +198,12 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Not implemented");
} }
/// Waits for specific table to be started up, i.e. task returned by `startupTableAsync()` is done
virtual void waitTableStarted(const String & /*name*/) const {}
/// Waits for the database to be started up, i.e. task returned by `startupDatabaseAsync()` is done
virtual void waitDatabaseStarted() const {}
/// Check the existence of the table in memory (attached). /// Check the existence of the table in memory (attached).
virtual bool isTableExist(const String & name, ContextPtr context) const = 0; virtual bool isTableExist(const String & name, ContextPtr context) const = 0;

View File

@ -67,7 +67,6 @@ void DatabaseMaterializedMySQL::setException(const std::exception_ptr & exceptio
LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode)
{ {
auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode); auto base = DatabaseAtomic::startupDatabaseAsync(async_loader, std::move(startup_after), mode);
std::scoped_lock lock{mutex};
auto job = makeLoadJob( auto job = makeLoadJob(
base->goals(), base->goals(),
AsyncLoaderPoolId::BackgroundStartup, AsyncLoaderPoolId::BackgroundStartup,
@ -84,6 +83,12 @@ LoadTaskPtr DatabaseMaterializedMySQL::startupDatabaseAsync(AsyncLoader & async_
return startup_mysql_database_task = makeLoadTask(async_loader, {job}); return startup_mysql_database_task = makeLoadTask(async_loader, {job});
} }
void DatabaseMaterializedMySQL::waitDatabaseStarted() const
{
assert(startup_mysql_database_task);
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_mysql_database_task);
}
void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) void DatabaseMaterializedMySQL::createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query)
{ {
checkIsInternalQuery(context_, "CREATE TABLE"); checkIsInternalQuery(context_, "CREATE TABLE");
@ -169,6 +174,7 @@ void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const
void DatabaseMaterializedMySQL::stopReplication() void DatabaseMaterializedMySQL::stopReplication()
{ {
waitDatabaseStarted();
materialize_thread.stopSynchronization(); materialize_thread.stopSynchronization();
started_up = false; started_up = false;
} }

View File

@ -46,12 +46,13 @@ protected:
std::atomic_bool started_up{false}; std::atomic_bool started_up{false};
LoadTaskPtr startup_mysql_database_task; std::atomic<LoadTaskPtr> startup_mysql_database_task;
public: public:
String getEngineName() const override { return "MaterializedMySQL"; } String getEngineName() const override { return "MaterializedMySQL"; }
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override; LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
void waitDatabaseStarted() const override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override; void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;

View File

@ -142,6 +142,12 @@ LoadTaskPtr DatabaseMaterializedPostgreSQL::startupDatabaseAsync(AsyncLoader & a
return startup_postgresql_database_task = makeLoadTask(async_loader, {job}); return startup_postgresql_database_task = makeLoadTask(async_loader, {job});
} }
void DatabaseMaterializedPostgreSQL::waitDatabaseStarted() const
{
assert(startup_postgresql_database_task);
waitLoad(currentPoolOr(AsyncLoaderPoolId::Foreground), startup_postgresql_database_task);
}
void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context) void DatabaseMaterializedPostgreSQL::applySettingsChanges(const SettingsChanges & settings_changes, ContextPtr query_context)
{ {
std::lock_guard lock(handler_mutex); std::lock_guard lock(handler_mutex);
@ -418,6 +424,8 @@ void DatabaseMaterializedPostgreSQL::shutdown()
void DatabaseMaterializedPostgreSQL::stopReplication() void DatabaseMaterializedPostgreSQL::stopReplication()
{ {
waitDatabaseStarted();
std::lock_guard lock(handler_mutex); std::lock_guard lock(handler_mutex);
if (replication_handler) if (replication_handler)
replication_handler->shutdown(); replication_handler->shutdown();

View File

@ -41,6 +41,7 @@ public:
String getMetadataPath() const override { return metadata_path; } String getMetadataPath() const override { return metadata_path; }
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override; LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
void waitDatabaseStarted() const override;
DatabaseTablesIteratorPtr DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override; getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
@ -92,7 +93,7 @@ private:
BackgroundSchedulePool::TaskHolder startup_task; BackgroundSchedulePool::TaskHolder startup_task;
bool shutdown_called = false; bool shutdown_called = false;
LoadTaskPtr startup_postgresql_database_task; std::atomic<LoadTaskPtr> startup_postgresql_database_task;
}; };
} }

View File

@ -315,6 +315,7 @@ struct ContextSharedPart : boost::noncopyable
MultiVersion<Macros> macros; /// Substitutions extracted from config. MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk. std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
std::atomic<LoadTaskPtr> ddl_worker_startup_task; /// To postpone `ddl_worker->startup()` after all tables startup
/// Rules for selecting the compression settings, depending on the size of the part. /// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector; mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser for MergeTree engines /// Storage disk chooser for MergeTree engines
@ -2935,17 +2936,36 @@ bool Context::hasDistributedDDL() const
return getConfigRef().has("distributed_ddl"); return getConfigRef().has("distributed_ddl");
} }
void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker) void Context::setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker, const LoadTaskPtrs & startup_after)
{ {
auto lock = getGlobalLock(); auto lock = getGlobalLock();
if (shared->ddl_worker) if (shared->ddl_worker)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DDL background thread has already been initialized"); throw Exception(ErrorCodes::LOGICAL_ERROR, "DDL background thread has already been initialized");
ddl_worker->startup();
shared->ddl_worker = std::move(ddl_worker); shared->ddl_worker = std::move(ddl_worker);
auto job = makeLoadJob(
getGoals(startup_after),
AsyncLoaderPoolId::BackgroundStartup,
"startup ddl worker",
[this] (AsyncLoader &, const LoadJobPtr &)
{
auto lock2 = getGlobalSharedLock();
shared->ddl_worker->startup();
});
shared->ddl_worker_startup_task = makeLoadTask(getAsyncLoader(), {job});
} }
DDLWorker & Context::getDDLWorker() const DDLWorker & Context::getDDLWorker() const
{ {
// We have to ensure that DDL worker will not interfere with async loading of tables.
// For example to prevent creation of a table that already exists, but has not been yet loaded.
// So we have to wait for all tables to be loaded before starting up DDL worker.
// NOTE: Possible improvement: above requirement can be loosen by waiting for specific tables to load.
if (shared->ddl_worker_startup_task)
waitLoad(shared->ddl_worker_startup_task); // Just wait and do not prioritize, because it depends on all load and startup tasks
auto lock = getGlobalSharedLock(); auto lock = getGlobalSharedLock();
if (!shared->ddl_worker) if (!shared->ddl_worker)
{ {

View File

@ -110,6 +110,7 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, ContextPtr context,
/// TODO: support per-cluster grant /// TODO: support per-cluster grant
context->checkAccess(AccessType::CLUSTER); context->checkAccess(AccessType::CLUSTER);
/// NOTE: if `async_load_databases = true`, then it block until ddl_worker is started, which includes startup of all related tables.
DDLWorker & ddl_worker = context->getDDLWorker(); DDLWorker & ddl_worker = context->getDDLWorker();
/// Enumerate hosts which will be used to send query. /// Enumerate hosts which will be used to send query.