Merge pull request #66065 from ClickHouse/chesema-faster-drop

DatabaseCatalog drops tables faster
This commit is contained in:
Sema Checherinda 2024-07-10 17:04:41 +00:00 committed by GitHub
commit be80f4e5d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 167 additions and 80 deletions

View File

@ -1,6 +1,7 @@
#include <algorithm> #include <algorithm>
#include <string> #include <string>
#include <mutex> #include <mutex>
#include <utility>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/TableNameHints.h> #include <Interpreters/TableNameHints.h>
@ -27,6 +28,7 @@
#include <Common/filesystemHelpers.h> #include <Common/filesystemHelpers.h>
#include <Common/noexcept_scope.h> #include <Common/noexcept_scope.h>
#include <Common/checkStackSize.h> #include <Common/checkStackSize.h>
#include <base/scope_guard.h>
#include <base/isSharedPtrUnique.h> #include <base/isSharedPtrUnique.h>
#include <boost/range/adaptor/map.hpp> #include <boost/range/adaptor/map.hpp>
@ -192,6 +194,7 @@ void DatabaseCatalog::initializeAndLoadTemporaryDatabase()
unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec); unused_dir_rm_timeout_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_rm_timeout_sec", unused_dir_rm_timeout_sec);
unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec); unused_dir_cleanup_period_sec = getContext()->getConfigRef().getInt64("database_catalog_unused_dir_cleanup_period_sec", unused_dir_cleanup_period_sec);
drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec); drop_error_cooldown_sec = getContext()->getConfigRef().getInt64("database_catalog_drop_error_cooldown_sec", drop_error_cooldown_sec);
drop_table_concurrency = getContext()->getConfigRef().getInt64("database_catalog_drop_table_concurrency", drop_table_concurrency);
auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, getContext()); auto db_for_temporary_and_external_tables = std::make_shared<DatabaseMemory>(TEMPORARY_DATABASE, getContext());
attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables); attachDatabase(TEMPORARY_DATABASE, db_for_temporary_and_external_tables);
@ -1145,7 +1148,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
(*drop_task)->schedule(); (*drop_task)->schedule();
} }
void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id) void DatabaseCatalog::undropTable(StorageID table_id)
{ {
String latest_metadata_dropped_path; String latest_metadata_dropped_path;
TableMarkedAsDropped dropped_table; TableMarkedAsDropped dropped_table;
@ -1220,91 +1223,166 @@ void DatabaseCatalog::dequeueDroppedTableCleanup(StorageID table_id)
LOG_INFO(log, "Table {} was successfully undropped.", dropped_table.table_id.getNameForLogs()); LOG_INFO(log, "Table {} was successfully undropped.", dropped_table.table_id.getNameForLogs());
} }
std::tuple<size_t, size_t> DatabaseCatalog::getDroppedTablesCountAndInuseCount()
{
std::lock_guard lock(tables_marked_dropped_mutex);
size_t in_use_count = 0;
for (const auto & item : tables_marked_dropped)
{
bool in_use = item.table && !isSharedPtrUnique(item.table);
in_use_count += in_use;
}
return {tables_marked_dropped.size(), in_use_count};
}
/// Returns the smallest drop_time among the entries of tables_marked_dropped,
/// or std::numeric_limits<time_t>::max() when the queue is empty.
/// Does not lock anything itself: the declaration is annotated as requiring
/// tables_marked_dropped_mutex, so the caller must already hold it.
time_t DatabaseCatalog::getMinDropTime()
{
    time_t earliest = std::numeric_limits<time_t>::max();

    for (auto entry = tables_marked_dropped.begin(); entry != tables_marked_dropped.end(); ++entry)
    {
        if (entry->drop_time < earliest)
            earliest = entry->drop_time;
    }

    return earliest;
}
std::vector<DatabaseCatalog::TablesMarkedAsDropped::iterator> DatabaseCatalog::getTablesToDrop()
{
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
decltype(getTablesToDrop()) result;
std::lock_guard lock(tables_marked_dropped_mutex);
for (auto it = tables_marked_dropped.begin(); it != tables_marked_dropped.end(); ++it)
{
bool in_use = it->table && !isSharedPtrUnique(it->table);
bool old_enough = it->drop_time <= current_time;
if (!in_use && old_enough)
result.emplace_back(it);
}
return result;
}
/// Decides when drop_task has to run next and schedules it. Works under tables_marked_dropped_mutex.
///  - Empty queue: nothing is scheduled.
///  - first_async_drop_in_queue is not at the head of the queue: the task is scheduled immediately
///    (presumably the entries before it must be dropped without delay — confirm against enqueueDroppedTableCleanup).
///  - Otherwise: the task is scheduled for the moment of the earliest drop_time in the queue,
///    or immediately if that moment has already passed.
/// NOTE(review): when the earliest drop_time has already passed but the entry was skipped
/// by getTablesToDrop() (e.g. the table is still in use), the delay is 0 on every call —
/// confirm that such immediate rescheduling is intended.
void DatabaseCatalog::rescheduleDropTableTask()
{
    std::lock_guard lock(tables_marked_dropped_mutex);

    if (tables_marked_dropped.empty())
        return;

    if (first_async_drop_in_queue != tables_marked_dropped.begin())
    {
        (*drop_task)->scheduleAfter(0);
        return;
    }

    const time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
    const time_t earliest_drop = getMinDropTime();

    /// drop_time values are in seconds, the scheduler expects milliseconds.
    time_t delay_ms = 0;
    if (earliest_drop > now)
        delay_ms = (earliest_drop - now) * 1000;

    LOG_TRACE(
        log,
        "Have {} tables in queue to drop. Schedule background task in {} seconds",
        tables_marked_dropped.size(), delay_ms / 1000);

    (*drop_task)->scheduleAfter(delay_ms);
}
/// Calls dropTableFinally() for every given queue entry using a local thread pool
/// (at most drop_table_concurrency threads, queue sized to hold all entries) and waits for all jobs.
///
/// For each entry:
///  - on success the entry is erased from tables_marked_dropped, its uuid is erased from
///    tables_marked_dropped_ids and wait_table_finally_dropped is notified;
///  - on any exception the error is logged, the entry is moved to the end of the queue and its
///    drop_time is set to now + drop_error_cooldown_sec.
/// In both cases first_async_drop_in_queue is moved off the processed entry if it pointed to it.
///
/// Exceptions from scheduling a job or from waiting for the pool are logged and not rethrown;
/// a scheduling failure stops scheduling of the remaining entries.
void DatabaseCatalog::dropTablesParallel(std::vector<DatabaseCatalog::TablesMarkedAsDropped::iterator> tables_to_drop)
{
    if (tables_to_drop.empty())
        return;

    ThreadPool drop_pool(
        CurrentMetrics::DatabaseCatalogThreads,
        CurrentMetrics::DatabaseCatalogThreadsActive,
        CurrentMetrics::DatabaseCatalogThreadsScheduled,
        /* max_threads */drop_table_concurrency,
        /* max_free_threads */0,
        /* queue_size */tables_to_drop.size());

    for (auto queue_pos : tables_to_drop)
    {
        /// The job touches only class members and its own copy of the iterator.
        auto drop_one = [this, queue_pos] ()
        {
            try
            {
                /// Runs without holding tables_marked_dropped_mutex.
                dropTableFinally(*queue_pos);

                /// Declared before the locked scope, so the moved-out entry is destroyed
                /// only after the mutex has been released.
                TableMarkedAsDropped released_entry;
                {
                    std::lock_guard lock(tables_marked_dropped_mutex);

                    if (first_async_drop_in_queue == queue_pos)
                        ++first_async_drop_in_queue;

                    [[maybe_unused]] auto erased_ids = tables_marked_dropped_ids.erase(queue_pos->table_id.uuid);
                    chassert(erased_ids);

                    released_entry = std::move(*queue_pos);
                    tables_marked_dropped.erase(queue_pos);

                    wait_table_finally_dropped.notify_all();
                }
            }
            catch (...)
            {
                tryLogCurrentException(log, "Cannot drop table " + queue_pos->table_id.getNameForLogs() +
                                            ". Will retry later.");

                std::lock_guard lock(tables_marked_dropped_mutex);

                if (first_async_drop_in_queue == queue_pos)
                    ++first_async_drop_in_queue;

                /// Move the failed entry to the tail of the queue and postpone its next attempt.
                tables_marked_dropped.splice(tables_marked_dropped.end(), tables_marked_dropped, queue_pos);
                queue_pos->drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;

                /// If the marker ran off the end, step it back onto the last entry.
                if (first_async_drop_in_queue == tables_marked_dropped.end())
                    --first_async_drop_in_queue;
            }
        };

        try
        {
            drop_pool.scheduleOrThrowOnError(std::move(drop_one));
        }
        catch (...)
        {
            tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
            break;
        }
    }

    /// Already scheduled jobs are always waited for, even if scheduling stopped early.
    try
    {
        drop_pool.wait();
    }
    catch (...)
    {
        tryLogCurrentException(log, "Cannot drop tables. Will retry later.");
    }
}
void DatabaseCatalog::dropTableDataTask() void DatabaseCatalog::dropTableDataTask()
{ {
/// Background task that removes data of tables which were marked as dropped by Atomic databases. /// Background task that removes data of tables which were marked as dropped by Atomic databases.
/// Table can be removed when it's not used by queries and drop_delay_sec elapsed since it was marked as dropped. /// Table can be removed when it's not used by queries and drop_delay_sec elapsed since it was marked as dropped.
bool need_reschedule = true; auto [drop_tables_count, drop_tables_in_use_count] = getDroppedTablesCountAndInuseCount();
/// Default reschedule time for the case when we are waiting for reference count to become 1.
size_t schedule_after_ms = reschedule_time_ms; auto tables_to_drop = getTablesToDrop();
TableMarkedAsDropped table;
try if (!tables_to_drop.empty())
{ {
std::lock_guard lock(tables_marked_dropped_mutex); LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {} tables",
if (tables_marked_dropped.empty()) drop_tables_count, drop_tables_in_use_count, tables_to_drop.size());
return;
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()); dropTablesParallel(tables_to_drop);
time_t min_drop_time = std::numeric_limits<time_t>::max();
size_t tables_in_use_count = 0;
auto it = std::find_if(tables_marked_dropped.begin(), tables_marked_dropped.end(), [&](const auto & elem)
{
bool not_in_use = !elem.table || isSharedPtrUnique(elem.table);
bool old_enough = elem.drop_time <= current_time;
min_drop_time = std::min(min_drop_time, elem.drop_time);
tables_in_use_count += !not_in_use;
return not_in_use && old_enough;
});
if (it != tables_marked_dropped.end())
{
table = std::move(*it);
LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {}",
tables_marked_dropped.size(), tables_in_use_count, table.table_id.getNameForLogs());
if (first_async_drop_in_queue == it)
++first_async_drop_in_queue;
tables_marked_dropped.erase(it);
/// Schedule the task as soon as possible, while there are suitable tables to drop.
schedule_after_ms = 0;
}
else if (current_time < min_drop_time)
{
/// We are waiting for drop_delay_sec to exceed, no sense to wakeup until min_drop_time.
/// If new table is added to the queue with ignore_delay flag, schedule() is called to wakeup the task earlier.
schedule_after_ms = (min_drop_time - current_time) * 1000;
LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue ({} of them are in use). "
"Will check again after {} seconds", tables_marked_dropped.size(), tables_in_use_count, min_drop_time - current_time);
}
need_reschedule = !tables_marked_dropped.empty();
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
} }
if (table.table_id) rescheduleDropTableTask();
{
try
{
dropTableFinally(table);
std::lock_guard lock(tables_marked_dropped_mutex);
[[maybe_unused]] auto removed = tables_marked_dropped_ids.erase(table.table_id.uuid);
assert(removed);
}
catch (...)
{
tryLogCurrentException(log, "Cannot drop table " + table.table_id.getNameForLogs() +
". Will retry later.");
{
table.drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;
std::lock_guard lock(tables_marked_dropped_mutex);
tables_marked_dropped.emplace_back(std::move(table));
if (first_async_drop_in_queue == tables_marked_dropped.end())
--first_async_drop_in_queue;
/// If list of dropped tables was empty, schedule a task to retry deletion.
if (tables_marked_dropped.size() == 1)
{
need_reschedule = true;
schedule_after_ms = drop_error_cooldown_sec * 1000;
}
}
}
wait_table_finally_dropped.notify_all();
}
/// Do not schedule a task if there is no tables to drop
if (need_reschedule)
(*drop_task)->scheduleAfter(schedule_after_ms);
} }
void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table) void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)

