Merge pull request #16584 from ClickHouse/drop_table_task_better_scheduling
Better scheduling of drop table task
commit 11fc6fd8c9
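
For orientation before the diff: a minimal, standalone C++ sketch of the scheduling idea this change implements. Queue entries carry an absolute drop_time, and the background task derives its next wake-up from the earliest drop_time instead of polling on a fixed interval. All names here (DropQueueEntry, nextWakeupMs) are illustrative only, not ClickHouse's API.

// Standalone sketch (not part of this commit); roughly what dropTableDataTask() below does
// when it computes schedule_after_ms.
#include <algorithm>
#include <cstdint>
#include <ctime>
#include <iostream>
#include <limits>
#include <list>

struct DropQueueEntry
{
    std::time_t drop_time;   // absolute time when the entry becomes eligible for dropping
    bool in_use = false;     // stands in for "the table's shared_ptr is not unique yet"
};

// Returns the delay in milliseconds before the drop task should run again.
uint64_t nextWakeupMs(std::list<DropQueueEntry> & queue, std::time_t now,
                      uint64_t default_reschedule_ms = 100)
{
    std::time_t min_drop_time = std::numeric_limits<std::time_t>::max();
    auto it = std::find_if(queue.begin(), queue.end(), [&](const DropQueueEntry & e)
    {
        min_drop_time = std::min(min_drop_time, e.drop_time);
        return !e.in_use && e.drop_time <= now;
    });

    if (it != queue.end())
    {
        queue.erase(it);    // this entry would be dropped now
        return 0;           // more entries may already be due: run again immediately
    }
    if (now < min_drop_time)
        return uint64_t(min_drop_time - now) * 1000;    // sleep until the earliest entry is due
    return default_reschedule_ms;                       // entries are due but still in use: poll
}

int main()
{
    std::time_t now = std::time(nullptr);
    std::list<DropQueueEntry> queue{{now + 5, false}, {now + 60, true}};
    std::cout << nextWakeupMs(queue, now) << " ms until next check\n";  // prints "5000 ms until next check"
}

With drop_time stored as an absolute deadline, an enqueue with ignore_delay only needs to call schedule() to wake the task early, which is what enqueueDroppedTableCleanup does in the diff.
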
@@ -114,7 +114,8 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
         DatabaseWithDictionaries::detachTableUnlocked(table_name, lock);       /// Should never throw
         table_name_to_path.erase(table_name);
     }
-    tryRemoveSymlink(table_name);
+    if (table->storesDataOnDisk())
+        tryRemoveSymlink(table_name);
     /// Remove the inner table (if any) to avoid deadlock
     /// (due to attempt to execute DROP from the worker thread)
     if (auto * mv = dynamic_cast<StorageMaterializedView *>(table.get()))
@@ -145,7 +146,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
     String old_metadata_path = getObjectMetadataPath(table_name);
     String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);

-    auto detach = [](DatabaseAtomic & db, const String & table_name_)
+    auto detach = [](DatabaseAtomic & db, const String & table_name_, bool has_symlink)
     {
         auto it = db.table_name_to_path.find(table_name_);
         String table_data_path_saved;
@@ -155,7 +156,7 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
         assert(!table_data_path_saved.empty() || db.dictionaries.find(table_name_) != db.dictionaries.end());
         db.tables.erase(table_name_);
         db.table_name_to_path.erase(table_name_);
-        if (!table_data_path_saved.empty())
+        if (has_symlink)
             db.tryRemoveSymlink(table_name_);
         return table_data_path_saved;
     };
@@ -166,7 +167,8 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
         if (table_data_path_.empty())
             return;
         db.table_name_to_path.emplace(table_name_, table_data_path_);
-        db.tryCreateSymlink(table_name_, table_data_path_);
+        if (table_->storesDataOnDisk())
+            db.tryCreateSymlink(table_name_, table_data_path_);
     };

     auto assert_can_move_mat_view = [inside_database](const StoragePtr & table_)
@@ -228,9 +230,9 @@ void DatabaseAtomic::renameTable(const Context & context, const String & table_n
     renameNoReplace(old_metadata_path, new_metadata_path);

     /// After metadata was successfully moved, the following methods should not throw (if them do, it's a logical error)
-    table_data_path = detach(*this, table_name);
+    table_data_path = detach(*this, table_name, table->storesDataOnDisk());
     if (exchange)
-        other_table_data_path = detach(other_db, to_table_name);
+        other_table_data_path = detach(other_db, to_table_name, other_table->storesDataOnDisk());

     auto old_table_id = table->getStorageID();

@@ -286,7 +288,8 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
             DatabaseCatalog::instance().removeUUIDMappingFinally(query.uuid);
         throw;
     }
-    tryCreateSymlink(query.table, table_data_path);
+    if (table->storesDataOnDisk())
+        tryCreateSymlink(query.table, table_data_path);
 }

 void DatabaseAtomic::commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path)
@@ -383,17 +386,18 @@ void DatabaseAtomic::loadStoredObjects(Context & context, bool has_force_restore

         Poco::File(path_to_table_symlinks).createDirectories();
         for (const auto & table : table_names)
-            tryCreateSymlink(table.first, table.second);
+            tryCreateSymlink(table.first, table.second, true);
     }
 }

-void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path)
+void DatabaseAtomic::tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist)
 {
     try
     {
         String link = path_to_table_symlinks + escapeForFileName(table_name);
-        String data = Poco::Path(global_context.getPath()).makeAbsolute().toString() + actual_data_path;
-        Poco::File{data}.linkTo(link, Poco::File::LINK_SYMBOLIC);
+        Poco::File data = Poco::Path(global_context.getPath()).makeAbsolute().toString() + actual_data_path;
+        if (!if_data_path_exist || data.exists())
+            data.linkTo(link, Poco::File::LINK_SYMBOLIC);
     }
     catch (...)
     {

@@ -55,7 +55,7 @@ public:

     UUID tryGetTableUUID(const String & table_name) const override;

-    void tryCreateSymlink(const String & table_name, const String & actual_data_path);
+    void tryCreateSymlink(const String & table_name, const String & actual_data_path, bool if_data_path_exist = false);
     void tryRemoveSymlink(const String & table_name);

     void waitDetachedTableNotInUse(const UUID & uuid);
@@ -321,7 +321,7 @@ void DatabaseOnDisk::renameTable(

     /// Special case: usually no actions with symlinks are required when detaching/attaching table,
     /// but not when moving from Atomic database to Ordinary
-    if (from_atomic_to_ordinary)
+    if (from_atomic_to_ordinary && table->storesDataOnDisk())
     {
         auto & atomic_db = assert_cast<DatabaseAtomic &>(*this);
         atomic_db.tryRemoveSymlink(table_name);

@@ -134,7 +134,10 @@ void DatabaseCatalog::loadDatabases()
     loadMarkedAsDroppedTables();
     auto task_holder = global_context->getSchedulePool().createTask("DatabaseCatalog", [this](){ this->dropTableDataTask(); });
     drop_task = std::make_unique<BackgroundSchedulePoolTaskHolder>(std::move(task_holder));
-    (*drop_task)->activateAndSchedule();
+    (*drop_task)->activate();
+    std::lock_guard lock{tables_marked_dropped_mutex};
+    if (!tables_marked_dropped.empty())
+        (*drop_task)->schedule();
 }

 void DatabaseCatalog::shutdownImpl()
@@ -760,14 +763,15 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr

     std::lock_guard lock(tables_marked_dropped_mutex);
     if (ignore_delay)
-        tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0});
+        tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, drop_time});
     else
-        tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
+        tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time + drop_delay_sec});
     tables_marked_dropped_ids.insert(table_id.uuid);
     CurrentMetrics::add(CurrentMetrics::TablesToDropQueueSize, 1);

-    /// If list of dropped tables was empty, start a drop task
-    if (drop_task && tables_marked_dropped.size() == 1)
+    /// If list of dropped tables was empty, start a drop task.
+    /// If ignore_delay is set, schedule drop task as soon as possible.
+    if (drop_task && (tables_marked_dropped.size() == 1 || ignore_delay))
         (*drop_task)->schedule();
 }

@@ -777,26 +781,40 @@ void DatabaseCatalog::dropTableDataTask()
     /// Table can be removed when it's not used by queries and drop_delay_sec elapsed since it was marked as dropped.

     bool need_reschedule = true;
+    /// Default reschedule time for the case when we are waiting for reference count to become 1.
+    size_t schedule_after_ms = reschedule_time_ms;
     TableMarkedAsDropped table;
     try
     {
         std::lock_guard lock(tables_marked_dropped_mutex);
+        assert(!tables_marked_dropped.empty());
         time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+        time_t min_drop_time = std::numeric_limits<time_t>::max();
+        size_t tables_in_use_count = 0;
         auto it = std::find_if(tables_marked_dropped.begin(), tables_marked_dropped.end(), [&](const auto & elem)
         {
             bool not_in_use = !elem.table || elem.table.unique();
-            bool old_enough = elem.drop_time + drop_delay_sec < current_time;
+            bool old_enough = elem.drop_time <= current_time;
+            min_drop_time = std::min(min_drop_time, elem.drop_time);
+            tables_in_use_count += !not_in_use;
             return not_in_use && old_enough;
         });
         if (it != tables_marked_dropped.end())
         {
             table = std::move(*it);
-            LOG_INFO(log, "Will try drop {}", table.table_id.getNameForLogs());
+            LOG_INFO(log, "Have {} tables in drop queue ({} of them are in use), will try drop {}",
+                     tables_marked_dropped.size(), tables_in_use_count, table.table_id.getNameForLogs());
             tables_marked_dropped.erase(it);
+            /// Schedule the task as soon as possible, while there are suitable tables to drop.
+            schedule_after_ms = 0;
         }
-        else
+        else if (current_time < min_drop_time)
         {
-            LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue", tables_marked_dropped.size());
+            /// We are waiting for drop_delay_sec to exceed, no sense to wakeup until min_drop_time.
+            /// If new table is added to the queue with ignore_delay flag, schedule() is called to wakeup the task earlier.
+            schedule_after_ms = (min_drop_time - current_time) * 1000;
+            LOG_TRACE(log, "Not found any suitable tables to drop, still have {} tables in drop queue ({} of them are in use). "
+                "Will check again after {} seconds", tables_marked_dropped.size(), tables_in_use_count, min_drop_time - current_time);
         }
         need_reschedule = !tables_marked_dropped.empty();
     }
@@ -820,11 +838,15 @@ void DatabaseCatalog::dropTableDataTask()
             tryLogCurrentException(log, "Cannot drop table " + table.table_id.getNameForLogs() +
                                         ". Will retry later.");
             {
+                table.drop_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()) + drop_error_cooldown_sec;
                 std::lock_guard lock(tables_marked_dropped_mutex);
                 tables_marked_dropped.emplace_back(std::move(table));
                 /// If list of dropped tables was empty, schedule a task to retry deletion.
                 if (tables_marked_dropped.size() == 1)
+                {
                     need_reschedule = true;
+                    schedule_after_ms = drop_error_cooldown_sec * 1000;
+                }
             }
         }

@@ -833,7 +855,7 @@ void DatabaseCatalog::dropTableDataTask()

     /// Do not schedule a task if there is no tables to drop
     if (need_reschedule)
-        (*drop_task)->scheduleAfter(reschedule_time_ms);
+        (*drop_task)->scheduleAfter(schedule_after_ms);
 }

 void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)

@@ -234,6 +234,7 @@ private:
     void dropTableFinally(const TableMarkedAsDropped & table);

     static constexpr size_t reschedule_time_ms = 100;
+    static constexpr time_t drop_error_cooldown_sec = 5;

 private:
     using UUIDToDatabaseMap = std::unordered_map<UUID, DatabasePtr>;

@@ -449,6 +449,10 @@ public:
     /// We do not use mutex because it is not very important that the size could change during the operation.
     virtual void checkPartitionCanBeDropped(const ASTPtr & /*partition*/) {}

+    /// Returns true if Storage may store some data on disk.
+    /// NOTE: may not be equivalent to !getDataPaths().empty()
+    virtual bool storesDataOnDisk() const { return false; }
+
     /// Returns data paths if storage supports it, empty vector otherwise.
     virtual Strings getDataPaths() const { return {}; }

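
The per-storage overrides that follow implement the new hook uniformly. As a rough, standalone illustration of the contract (simplified types, not ClickHouse's API): disk-backed storages report true, and database-level code uses the flag to skip symlink management for tables that keep nothing on disk.

#include <iostream>
#include <string>

struct IStorageLike
{
    virtual ~IStorageLike() = default;
    // Default mirrors the new IStorage method: assume nothing is stored on disk.
    virtual bool storesDataOnDisk() const { return false; }
};

struct DiskBackedStorage : IStorageLike
{
    bool storesDataOnDisk() const override { return true; }
};

// Roughly the kind of check DatabaseAtomic::dropTable now performs before touching symlinks.
void dropTableLike(const IStorageLike & table, const std::string & name)
{
    if (table.storesDataOnDisk())
        std::cout << "would remove data-path symlink for " << name << '\n';
    else
        std::cout << "nothing on disk, no symlink for " << name << '\n';
}

int main()
{
    dropTableLike(DiskBackedStorage{}, "t_log");
    dropTableLike(IStorageLike{}, "t_memory");
}
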
@@ -617,6 +617,7 @@ public:
     /// `additional_path` can be set if part is not located directly in table data path (e.g. 'detached/')
     std::optional<String> getFullRelativePathForPart(const String & part_name, const String & additional_path = "") const;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override;

     using PathsWithDisks = std::vector<PathWithDisk>;
@@ -82,6 +82,7 @@ public:
     void shutdown() override;
     void drop() override;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override;

     const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
@@ -524,6 +524,11 @@ BlockOutputStreamPtr StorageFile::write(
         chooseCompressionMethod(path, compression_method), context);
 }

+bool StorageFile::storesDataOnDisk() const
+{
+    return is_db_table;
+}
+
 Strings StorageFile::getDataPaths() const
 {
     if (paths.empty())
@@ -46,6 +46,7 @@ public:

     void rename(const String & new_path_to_table_data, const StorageID & new_table_id) override;

+    bool storesDataOnDisk() const override;
     Strings getDataPaths() const override;

     struct CommonArguments
@@ -41,6 +41,7 @@ public:

     void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }

 protected:
@@ -144,6 +144,7 @@ public:
     CheckResults checkData(const ASTPtr & query , const Context & context) override { return getNested()->checkData(query, context); }
     void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
     void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
+    bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
     Strings getDataPaths() const override { return getNested()->getDataPaths(); }
     StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }
     std::optional<UInt64> totalRows() const override { return getNested()->totalRows(); }
@@ -24,6 +24,7 @@ public:

     BlockOutputStreamPtr write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, const Context & context) override;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override { return {path}; }

 protected:
@@ -40,6 +40,7 @@ public:

     CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }

     void truncate(const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder&) override;
@@ -39,6 +39,7 @@ public:

     CheckResults checkData(const ASTPtr & /* query */, const Context & /* context */) override;

+    bool storesDataOnDisk() const override { return true; }
     Strings getDataPaths() const override { return {DB::fullPath(disk, table_path)}; }

     void truncate(const ASTPtr &, const StorageMetadataPtr & metadata_snapshot, const Context &, TableExclusiveLockHolder &) override;

@@ -3,6 +3,7 @@
 CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 . "$CURDIR"/../shell_config.sh


 $CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_1"
 $CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_2"
 $CLICKHOUSE_CLIENT -q "DROP DATABASE IF EXISTS test_01114_3"
@@ -49,4 +49,4 @@ $CLICKHOUSE_CLIENT -q "SELECT if(quantile(0.5)(query_duration_ms) < $max_time_ms

 $CLICKHOUSE_CLIENT -q "SELECT count() * $count_multiplier, i, d, s, n.i, n.f FROM $db.table_merge GROUP BY i, d, s, n.i, n.f ORDER BY i"

-$CLICKHOUSE_CLIENT -q "DROP DATABASE $db" --database_atomic_wait_for_drop_and_detach_synchronously=0
+$CLICKHOUSE_CLIENT -q "DROP DATABASE $db"