Merge pull request #28313 from amosbird/fastload

Fast data parts loading by delaying table startup process
This commit is contained in:
tavplubix 2021-08-30 13:53:55 +03:00 committed by GitHub
commit b47f34aa17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 71 additions and 34 deletions

View File

@ -416,7 +416,8 @@ UUID DatabaseAtomic::tryGetTableUUID(const String & table_name) const
return UUIDHelpers::Nil;
}
void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseAtomic::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
if (has_force_restore_data_flag)
@ -433,7 +434,7 @@ void DatabaseAtomic::loadStoredObjects(ContextMutablePtr local_context, bool has
}
}
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseOrdinary::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (has_force_restore_data_flag)
{

View File

@ -47,7 +47,7 @@ public:
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
/// Atomic database cannot be detached if there is detached table which still in use
void assertCanBeDetached(bool cleanup) override;

View File

@ -36,9 +36,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
void DatabaseLazy::loadStoredObjects(
ContextMutablePtr local_context,
bool /* has_force_restore_data_flag */,
bool /*force_attach*/)
ContextMutablePtr local_context, bool /* has_force_restore_data_flag */, bool /*force_attach*/, bool /* skip_startup_tables */)
{
iterateMetadataFiles(local_context, [this](const String & file_name)
{
@ -246,6 +244,8 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
if (!ast || !endsWith(table->getName(), "Log"))
throw Exception("Only *Log tables can be used with Lazy database engine.", ErrorCodes::LOGICAL_ERROR);
table->startup();
{
std::lock_guard lock(mutex);
auto it = tables_cache.find(table_name);

View File

@ -26,9 +26,7 @@ public:
bool canContainDistributedTables() const override { return false; }
void loadStoredObjects(
ContextMutablePtr context,
bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void createTable(
ContextPtr context,

View File

@ -83,7 +83,8 @@ DatabaseOrdinary::DatabaseOrdinary(
{
}
void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/)
void DatabaseOrdinary::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool /*force_attach*/, bool skip_startup_tables)
{
/** Tables load faster if they are loaded in sorted (by name) order.
* Otherwise (for the ext4 filesystem), `DirectoryIterator` iterates through them in some order,
@ -201,12 +202,20 @@ void DatabaseOrdinary::loadStoredObjects(ContextMutablePtr local_context, bool h
pool.wait();
/// After all tables was basically initialized, startup them.
startupTables(pool);
if (!skip_startup_tables)
{
/// After all tables was basically initialized, startup them.
startupTablesImpl(pool);
}
}
void DatabaseOrdinary::startupTables()
{
ThreadPool pool;
startupTablesImpl(pool);
}
void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
void DatabaseOrdinary::startupTablesImpl(ThreadPool & thread_pool)
{
LOG_INFO(log, "Starting up tables.");

View File

@ -20,7 +20,9 @@ public:
String getEngineName() const override { return "Ordinary"; }
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void startupTables() override;
void alterTable(
ContextPtr context,
@ -35,7 +37,7 @@ protected:
const String & statement,
ContextPtr query_context);
void startupTables(ThreadPool & thread_pool);
void startupTablesImpl(ThreadPool & thread_pool);
};
}

View File

@ -305,11 +305,12 @@ void DatabaseReplicated::createReplicaNodesInZooKeeper(const zkutil::ZooKeeperPt
createEmptyLogEntry(current_zookeeper);
}
void DatabaseReplicated::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseReplicated::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
tryConnectToZooKeeperAndInitDatabase(force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
ddl_worker = std::make_unique<DatabaseReplicatedDDLWorker>(this, getContext());
ddl_worker->startup();

View File

@ -57,7 +57,7 @@ public:
void drop(ContextPtr /*context*/) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void shutdown() override;
friend struct DatabaseReplicatedTask;

View File

@ -123,7 +123,15 @@ public:
/// Load a set of existing tables.
/// You can call only once, right after the object is created.
virtual void loadStoredObjects(ContextMutablePtr /*context*/, bool /*has_force_restore_data_flag*/, bool /*force_attach*/ = false) {}
virtual void loadStoredObjects(
ContextMutablePtr /*context*/,
bool /*has_force_restore_data_flag*/,
bool /*force_attach*/ = false,
bool /* skip_startup_tables */ = false)
{
}
virtual void startupTables() {}
/// Check the existence of the table.
virtual bool isTableExist(const String & name, ContextPtr context) const = 0;

View File

@ -93,10 +93,11 @@ void DatabaseMaterializedMySQL<Base>::setException(const std::exception_ptr & ex
exception = exception_;
}
template<typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach)
template <typename Base>
void DatabaseMaterializedMySQL<Base>::loadStoredObjects(
ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach);
Base::loadStoredObjects(context_, has_force_restore_data_flag, force_attach, skip_startup_tables);
if (!force_attach)
materialize_thread.assertMySQLAvailable();

View File

@ -43,7 +43,7 @@ protected:
public:
String getEngineName() const override { return "MaterializedMySQL"; }
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr context_, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables) override;
void createTable(ContextPtr context_, const String & name, const StoragePtr & table, const ASTPtr & query) override;

View File

@ -405,7 +405,7 @@ String DatabaseMySQL::getMetadataPath() const
return metadata_path;
}
void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/)
void DatabaseMySQL::loadStoredObjects(ContextMutablePtr, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
{
std::lock_guard<std::mutex> lock{mutex};

View File

@ -75,7 +75,7 @@ public:
void createTable(ContextPtr, const String & table_name, const StoragePtr & storage, const ASTPtr & create_query) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
StoragePtr detachTable(const String & table_name) override;

View File

@ -109,9 +109,10 @@ void DatabaseMaterializedPostgreSQL::startSynchronization()
}
void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach)
void DatabaseMaterializedPostgreSQL::loadStoredObjects(
ContextMutablePtr local_context, bool has_force_restore_data_flag, bool force_attach, bool skip_startup_tables)
{
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach);
DatabaseAtomic::loadStoredObjects(local_context, has_force_restore_data_flag, force_attach, skip_startup_tables);
try
{
@ -124,7 +125,6 @@ void DatabaseMaterializedPostgreSQL::loadStoredObjects(ContextMutablePtr local_c
if (!force_attach)
throw;
}
}

View File

@ -43,10 +43,10 @@ public:
String getMetadataPath() const override { return metadata_path; }
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
DatabaseTablesIteratorPtr getTablesIterator(
ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
DatabaseTablesIteratorPtr
getTablesIterator(ContextPtr context, const DatabaseOnDisk::FilterByNameFunction & filter_by_table_name) const override;
StoragePtr tryGetTable(const String & name, ContextPtr context) const override;

View File

@ -280,7 +280,7 @@ void DatabasePostgreSQL::drop(ContextPtr /*context*/)
}
void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/)
void DatabasePostgreSQL::loadStoredObjects(ContextMutablePtr /* context */, bool, bool /*force_attach*/, bool /* skip_startup_tables */)
{
{
std::lock_guard<std::mutex> lock{mutex};

View File

@ -48,7 +48,7 @@ public:
bool empty() const override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach) override;
void loadStoredObjects(ContextMutablePtr, bool, bool force_attach, bool skip_startup_tables) override;
DatabaseTablesIteratorPtr getTablesIterator(ContextPtr context, const FilterByNameFunction & filter_by_table_name) const override;

View File

@ -157,6 +157,15 @@ void DatabaseCatalog::loadDatabases()
/// Another background thread which drops temporary LiveViews.
/// We should start it after loadMarkedAsDroppedTables() to avoid race condition.
TemporaryLiveViewCleaner::instance().startup();
/// Start up tables after all databases are loaded.
for (const auto & [database_name, database] : databases)
{
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
continue;
database->startupTables();
}
}
void DatabaseCatalog::shutdownImpl()

View File

@ -272,7 +272,8 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
}
/// We use global context here, because storages lifetime is bigger than query context lifetime
database->loadStoredObjects(getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach); //-V560
database->loadStoredObjects(
getContext()->getGlobalContext(), has_force_restore_data_flag, create.attach && force_attach, skip_startup_tables); //-V560
}
catch (...)
{

View File

@ -52,6 +52,11 @@ public:
force_attach = force_attach_;
}
void setSkipStartupTables(bool skip_startup_tables_)
{
skip_startup_tables = skip_startup_tables_;
}
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
@ -94,6 +99,7 @@ private:
/// Is this an internal query - not from the user.
bool internal = false;
bool force_attach = false;
bool skip_startup_tables = false;
mutable String as_database_saved;
mutable String as_table_saved;

View File

@ -43,6 +43,7 @@ static void executeCreateQuery(
interpreter.setInternal(true);
interpreter.setForceAttach(true);
interpreter.setForceRestoreData(has_force_restore_data_flag);
interpreter.setSkipStartupTables(true);
interpreter.execute();
}