mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Merge pull request #69670 from ClickHouse/sync-executeToDatabaseImpl-with-private-fork
sync changes to `InterpreterDropQuery::executeToDatabaseImpl` from the private fork
This commit is contained in:
commit
9eba103c5e
@ -380,100 +380,99 @@ BlockIO InterpreterDropQuery::executeToDatabase(const ASTDropQuery & query)
|
|||||||
|
|
||||||
BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait)
|
BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, DatabasePtr & database, std::vector<UUID> & uuids_to_wait)
|
||||||
{
|
{
|
||||||
|
if (query.kind != ASTDropQuery::Kind::Detach && query.kind != ASTDropQuery::Kind::Drop && query.kind != ASTDropQuery::Kind::Truncate)
|
||||||
|
return {};
|
||||||
|
|
||||||
const auto & database_name = query.getDatabase();
|
const auto & database_name = query.getDatabase();
|
||||||
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
|
auto ddl_guard = DatabaseCatalog::instance().getDDLGuard(database_name, "");
|
||||||
|
|
||||||
database = tryGetDatabase(database_name, query.if_exists);
|
database = tryGetDatabase(database_name, query.if_exists);
|
||||||
if (database)
|
if (!database)
|
||||||
|
return {};
|
||||||
|
|
||||||
|
bool drop = query.kind == ASTDropQuery::Kind::Drop;
|
||||||
|
bool truncate = query.kind == ASTDropQuery::Kind::Truncate;
|
||||||
|
|
||||||
|
getContext()->checkAccess(AccessType::DROP_DATABASE, database_name);
|
||||||
|
|
||||||
|
if (query.kind == ASTDropQuery::Kind::Detach && query.permanently)
|
||||||
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DETACH PERMANENTLY is not implemented for databases");
|
||||||
|
|
||||||
|
if (query.if_empty)
|
||||||
|
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DROP IF EMPTY is not implemented for databases");
|
||||||
|
|
||||||
|
if (!truncate && database->hasReplicationThread())
|
||||||
|
database->stopReplication();
|
||||||
|
|
||||||
|
if (database->shouldBeEmptyOnDetach())
|
||||||
{
|
{
|
||||||
if (query.kind == ASTDropQuery::Kind::Detach || query.kind == ASTDropQuery::Kind::Drop
|
/// Cancel restarting replicas in that database, wait for remaining RESTART queries to finish.
|
||||||
|| query.kind == ASTDropQuery::Kind::Truncate)
|
/// So it will not startup tables concurrently with the flushAndPrepareForShutdown call below.
|
||||||
|
auto restart_replica_lock = DatabaseCatalog::instance().getLockForDropDatabase(database_name);
|
||||||
|
|
||||||
|
ASTDropQuery query_for_table;
|
||||||
|
query_for_table.kind = query.kind;
|
||||||
|
// For truncate operation on database, drop the tables
|
||||||
|
if (truncate)
|
||||||
|
query_for_table.kind = query.has_all_tables ? ASTDropQuery::Kind::Truncate : ASTDropQuery::Kind::Drop;
|
||||||
|
query_for_table.if_exists = true;
|
||||||
|
query_for_table.if_empty = false;
|
||||||
|
query_for_table.setDatabase(database_name);
|
||||||
|
query_for_table.sync = query.sync;
|
||||||
|
|
||||||
|
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
|
||||||
|
/// since in this case getTablesIterator() may do some additional work,
|
||||||
|
/// see DatabaseMaterializedMySQL::getTablesIterator()
|
||||||
|
auto table_context = Context::createCopy(getContext());
|
||||||
|
table_context->setInternalQuery(true);
|
||||||
|
/// Do not hold extra shared pointers to tables
|
||||||
|
std::vector<std::pair<StorageID, bool>> tables_to_drop;
|
||||||
|
// NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`.
|
||||||
|
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
|
||||||
{
|
{
|
||||||
bool drop = query.kind == ASTDropQuery::Kind::Drop;
|
auto table_ptr = iterator->table();
|
||||||
bool truncate = query.kind == ASTDropQuery::Kind::Truncate;
|
tables_to_drop.push_back({table_ptr->getStorageID(), table_ptr->isDictionary()});
|
||||||
|
}
|
||||||
|
|
||||||
getContext()->checkAccess(AccessType::DROP_DATABASE, database_name);
|
/// Prepare tables for shutdown in parallel.
|
||||||
|
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
|
||||||
if (query.kind == ASTDropQuery::Kind::Detach && query.permanently)
|
for (const auto & [name, _] : tables_to_drop)
|
||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DETACH PERMANENTLY is not implemented for databases");
|
{
|
||||||
|
auto table_ptr = DatabaseCatalog::instance().getTable(name, table_context);
|
||||||
if (query.if_empty)
|
runner([my_table_ptr = std::move(table_ptr)]()
|
||||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "DROP IF EMPTY is not implemented for databases");
|
|
||||||
|
|
||||||
if (!truncate && database->hasReplicationThread())
|
|
||||||
database->stopReplication();
|
|
||||||
|
|
||||||
if (database->shouldBeEmptyOnDetach())
|
|
||||||
{
|
{
|
||||||
/// Cancel restarting replicas in that database, wait for remaining RESTART queries to finish.
|
my_table_ptr->flushAndPrepareForShutdown();
|
||||||
/// So it will not startup tables concurrently with the flushAndPrepareForShutdown call below.
|
});
|
||||||
auto restart_replica_lock = DatabaseCatalog::instance().getLockForDropDatabase(database_name);
|
}
|
||||||
|
runner.waitForAllToFinishAndRethrowFirstError();
|
||||||
|
|
||||||
ASTDropQuery query_for_table;
|
for (const auto & table : tables_to_drop)
|
||||||
query_for_table.kind = query.kind;
|
{
|
||||||
// For truncate operation on database, drop the tables
|
query_for_table.setTable(table.first.getTableName());
|
||||||
if (truncate)
|
query_for_table.is_dictionary = table.second;
|
||||||
query_for_table.kind = query.has_all_tables ? ASTDropQuery::Kind::Truncate : ASTDropQuery::Kind::Drop;
|
DatabasePtr db;
|
||||||
query_for_table.if_exists = true;
|
UUID table_to_wait = UUIDHelpers::Nil;
|
||||||
query_for_table.if_empty = false;
|
executeToTableImpl(table_context, query_for_table, db, table_to_wait);
|
||||||
query_for_table.setDatabase(database_name);
|
uuids_to_wait.push_back(table_to_wait);
|
||||||
query_for_table.sync = query.sync;
|
|
||||||
|
|
||||||
/// Flush should not be done if shouldBeEmptyOnDetach() == false,
|
|
||||||
/// since in this case getTablesIterator() may do some additional work,
|
|
||||||
/// see DatabaseMaterializedMySQL::getTablesIterator()
|
|
||||||
auto table_context = Context::createCopy(getContext());
|
|
||||||
table_context->setInternalQuery(true);
|
|
||||||
/// Do not hold extra shared pointers to tables
|
|
||||||
std::vector<std::pair<StorageID, bool>> tables_to_drop;
|
|
||||||
// NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`.
|
|
||||||
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
|
|
||||||
{
|
|
||||||
auto table_ptr = iterator->table();
|
|
||||||
tables_to_drop.push_back({table_ptr->getStorageID(), table_ptr->isDictionary()});
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Prepare tables for shutdown in parallel.
|
|
||||||
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
|
|
||||||
for (const auto & [name, _] : tables_to_drop)
|
|
||||||
{
|
|
||||||
auto table_ptr = DatabaseCatalog::instance().getTable(name, table_context);
|
|
||||||
runner([my_table_ptr = std::move(table_ptr)]()
|
|
||||||
{
|
|
||||||
my_table_ptr->flushAndPrepareForShutdown();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
runner.waitForAllToFinishAndRethrowFirstError();
|
|
||||||
|
|
||||||
for (const auto & table : tables_to_drop)
|
|
||||||
{
|
|
||||||
query_for_table.setTable(table.first.getTableName());
|
|
||||||
query_for_table.is_dictionary = table.second;
|
|
||||||
DatabasePtr db;
|
|
||||||
UUID table_to_wait = UUIDHelpers::Nil;
|
|
||||||
executeToTableImpl(table_context, query_for_table, db, table_to_wait);
|
|
||||||
uuids_to_wait.push_back(table_to_wait);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// only if operation is DETACH
|
|
||||||
if ((!drop || !truncate) && query.sync)
|
|
||||||
{
|
|
||||||
/// Avoid "some tables are still in use" when sync mode is enabled
|
|
||||||
for (const auto & table_uuid : uuids_to_wait)
|
|
||||||
database->waitDetachedTableNotInUse(table_uuid);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Protects from concurrent CREATE TABLE queries
|
|
||||||
auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name);
|
|
||||||
// only if operation is DETACH
|
|
||||||
if (!drop || !truncate)
|
|
||||||
database->assertCanBeDetached(true);
|
|
||||||
|
|
||||||
/// DETACH or DROP database itself. If TRUNCATE skip dropping/erasing the database.
|
|
||||||
if (!truncate)
|
|
||||||
DatabaseCatalog::instance().detachDatabase(getContext(), database_name, drop, database->shouldBeEmptyOnDetach());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// only if operation is DETACH
|
||||||
|
if ((!drop || !truncate) && query.sync)
|
||||||
|
{
|
||||||
|
/// Avoid "some tables are still in use" when sync mode is enabled
|
||||||
|
for (const auto & table_uuid : uuids_to_wait)
|
||||||
|
database->waitDetachedTableNotInUse(table_uuid);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Protects from concurrent CREATE TABLE queries
|
||||||
|
auto db_guard = DatabaseCatalog::instance().getExclusiveDDLGuardForDatabase(database_name);
|
||||||
|
// only if operation is DETACH
|
||||||
|
if (!drop || !truncate)
|
||||||
|
database->assertCanBeDetached(true);
|
||||||
|
|
||||||
|
/// DETACH or DROP database itself. If TRUNCATE skip dropping/erasing the database.
|
||||||
|
if (!truncate)
|
||||||
|
DatabaseCatalog::instance().detachDatabase(getContext(), database_name, drop, database->shouldBeEmptyOnDetach());
|
||||||
|
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user