mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Merge pull request #67228 from ClickHouse/drop-tables-even-faster
Cleanup code and speedup table removal
This commit is contained in:
commit
14d0cc2bf3
@ -184,6 +184,11 @@ void LocalServer::initialize(Poco::Util::Application & self)
|
||||
cleanup_threads,
|
||||
0, // We don't need any threads one all the parts will be deleted
|
||||
cleanup_threads);
|
||||
|
||||
getDatabaseCatalogDropTablesThreadPool().initialize(
|
||||
server_settings.database_catalog_drop_table_concurrency,
|
||||
0, // We don't need any threads if there are no DROP queries.
|
||||
server_settings.database_catalog_drop_table_concurrency);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1043,6 +1043,11 @@ try
|
||||
0, // We don't need any threads once all the tables will be created
|
||||
max_database_replicated_create_table_thread_pool_size);
|
||||
|
||||
getDatabaseCatalogDropTablesThreadPool().initialize(
|
||||
server_settings.database_catalog_drop_table_concurrency,
|
||||
0, // We don't need any threads if there are no DROP queries.
|
||||
server_settings.database_catalog_drop_table_concurrency);
|
||||
|
||||
/// Initialize global local cache for remote filesystem.
|
||||
if (config().has("local_cache_for_remote_fs"))
|
||||
{
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include "ServerSettings.h"
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
namespace DB
|
||||
|
@ -66,6 +66,15 @@ namespace DB
|
||||
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
|
||||
M(Bool, ignore_empty_sql_security_in_create_view_query, true, "If true, ClickHouse doesn't write defaults for empty SQL security statement in CREATE VIEW queries. This setting is only necessary for the migration period and will become obsolete in 24.4", 0) \
|
||||
\
|
||||
/* Database Catalog */ \
|
||||
M(UInt64, database_atomic_delay_before_drop_table_sec, 8 * 60, "The delay during which a dropped table can be restored using the UNDROP statement. If DROP TABLE ran with a SYNC modifier, the setting is ignored.", 0) \
|
||||
M(UInt64, database_catalog_unused_dir_hide_timeout_sec, 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and this directory was not modified for last database_catalog_unused_dir_hide_timeout_sec seconds, the task will 'hide' this directory by removing all access rights. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'immediately'.", 0) \
|
||||
M(UInt64, database_catalog_unused_dir_rm_timeout_sec, 30 * 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. If some subdirectory is not used by clickhouse-server and it was previously 'hidden' (see database_catalog_unused_dir_hide_timeout_sec) and this directory was not modified for last database_catalog_unused_dir_rm_timeout_sec seconds, the task will remove this directory. It also works for directories that clickhouse-server does not expect to see inside store/. Zero means 'never'.", 0) \
|
||||
M(UInt64, database_catalog_unused_dir_cleanup_period_sec, 24 * 60 * 60, "Parameter of a task that cleans up garbage from store/ directory. Sets scheduling period of the task. Zero means 'never'.", 0) \
|
||||
M(UInt64, database_catalog_drop_error_cooldown_sec, 5, "In case if drop table failed, ClickHouse will wait for this timeout before retrying the operation.", 0) \
|
||||
M(UInt64, database_catalog_drop_table_concurrency, 16, "The size of the threadpool used for dropping tables.", 0) \
|
||||
\
|
||||
\
|
||||
M(UInt64, max_concurrent_queries, 0, "Maximum number of concurrently executed queries. Zero means unlimited.", 0) \
|
||||
M(UInt64, max_concurrent_insert_queries, 0, "Maximum number of concurrently INSERT queries. Zero means unlimited.", 0) \
|
||||
M(UInt64, max_concurrent_select_queries, 0, "Maximum number of concurrently SELECT queries. Zero means unlimited.", 0) \
|
||||
|
@ -23,6 +23,9 @@ namespace CurrentMetrics
|
||||
extern const Metric MergeTreeUnexpectedPartsLoaderThreads;
|
||||
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsActive;
|
||||
extern const Metric MergeTreeUnexpectedPartsLoaderThreadsScheduled;
|
||||
extern const Metric DatabaseCatalogThreads;
|
||||
extern const Metric DatabaseCatalogThreadsActive;
|
||||
extern const Metric DatabaseCatalogThreadsScheduled;
|
||||
extern const Metric DatabaseReplicatedCreateTablesThreads;
|
||||
extern const Metric DatabaseReplicatedCreateTablesThreadsActive;
|
||||
extern const Metric DatabaseReplicatedCreateTablesThreadsScheduled;
|
||||
@ -166,4 +169,11 @@ StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool()
|
||||
return instance;
|
||||
}
|
||||
|
||||
/// ThreadPool used for dropping tables.
|
||||
StaticThreadPool & getDatabaseCatalogDropTablesThreadPool()
|
||||
{
|
||||
static StaticThreadPool instance("DropTablesThreadPool", CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled);
|
||||
return instance;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -69,4 +69,7 @@ StaticThreadPool & getUnexpectedPartsLoadingThreadPool();
|
||||
/// ThreadPool used for creating tables in DatabaseReplicated.
|
||||
StaticThreadPool & getDatabaseReplicatedCreateTablesThreadPool();
|
||||
|
||||
/// ThreadPool used for dropping tables.
|
||||
StaticThreadPool & getDatabaseCatalogDropTablesThreadPool();
|
||||
|
||||
}
|
||||
|
@ -19,6 +19,8 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Core/ServerSettings.h>
|
||||
#include <IO/SharedThreadPools.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/atomicRename.h>
|
||||
@ -48,9 +50,6 @@
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric TablesToDropQueueSize;
|
||||
extern const Metric DatabaseCatalogThreads;
|
||||
extern const Metric DatabaseCatalogThreadsActive;
|
||||
extern const Metric DatabaseCatalogThreadsScheduled;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -189,13 +188,6 @@ StoragePtr TemporaryTableHolder::getTable() const
|
||||
|
||||
void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
|
||||
{
|
||||
drop_delay_sec = getContext()->getConfigRef().getInt("database_atomic_delay_before_drop_table_sec", default_drop_delay_sec);
|
||||
unused_dir_hide_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_hide_timeout_sec", unused_dir_hide_timeout_sec);
|
||||
unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec);
|
||||
unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec);
|
||||
drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec);
|
||||
drop_table_concurrency = getContext()->getConfigRef().getInt64("database_catalog_drop_table_concurrency", drop_table_concurrency);
|
||||
|
||||
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, getContext());
|
||||
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
|
||||
}
|
||||
@ -203,7 +195,7 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
|
||||
void DatabaseCatalog::createBackgroundTasks()
|
||||
{
|
||||
/// It has to be done before databases are loaded (to avoid a race condition on initialization)
|
||||
if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && unused_dir_cleanup_period_sec)
|
||||
if (Context::getGlobalContextInstance()->getApplicationType() == Context::ApplicationType::SERVER && getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec)
|
||||
{
|
||||
auto cleanup_task_holder
|
||||
= getContext()->getSchedulePool().createTask("DatabaseCatalogCleanupStoreDirectoryTask", [this]() { this->cleanupStoreDirectoryTask(); });
|
||||
@ -224,7 +216,7 @@ void DatabaseCatalog::startupBackgroundTasks()
|
||||
{
|
||||
(*cleanup_task)->activate();
|
||||
/// Do not start task immediately on server startup, it's not urgent.
|
||||
(*cleanup_task)->scheduleAfter(unused_dir_hide_timeout_sec * 1000);
|
||||
(*cleanup_task)->scheduleAfter(static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec) * 1000);
|
||||
}
|
||||
|
||||
(*drop_task)->activate();
|
||||
@ -1038,15 +1030,12 @@ void DatabaseCatalog::loadMarkedAsDroppedTables()
|
||||
|
||||
LOG_INFO(log, "Found {} partially dropped tables. Will load them and retry removal.", dropped_metadata.size());
|
||||
|
||||
ThreadPool pool(CurrentMetrics::DatabaseCatalogThreads, CurrentMetrics::DatabaseCatalogThreadsActive, CurrentMetrics::DatabaseCatalogThreadsScheduled);
|
||||
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
|
||||
for (const auto & elem : dropped_metadata)
|
||||
{
|
||||
pool.scheduleOrThrowOnError([&]()
|
||||
{
|
||||
this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first);
|
||||
});
|
||||
runner([this, &elem](){ this->enqueueDroppedTableCleanup(elem.second, nullptr, elem.first); });
|
||||
}
|
||||
pool.wait();
|
||||
runner.waitForAllToFinishAndRethrowFirstError();
|
||||
}
|
||||
|
||||
String DatabaseCatalog::getPathForDroppedMetadata(const StorageID & table_id) const
|
||||
@ -1135,7 +1124,13 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
|
||||
}
|
||||
else
|
||||
{
|
||||
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time + drop_delay_sec});
|
||||
tables_marked_dropped.push_back
|
||||
({
|
||||
table_id,
|
||||
table,
|
||||
dropped_metadata_path,
|
||||
drop_time + static_cast<time_t>(getContext()->getServerSettings().database_atomic_delay_before_drop_table_sec)
|
||||
});
|
||||
if (first_async_drop_in_queue == tables_marked_dropped.end())
|
||||
--first_async_drop_in_queue;
|
||||
}
|
||||
@ -1289,13 +1284,7 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
|
||||
if (tables_to_drop.empty())
|
||||
return;
|
||||
|
||||
ThreadPool pool(
|
||||
CurrentMetrics::DatabaseCatalogThreads,
|
||||
CurrentMetrics::DatabaseCatalogThreadsActive,
|
||||
CurrentMetrics::DatabaseCatalogThreadsScheduled,
|
||||
/* max_threads */drop_table_concurrency,
|
||||
/* max_free_threads */0,
|
||||
/* queue_size */tables_to_drop.size());
|
||||
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
|
||||
|
||||
for (const auto & item : tables_to_drop)
|
||||
{
|
||||
@ -1332,7 +1321,7 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
|
||||
++first_async_drop_in_queue;
|
||||
|
||||
tables_marked_dropped.splice(tables_marked_dropped.end(), tables_marked_dropped, table_iterator);
|
||||
table_iterator->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;
|
||||
table_iterator->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + getContext()->getServerSettings().database_catalog_drop_error_cooldown_sec;
|
||||
|
||||
if (first_async_drop_in_queue == tables_marked_dropped.end())
|
||||
--first_async_drop_in_queue;
|
||||
@ -1340,25 +1329,10 @@ void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMark
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
pool.scheduleOrThrowOnError(std::move(job));
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
|
||||
break;
|
||||
}
|
||||
runner(std::move(job));
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
pool.wait();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
|
||||
}
|
||||
runner.waitForAllToFinishAndRethrowFirstError();
|
||||
}
|
||||
|
||||
void DatabaseCatalog::dropTableDataTask()
|
||||
@ -1375,8 +1349,16 @@ void DatabaseCatalog::dropTableDataTask()
|
||||
LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {} tables",
|
||||
drop_tables_count, drop_tables_in_use_count, tables_to_drop.size());
|
||||
|
||||
try
|
||||
{
|
||||
dropTablesParallel(tables_to_drop);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// We don't re-throw exception, because we are in a background pool.
|
||||
tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
|
||||
}
|
||||
}
|
||||
|
||||
rescheduleDropTableTask();
|
||||
}
|
||||
@ -1718,7 +1700,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
|
||||
LOG_TEST(log, "Nothing to clean up from store/ on disk {}", disk_name);
|
||||
}
|
||||
|
||||
(*cleanup_task)->scheduleAfter(unused_dir_cleanup_period_sec * 1000);
|
||||
(*cleanup_task)->scheduleAfter(static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_cleanup_period_sec) * 1000);
|
||||
}
|
||||
|
||||
bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir)
|
||||
@ -1742,7 +1724,7 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP
|
||||
time_t current_time = time(nullptr);
|
||||
if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
|
||||
{
|
||||
if (current_time <= max_modification_time + unused_dir_hide_timeout_sec)
|
||||
if (current_time <= max_modification_time + static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_hide_timeout_sec))
|
||||
return false;
|
||||
|
||||
LOG_INFO(log, "Removing access rights for unused directory {} from disk {} (will remove it when timeout exceed)", unused_dir, disk_name);
|
||||
@ -1758,6 +1740,8 @@ bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskP
|
||||
}
|
||||
else
|
||||
{
|
||||
auto unused_dir_rm_timeout_sec = static_cast<time_t>(getContext()->getServerSettings().database_catalog_unused_dir_rm_timeout_sec);
|
||||
|
||||
if (!unused_dir_rm_timeout_sec)
|
||||
return false;
|
||||
|
||||
|
@ -354,23 +354,8 @@ private:
|
||||
mutable std::mutex tables_marked_dropped_mutex;
|
||||
|
||||
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
|
||||
static constexpr time_t default_drop_delay_sec = 8 * 60;
|
||||
time_t drop_delay_sec = default_drop_delay_sec;
|
||||
std::condition_variable wait_table_finally_dropped;
|
||||
|
||||
std::unique_ptr<BackgroundSchedulePoolTaskHolder> cleanup_task;
|
||||
static constexpr time_t default_unused_dir_hide_timeout_sec = 60 * 60; /// 1 hour
|
||||
time_t unused_dir_hide_timeout_sec = default_unused_dir_hide_timeout_sec;
|
||||
static constexpr time_t default_unused_dir_rm_timeout_sec = 30 * 24 * 60 * 60; /// 30 days
|
||||
time_t unused_dir_rm_timeout_sec = default_unused_dir_rm_timeout_sec;
|
||||
static constexpr time_t default_unused_dir_cleanup_period_sec = 24 * 60 * 60; /// 1 day
|
||||
time_t unused_dir_cleanup_period_sec = default_unused_dir_cleanup_period_sec;
|
||||
|
||||
static constexpr time_t default_drop_error_cooldown_sec = 5;
|
||||
time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec;
|
||||
|
||||
static constexpr size_t default_drop_table_concurrency = 10;
|
||||
size_t drop_table_concurrency = default_drop_table_concurrency;
|
||||
|
||||
std::unique_ptr<BackgroundSchedulePoolTaskHolder> reload_disks_task;
|
||||
std::mutex reload_disks_mutex;
|
||||
|
@ -6,6 +6,7 @@
|
||||
#include <Interpreters/InterpreterDropQuery.h>
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Interpreters/QueryLog.h>
|
||||
#include <IO/SharedThreadPools.h>
|
||||
#include <Access/Common/AccessRightsElement.h>
|
||||
#include <Parsers/ASTDropQuery.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
@ -424,18 +425,29 @@ BlockIO InterpreterDropQuery::executeToDatabaseImpl(const ASTDropQuery & query,
|
||||
auto table_context = Context::createCopy(getContext());
|
||||
table_context->setInternalQuery(true);
|
||||
/// Do not hold extra shared pointers to tables
|
||||
std::vector<std::pair<String, bool>> tables_to_drop;
|
||||
std::vector<std::pair<StorageID, bool>> tables_to_drop;
|
||||
// NOTE: This means we wait for all tables to be loaded inside getTablesIterator() call in case of `async_load_databases = true`.
|
||||
for (auto iterator = database->getTablesIterator(table_context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
auto table_ptr = iterator->table();
|
||||
table_ptr->flushAndPrepareForShutdown();
|
||||
tables_to_drop.push_back({iterator->name(), table_ptr->isDictionary()});
|
||||
tables_to_drop.push_back({table_ptr->getStorageID(), table_ptr->isDictionary()});
|
||||
}
|
||||
|
||||
/// Prepare tables for shutdown in parallel.
|
||||
ThreadPoolCallbackRunnerLocal<void> runner(getDatabaseCatalogDropTablesThreadPool().get(), "DropTables");
|
||||
for (const auto & [name, _] : tables_to_drop)
|
||||
{
|
||||
auto table_ptr = DatabaseCatalog::instance().getTable(name, table_context);
|
||||
runner([my_table_ptr = std::move(table_ptr)]()
|
||||
{
|
||||
my_table_ptr->flushAndPrepareForShutdown();
|
||||
});
|
||||
}
|
||||
runner.waitForAllToFinishAndRethrowFirstError();
|
||||
|
||||
for (const auto & table : tables_to_drop)
|
||||
{
|
||||
query_for_table.setTable(table.first);
|
||||
query_for_table.setTable(table.first.getTableName());
|
||||
query_for_table.is_dictionary = table.second;
|
||||
DatabasePtr db;
|
||||
UUID table_to_wait = UUIDHelpers::Nil;
|
||||
|
Loading…
Reference in New Issue
Block a user