diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index 88d5a0253d1..250c5e3b6c8 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -184,6 +184,11 @@ void LocalServer::initialize(Poco::Util::Application & self) cleanup_threads, 0, // We don't need any threads one all the parts will be deleted cleanup_threads); + + getDatabaseCatalogDropTablesThreadPool().initialize( + server_settings.database_catalog_drop_table_concurrency, + 0, // We don't need any threads if there are no DROP queries. + server_settings.database_catalog_drop_table_concurrency); } diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 039a7e2cbcd..20db4c2773c 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -1043,6 +1043,11 @@ try 0, // We don't need any threads once all the tables will be created max_database_replicated_create_table_thread_pool_size); + getDatabaseCatalogDropTablesThreadPool().initialize( + server_settings.database_catalog_drop_table_concurrency, + 0, // We don't need any threads if there are no DROP queries. + server_settings.database_catalog_drop_table_concurrency); + /// Initialize global local cache for remote filesystem. if (config().has("local_cache_for_remote_fs")) { diff --git a/src/Core/ServerSettings.cpp b/src/Core/ServerSettings.cpp index fbf86d3e9ad..6c498014996 100644 --- a/src/Core/ServerSettings.cpp +++ b/src/Core/ServerSettings.cpp @@ -1,4 +1,4 @@ -#include "ServerSettings.h" +#include #include namespace DB diff --git a/src/Core/ServerSettings.h b/src/Core/ServerSettings.h index 28b32a6e6a5..f2f78f70e91 100644 --- a/src/Core/ServerSettings.h +++ b/src/Core/ServerSettings.h @@ -66,6 +66,15 @@ namespace DB M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \ M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \ \ + /* Database Catalog */ \ + M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \ + M(UInt64, database_catalog_unused_dir_hide_timeout_sec, 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and this directory was not modified for last database_catalog_unused_dir_hide_timeout_sec seconds, the task will 'hide' this directory by removing all access rights. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'immediately'.", 0) \ + M(UInt64, database_catalog_unused_dir_rm_timeout_sec, 30 * 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and it was previously 'hidden' (see database_catalog_unused_dir_hide_timeout_sec) and this directory was not modified for last database_catalog_unused_dir_rm_timeout_sec seconds, the task will remove this directory. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'never'.", 0) \ + M(UInt64, database_catalog_unused_dir_cleanup_period_sec, 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. Sets scheduling period of the task. Zero means 'never'.", 0) \ + M(UInt64, database_catalog_drop_error_cooldown_sec, 5, "In case if drop table failed, ClickHouse will wait for this timeout before retrying the operation.", 0) \ + M(UInt64, database_catalog_drop_table_concurrency, 16, "The size of the threadpool used for dropping tables.", 0) \ + \ + \ M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \ M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \ M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \ diff --git a/src/IO/SharedThreadPools.cpp b/src/IO/SharedThreadPools.cpp index 3606ddd984c..cda7bc01bbf 100644 --- a/src/IO/SharedThreadPools.cpp +++ b/src/IO/SharedThreadPools.cpp @@ -23,6 +23,9 @@ namespace CurrentMetrics extern const Metric MergeTreeUnexpectedPartsLoaderThreads; extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive; extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled; + extern const Metric DatabaseCatalogThreads; + extern const Metric DatabaseCatalogThreadsActive; + extern const Metric DatabaseCatalogThreadsScheduled; extern const Metric DatabaseReplicatedCreateTablesThreads; extern const Metric DatabaseReplicatedCreateTablesThreadsActive; extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled; @@ -166,4 +169,11 @@ StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool() return instance; } +/// ThreadPool used for dropping tables. +StaticThreadPool & getDatabaseCatalogDropTablesThreadPool() +{ + static StaticThreadPool instance("DropTablesThreadPool", CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled); + return instance; +} + } diff --git a/src/IO/SharedThreadPools.h b/src/IO/SharedThreadPools.h index 50adc70c9a0..06ccebd20b2 100644 --- a/src/IO/SharedThreadPools.h +++ b/src/IO/SharedThreadPools.h @@ -69,4 +69,7 @@ StaticThreadPool & getUnexpectedPartsLoadingThreadPool(); /// ThreadPool used for creating tables in DatabaseReplicated. StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool(); +/// ThreadPool used for dropping tables. +StaticThreadPool & getDatabaseCatalogDropTablesThreadPool(); + } diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index bb2dd158710..a8e5fd7e6aa 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include #include #include @@ -48,9 +50,6 @@ namespace CurrentMetrics { extern const Metric TablesToDropQueueSize; - extern const Metric DatabaseCatalogThreads; - extern const Metric DatabaseCatalogThreadsActive; - extern const Metric DatabaseCatalogThreadsScheduled; } namespace DB @@ -189,13 +188,6 @@ StoragePtr TemporaryTableHolder::getTable() const void DatabaseCatalog::initializeAndLoadTemporaryDatabase() { - drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec); - unused_dir_hide_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_hide_timeout_sec", unused_dir_hide_timeout_sec); - unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec); - unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec); - drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec); - drop_table_concurrency = getContext()->getConfigRef().getInt64("database_catalog_drop_table_concurrency", drop_table_concurrency); - auto db_for_temporary_and_external_tables = std::make_shared(TEMPORARY_DATABASE, getContext()); attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables); } @@ -203,7 +195,7 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase() void DatabaseCatalog::createBackgroundTasks() { /// It has to be done before databases are loaded (to avoid a race condition on initialization) - if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && unused_dir_cleanup_period_sec) + if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec) { auto cleanup_task_holder = getContext()->getSchedulePool().createTask("DatabaseCatalogCleanupStoreDirectoryTask", [this]() { this->cleanupStoreDirectoryTask(); }); @@ -224,7 +216,7 @@ void DatabaseCatalog::startupBackgroundTasks() { (*cleanup_task)->activate(); /// Do not start task immediately on server startup, it's not urgent. - (*cleanup_task)->scheduleAfter(unused_dir_hide_timeout_sec * 1000); + (*cleanup_task)->scheduleAfter(static_cast(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec) * 1000); } (*drop_task)->activate(); @@ -1038,15 +1030,12 @@ void DatabaseCatalog::loadMarkedAsDroppedTables() LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size()); - ThreadPool pool(CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled); + ThreadPoolCallbackRunnerLocal runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables"); for (const auto & elem : dropped_metadata) { - pool.scheduleOrThrowOnError([&]() - { - this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first); - }); + runner([this, &elem](){ this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first); }); } - pool.wait(); + runner.waitForAllToFinishAndRethrowFirstError(); } String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const @@ -1135,7 +1124,13 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr } else { - tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time + drop_delay_sec}); + tables_marked_dropped.push_back + ({ + table_id, + table, + dropped_metadata_path, + drop_time + static_cast(getContext()->getServerSettings().database_atomic_delay_before_drop_table_sec) + }); if (first_async_drop_in_queue == tables_marked_dropped.end()) --first_async_drop_in_queue; } @@ -1289,13 +1284,7 @@ void DatabaseCatalog::dropTablesParallel(std::vector runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables"); for (const auto & item : tables_to_drop) { @@ -1332,7 +1321,7 @@ void DatabaseCatalog::dropTablesParallel(std::vectordrop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec; + table_iterator->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + getContext()->getServerSettings().database_catalog_drop_error_cooldown_sec; if (first_async_drop_in_queue == tables_marked_dropped.end()) --first_async_drop_in_queue; @@ -1340,25 +1329,10 @@ void DatabaseCatalog::dropTablesParallel(std::vectorscheduleAfter(unused_dir_cleanup_period_sec * 1000); + (*cleanup_task)->scheduleAfter(static_cast(getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec) * 1000); } bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir) @@ -1742,7 +1724,7 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP time_t current_time = time(nullptr); if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO)) { - if (current_time <= max_modification_time + unused_dir_hide_timeout_sec) + if (current_time <= max_modification_time + static_cast(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec)) return false; LOG_INFO(log, "Removing access rights for unused directory {} from disk {} (will remove it when timeout exceed)", unused_dir, disk_name); @@ -1758,6 +1740,8 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP } else { + auto unused_dir_rm_timeout_sec = static_cast(getContext()->getServerSettings().database_catalog_unused_dir_rm_timeout_sec); + if (!unused_dir_rm_timeout_sec) return false; diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 23e38a6445e..83a302f117d 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -354,23 +354,8 @@ private: mutable std::mutex tables_marked_dropped_mutex; std::unique_ptr drop_task; - static constexpr time_t default_drop_delay_sec = 8 * 60; - time_t drop_delay_sec = default_drop_delay_sec; std::condition_variable wait_table_finally_dropped; - std::unique_ptr cleanup_task; - static constexpr time_t default_unused_dir_hide_timeout_sec = 60 * 60; /// 1 hour - time_t unused_dir_hide_timeout_sec = default_unused_dir_hide_timeout_sec; - static constexpr time_t default_unused_dir_rm_timeout_sec = 30 * 24 * 60 * 60; /// 30 days - time_t unused_dir_rm_timeout_sec = default_unused_dir_rm_timeout_sec; - static constexpr time_t default_unused_dir_cleanup_period_sec = 24 * 60 * 60; /// 1 day - time_t unused_dir_cleanup_period_sec = default_unused_dir_cleanup_period_sec; - - static constexpr time_t default_drop_error_cooldown_sec = 5; - time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec; - - static constexpr size_t default_drop_table_concurrency = 10; - size_t drop_table_concurrency = default_drop_table_concurrency; std::unique_ptr reload_disks_task; std::mutex reload_disks_mutex; diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index bad3e5277db..ef560ec3405 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -424,18 +425,29 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query, auto table_context = Context::createCopy(getContext()); table_context->setInternalQuery(true); /// Do not hold extra shared pointers to tables - std::vector> tables_to_drop; + std::vector> tables_to_drop; // NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`. for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next()) { auto table_ptr = iterator->table(); - table_ptr->flushAndPrepareForShutdown(); - tables_to_drop.push_back({iterator->name(), table_ptr->isDictionary()}); + tables_to_drop.push_back({table_ptr->getStorageID(), table_ptr->isDictionary()}); } + /// Prepare tables for shutdown in parallel. + ThreadPoolCallbackRunnerLocal runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables"); + for (const auto & [name, _] : tables_to_drop) + { + auto table_ptr = DatabaseCatalog::instance().getTable(name, table_context); + runner([my_table_ptr = std::move(table_ptr)]() + { + my_table_ptr->flushAndPrepareForShutdown(); + }); + } + runner.waitForAllToFinishAndRethrowFirstError(); + for (const auto & table : tables_to_drop) { - query_for_table.setTable(table.first); + query_for_table.setTable(table.first.getTableName()); query_for_table.is_dictionary = table.second; DatabasePtr db; UUID table_to_wait = UUIDHelpers::Nil;