Fix db iterator wait during async metrics collection

This commit is contained in:
serxa 2024-03-18 15:39:21 +00:00
parent e8a08baf23
commit 051103b0e0
33 changed files with 85 additions and 56 deletions

View File

@ -416,9 +416,9 @@ void DatabaseAtomic::assertCanBeDetached(bool cleanup)
}
DatabaseTablesIteratorPtr
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name) const
DatabaseAtomic::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
auto base_iter = DatabaseOrdinary::getTablesIterator(local_context, filter_by_table_name);
auto base_iter = DatabaseOrdinary::getTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
return std::make_unique<AtomicDatabaseTablesSnapshotIterator>(std::move(typeid_cast<DatabaseTablesSnapshotIterator &>(*base_iter)));
}

View File

@ -46,7 +46,7 @@ public:
void drop(ContextPtr /*context*/) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void beforeLoadingMetadata(ContextMutablePtr context, LoadingStrictnessLevel mode) override;

View File

@ -80,7 +80,7 @@ StoragePtr DatabaseDictionary::tryGetTable(const String & table_name, ContextPtr
return createStorageDictionary(getDatabaseName(), load_result, getContext());
}
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseDictionary::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(listTables(filter_by_table_name), getDatabaseName());
}

View File

@ -34,7 +34,7 @@ public:
StoragePtr tryGetTable(const String & table_name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;

View File

@ -229,7 +229,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseFilesystem::getTablesForBacku
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
*/
DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseFilesystem::getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}

View File

@ -45,7 +45,7 @@ public:
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
protected:
StoragePtr getTableImpl(const String & name, ContextPtr context, bool throw_on_error) const;

View File

@ -45,7 +45,7 @@ public:
void shutdown() override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
protected:
StoragePtr getTableImpl(const String & name, ContextPtr context) const;

View File

@ -152,7 +152,7 @@ StoragePtr DatabaseLazy::tryGetTable(const String & table_name) const
return loadTable(table_name);
}
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
std::lock_guard lock(mutex);
Strings filtered_tables;

View File

@ -62,7 +62,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void attachTable(ContextPtr context, const String & table_name, const StoragePtr & table, const String & relative_table_path) override;

View File

@ -438,24 +438,40 @@ void DatabaseOrdinary::stopLoading()
stop_load_table.clear();
}
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseOrdinary::getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
// It is important, because otherwise table might be:
// - not attached and thus will be missed in the snapshot;
// - not started, which is not good for DDL operations.
LoadTaskPtrs tasks_to_wait;
if (!skip_not_loaded)
{
// Wait for every table (matching the filter) to be loaded and started up before we make the snapshot.
// It is important, because otherwise table might be:
// - not attached and thus will be missed in the snapshot;
// - not started, which is not good for DDL operations.
LoadTaskPtrs tasks_to_wait;
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
tasks_to_wait.reserve(startup_table.size());
for (const auto & [table_name, task] : startup_table)
if (!filter_by_table_name || filter_by_table_name(table_name))
tasks_to_wait.emplace_back(task);
}
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
}
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name, skip_not_loaded);
}
Strings DatabaseOrdinary::getAllTableNames(ContextPtr context) const
{
std::set<String> unique_names;
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
tasks_to_wait.reserve(startup_table.size());
for (const auto & [table_name, task] : startup_table)
if (!filter_by_table_name || filter_by_table_name(table_name))
tasks_to_wait.emplace_back(task);
for (const auto & [table_name, _] : tables)
unique_names.emplace(table_name);
// Not yet loaded table are not listed in `tables`, so we have to add table names from tasks
for (const auto & [table_name, _] : startup_table)
unique_names.emplace(table_name);
}
waitLoad(currentPoolOr(TablesLoaderForegroundPoolId), tasks_to_wait);
return DatabaseWithOwnTablesBase::getTablesIterator(local_context, filter_by_table_name);
return {unique_names.begin(), unique_names.end()};
}
void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & table_id, const StorageInMemoryMetadata & metadata)

View File