View File

@ -225,7 +225,7 @@ public:
String getPathForDroppedMetadata(const StorageID & table_id) const; String getPathForDroppedMetadata(const StorageID & table_id) const;
String getPathForMetadata(const StorageID & table_id) const; String getPathForMetadata(const StorageID & table_id) const;
void enqueueDroppedTableCleanup(StorageID table_id, StoragePtr table, String dropped_metadata_path, bool ignore_delay = false); void enqueueDroppedTableCleanup(StorageID table_id, StoragePtr table, String dropped_metadata_path, bool ignore_delay = false);
void dequeueDroppedTableCleanup(StorageID table_id); void undropTable(StorageID table_id);
void waitTableFinallyDropped(const UUID & uuid); void waitTableFinallyDropped(const UUID & uuid);
@ -296,6 +296,12 @@ private:
void dropTableDataTask(); void dropTableDataTask();
void dropTableFinally(const TableMarkedAsDropped & table); void dropTableFinally(const TableMarkedAsDropped & table);
time_t getMinDropTime() TSA_REQUIRES(tables_marked_dropped_mutex);
std::tuple<size_t, size_t> getDroppedTablesCountAndInuseCount();
std::vector<TablesMarkedAsDropped::iterator> getTablesToDrop();
void dropTablesParallel(std::vector<TablesMarkedAsDropped::iterator> tables);
void rescheduleDropTableTask();
void cleanupStoreDirectoryTask(); void cleanupStoreDirectoryTask();
bool maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir); bool maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir);
@ -363,6 +369,9 @@ private:
static constexpr time_t default_drop_error_cooldown_sec = 5; static constexpr time_t default_drop_error_cooldown_sec = 5;
time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec; time_t drop_error_cooldown_sec = default_drop_error_cooldown_sec;
static constexpr size_t default_drop_table_concurrency = 10;
size_t drop_table_concurrency = default_drop_table_concurrency;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> reload_disks_task; std::unique_ptr<BackgroundSchedulePoolTaskHolder> reload_disks_task;
std::mutex reload_disks_mutex; std::mutex reload_disks_mutex;
std::set<String> disks_to_reload; std::set<String> disks_to_reload;

View File

@ -64,7 +64,7 @@ BlockIO InterpreterUndropQuery::executeToTable(ASTUndropQuery & query)
database->checkMetadataFilenameAvailability(table_id.table_name); database->checkMetadataFilenameAvailability(table_id.table_name);
DatabaseCatalog::instance().dequeueDroppedTableCleanup(table_id); DatabaseCatalog::instance().undropTable(table_id);
return {}; return {};
} }