@ -56,7 +56,8 @@ public:
LoadTaskPtr startupDatabaseAsync(AsyncLoader & async_loader, LoadJobSet startup_after, LoadingStrictnessLevel mode) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
Strings getAllTableNames(ContextPtr context) const override;
void alterTable(
ContextPtr context,

View File

@ -1323,13 +1323,13 @@ void DatabaseReplicated::drop(ContextPtr context_)
void DatabaseReplicated::stopReplication()
{
stopLoading();
if (ddl_worker)
ddl_worker->shutdown();
}
void DatabaseReplicated::shutdown()
{
stopLoading();
stopReplication();
ddl_worker_initialized = false;
ddl_worker = nullptr;

View File

@ -303,7 +303,7 @@ std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseS3::getTablesForBackup(const
* Returns an empty iterator because the database does not have its own tables
* But only caches them for quick access
*/
DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseS3::getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const
{
return std::make_unique<DatabaseTablesSnapshotIterator>(Tables{}, getDatabaseName());
}

View File

@ -56,7 +56,7 @@ public:
void shutdown() override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr, const FilterByNameFunction &, bool) const override;
static Configuration parseArguments(ASTs engine_args, ContextPtr context);

View File

@ -214,7 +214,7 @@ StoragePtr DatabaseWithOwnTablesBase::tryGetTable(const String & table_name, Con
return tryGetTableNoWait(table_name);
}
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseWithOwnTablesBase::getTablesIterator(ContextPtr, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
std::lock_guard lock(mutex);
if (!filter_by_table_name)
@ -347,7 +347,7 @@ StoragePtr DatabaseWithOwnTablesBase::getTableUnlocked(const String & table_name
backQuote(database_name), backQuote(table_name));
}
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseWithOwnTablesBase::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const
std::vector<std::pair<ASTPtr, StoragePtr>> DatabaseWithOwnTablesBase::getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context, bool skip_not_loaded) const
{
std::vector<std::pair<ASTPtr, StoragePtr>> res;

View File

@ -35,7 +35,7 @@ public:
StoragePtr detachTable(ContextPtr context, const String & table_name) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
std::vector<std::pair<ASTPtr, StoragePtr>> getTablesForBackup(const FilterByNameFunction & filter, const ContextPtr & local_context) const override;
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;

View File

@ -254,7 +254,7 @@ void DatabasesOverlay::shutdown()
db->shutdown();
}
DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabasesOverlay::getTablesIterator(ContextPtr context_, const FilterByNameFunction & filter_by_table_name, bool /*skip_not_loaded*/) const
{
Tables tables;
for (const auto & db : databases)

View File

@ -52,7 +52,7 @@ public:
void createTableRestoredFromBackup(const ASTPtr & create_table_query, ContextMutablePtr local_context, std::shared_ptr<IRestoreCoordination> restore_coordination, UInt64 timeout_ms) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;

View File

@ -229,7 +229,18 @@ public:
/// Get an iterator that allows you to pass through all the tables.
/// It is possible to have "hidden" tables that are not visible when passing through, but are visible if you get them by name using the functions above.
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}) const = 0; /// NOLINT
/// Wait for all tables to be loaded and started up. If `skip_not_loaded` is true, then not yet loaded or not yet started up (at the moment of iterator creation) tables are excluded.
virtual DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name = {}, bool skip_not_loaded = false) const = 0; /// NOLINT
/// Returns list of table names.
virtual Strings getAllTableNames(ContextPtr context) const
{
// NOTE: This default implementation wait for all tables to be loaded and started up. It should be reimplemented for databases that support async loading.
Strings result;
for (auto table_it = getTablesIterator(context); table_it->isValid(); table_it->next())
result.emplace_back(table_it->name());
return result;
}
/// Is the database empty.
virtual bool empty() const = 0;

View File

@ -185,9 +185,9 @@ StoragePtr DatabaseMaterializedMySQL::tryGetTable(const String & name, ContextPt
}
DatabaseTablesIteratorPtr
DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
DatabaseMaterializedMySQL::getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name);
DatabaseTablesIteratorPtr iterator = DatabaseAtomic::getTablesIterator(context_, filter_by_table_name, skip_not_loaded);
if (context_->isInternalQuery())
return iterator;
return std::make_unique<DatabaseMaterializedTablesIterator>(std::move(iterator), this);
@ -201,7 +201,6 @@ void DatabaseMaterializedMySQL::checkIsInternalQuery(ContextPtr context_, const
void DatabaseMaterializedMySQL::stopReplication()
{
stopLoading();
materialize_thread.stopSynchronization();
started_up = false;
}

View File

@ -73,7 +73,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context_) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context_, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
void checkIsInternalQuery(ContextPtr context_, const char * method) const;

View File

@ -105,7 +105,7 @@ bool DatabaseMySQL::empty() const
return true;
}
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name) const
DatabaseTablesIteratorPtr DatabaseMySQL::getTablesIterator(ContextPtr local_context, const FilterByNameFunction & filter_by_table_name, bool /* skip_not_loaded */) const
{
Tables tables;
std::lock_guard lock(mutex);

View File

@ -58,7 +58,7 @@ public:
bool empty() const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_nam, bool skip_not_loaded) const override;
ASTPtr getCreateDatabaseQuery() const override;

View File

@ -455,8 +455,6 @@ void DatabaseMaterializedPostgreSQL::shutdown()
void DatabaseMaterializedPostgreSQL::stopReplication()
{
stopLoading();
std::lock_guard lock(handler_mutex);
if (replication_handler)
replication_handler->shutdown();
@ -484,10 +482,10 @@ void DatabaseMaterializedPostgreSQL::drop(ContextPtr local_context)
DatabaseTablesIteratorPtr DatabaseMaterializedPostgreSQL::getTablesIterator(
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const
ContextPtr local_context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const
{
/// Modify context into nested_context and pass query to Atomic database.
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name);
return DatabaseAtomic::getTablesIterator(StorageMaterializedPostgreSQL::makeNestedTableContext(local_context), filter_by_table_name, skip_not_loaded);
}
void registerDatabaseMaterializedPostgreSQL(DatabaseFactory & factory)

View File

@ -45,7 +45,7 @@ public:
void stopLoading() override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

View File

@ -46,7 +46,7 @@ public:
void loadStoredObjects(ContextMutablePtr, LoadingStrictnessLevel /*mode*/) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool isTableExist(const String & name, ContextPtr context) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

View File

@ -46,7 +46,7 @@ bool DatabaseSQLite::empty() const
}
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &) const
DatabaseTablesIteratorPtr DatabaseSQLite::getTablesIterator(ContextPtr local_context, const IDatabase::FilterByNameFunction &, bool) const
{
std::lock_guard lock(mutex);

View File

@ -32,7 +32,7 @@ public:
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name, bool skip_not_loaded) const override;
bool empty() const override;

View File

@ -1602,6 +1602,9 @@ void DatabaseCatalog::reloadDisksTask()
for (auto & database : getDatabases())
{
// WARNING: In case of `async_load_databases = true` getTablesIterator() call wait for all table in the database to be loaded.
// WARNING: It means that no database will be able to update configuration until all databases are fully loaded.
// TODO: We can split this task by table or by database to make loaded table operate as usual.
auto it = database.second->getTablesIterator(getContext());
while (it->isValid())
{

View File

@ -418,11 +418,7 @@ public:
Names getAllRegisteredNames() const override
{
Names result;
if (database)
for (auto table_it = database->getTablesIterator(context); table_it->isValid(); table_it->next())
result.emplace_back(table_it->name());
return result;
return database ? database->getAllTableNames(context) : {};
}
private:

View File

@ -400,6 +400,7 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
table_context->setInternalQuery(true);
/// Do not hold extra shared pointers to tables
std::vector<std::pair<String, bool>> tables_to_drop;
// NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`.
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
{
auto table_ptr = iterator->table();

View File

@ -278,7 +278,8 @@ void ServerAsynchronousMetrics::updateImpl(TimePoint update_time, TimePoint curr
bool is_system = db.first == DatabaseCatalog::SYSTEM_DATABASE;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
// Note that we skip not yet loaded tables, so metrics could possibly be lower than expected on fully loaded database just after server start if `async_load_databases = true`.
for (auto iterator = db.second->getTablesIterator(getContext(), {}, true); iterator->isValid(); iterator->next())
{
++total_number_of_tables;
if (is_system)
@ -408,7 +409,7 @@ void ServerAsynchronousMetrics::updateDetachedPartsStats()
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
for (auto iterator = db.second->getTablesIterator(getContext(), {}, true); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)

View File

@ -51,7 +51,10 @@ void ReplicasStatusHandler::handleRequest(HTTPServerRequest & request, HTTPServe
if (!db.second->canContainMergeTreeTables())
continue;
for (auto iterator = db.second->getTablesIterator(getContext()); iterator->isValid(); iterator->next())
// Note that in case `async_load_databases = true` we do not want replica status handler to be hanging
// and waiting (in getTablesIterator() call) for every table to be load, so we just skip not-yet-loaded tables.
// If they have some lag it will be reflected as soon as they are load.
for (auto iterator = db.second->getTablesIterator(getContext(), {}, true); iterator->isValid(); iterator->next())
{
const auto & table = iterator->table();
if (!table)