allow synchronous drop/detach table for Atomic

This commit is contained in:
Alexander Tokmakov 2020-09-29 16:42:58 +03:00
parent 51b9aaf4d8
commit 5d19ed04f6
9 changed files with 66 additions and 14 deletions

View File

@ -83,7 +83,7 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table,
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::unique_lock lock(mutex);
not_in_use = cleenupDetachedTables();
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
DatabaseWithDictionaries::attachTableUnlocked(name, table, lock);
@ -97,7 +97,7 @@ StoragePtr DatabaseAtomic::detachTable(const String & name)
auto table = DatabaseWithDictionaries::detachTableUnlocked(name, lock);
table_name_to_path.erase(name);
detached_tables.emplace(table->getStorageID().uuid, table);
not_in_use = cleenupDetachedTables();
not_in_use = cleanupDetachedTables();
return table;
}
@ -263,7 +263,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
if (query.database != database_name)
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`",
database_name, query.database);
not_in_use = cleenupDetachedTables();
not_in_use = cleanupDetachedTables();
assertDetachedTableNotInUse(query.uuid);
renameNoReplace(table_metadata_tmp_path, table_metadata_path);
attachTableUnlocked(query.table, table, lock); /// Should never throw
@ -306,7 +306,7 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid)
", because it was detached but still used by some query. Retry later.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables()
DatabaseAtomic::DetachedTables DatabaseAtomic::cleanupDetachedTables()
{
DetachedTables not_in_use;
auto it = detached_tables.begin();
@ -324,14 +324,14 @@ DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables()
return not_in_use;
}
void DatabaseAtomic::assertCanBeDetached(bool cleenup)
void DatabaseAtomic::assertCanBeDetached(bool cleanup)
{
if (cleenup)
if (cleanup)
{
DetachedTables not_in_use;
{
std::lock_guard lock(mutex);
not_in_use = cleenupDetachedTables();
not_in_use = cleanupDetachedTables();
}
}
std::lock_guard lock(mutex);
@ -500,6 +500,28 @@ void DatabaseAtomic::renameDictionaryInMemoryUnlocked(const StorageID & old_name
const auto & dict = dynamic_cast<const IDictionaryBase &>(*result.object);
dict.updateDictionaryName(new_name);
}
void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid)
{
{
std::lock_guard lock{mutex};
if (detached_tables.count(uuid) == 0)
return;
}
/// Table is in use while its shared_ptr counter is greater than 1.
/// We cannot trigger condvar on shared_ptr destruction, so it's busy wait.
while (true)
{
DetachedTables not_in_use;
{
std::lock_guard lock{mutex};
not_in_use = cleanupDetachedTables();
if (detached_tables.count(uuid) == 0)
return;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
}

View File

@ -58,6 +58,8 @@ public:
void tryCreateSymlink(const String & table_name, const String & actual_data_path);
void tryRemoveSymlink(const String & table_name);
void waitDetachedTableNotInUse(const UUID & uuid);
private:
void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path) override;
void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table,
@ -65,7 +67,7 @@ private:
void assertDetachedTableNotInUse(const UUID & uuid);
typedef std::unordered_map<UUID, StoragePtr> DetachedTables;
[[nodiscard]] DetachedTables cleenupDetachedTables();
[[nodiscard]] DetachedTables cleanupDetachedTables();
void tryCreateMetadataSymlink();

View File

@ -701,6 +701,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0});
else
tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time});
tables_marked_dropped_ids.insert(table_id.uuid);
/// If list of dropped tables was empty, start a drop task
if (drop_task && tables_marked_dropped.size() == 1)
(*drop_task)->schedule();
@ -742,6 +743,9 @@ void DatabaseCatalog::dropTableDataTask()
try
{
dropTableFinally(table);
std::lock_guard lock(tables_marked_dropped_mutex);
auto removed = tables_marked_dropped_ids.erase(table.table_id.uuid);
assert(removed);
}
catch (...)
{
@ -755,6 +759,8 @@ void DatabaseCatalog::dropTableDataTask()
need_reschedule = true;
}
}
wait_table_finally_dropped.notify_all();
}
/// Do not schedule a task if there is no tables to drop
@ -814,6 +820,16 @@ String DatabaseCatalog::resolveDictionaryName(const String & name) const
return toString(db_and_table.second->getStorageID().uuid);
}
void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
{
if (uuid == UUIDHelpers::Nil)
return;
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&](){
return tables_marked_dropped_ids.count(uuid) == 0;
});
}
DDLGuard::DDLGuard(Map & map_, std::shared_mutex & db_mutex_, std::unique_lock<std::mutex> guards_lock_, const String & elem)
: map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_))

View File

@ -9,6 +9,7 @@
#include <map>
#include <set>
#include <unordered_map>
#include <unordered_set>
#include <mutex>
#include <shared_mutex>
#include <array>
@ -179,6 +180,8 @@ public:
/// Try convert qualified dictionary name to persistent UUID
String resolveDictionaryName(const String & name) const;
void waitTableFinallyDropped(const UUID & uuid);
private:
// The global instance of database catalog. unique_ptr is to allow
// deferred initialization. Thought I'd use std::optional, but I can't
@ -249,11 +252,13 @@ private:
mutable std::mutex ddl_guards_mutex;
TablesMarkedAsDropped tables_marked_dropped;
std::unordered_set<UUID> tables_marked_dropped_ids;
mutable std::mutex tables_marked_dropped_mutex;
std::unique_ptr<BackgroundSchedulePoolTaskHolder> drop_task;
static constexpr time_t default_drop_delay_sec = 8 * 60;
time_t drop_delay_sec = default_drop_delay_sec;
std::condition_variable wait_table_finally_dropped;
};
}

View File

@ -124,6 +124,19 @@ BlockIO InterpreterDropQuery::executeToTable(
}
}
table.reset();
ddl_guard = {};
if (query.no_delay)
{
if (query.kind == ASTDropQuery::Kind::Drop)
DatabaseCatalog::instance().waitTableFinallyDropped(table_id.uuid);
else if (query.kind == ASTDropQuery::Kind::Detach)
{
if (auto * atomic = typeid_cast<DatabaseAtomic *>(database.get()))
atomic->waitDetachedTableNotInUse(table_id.uuid);
}
}
return {};
}

View File

@ -75,7 +75,6 @@ def drop_table(cluster):
minio = cluster.minio_client
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
time.sleep(1)
try:
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0
finally:
@ -317,7 +316,6 @@ def test_move_replace_partition_to_another_table(cluster):
minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4
node.query("DROP TABLE s3_clone NO DELAY")
time.sleep(1)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
# Data should remain in S3
@ -330,7 +328,6 @@ def test_move_replace_partition_to_another_table(cluster):
list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
node.query("DROP TABLE s3_test NO DELAY")
time.sleep(1)
# Backup data should remain in S3.
assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 4

View File

@ -37,7 +37,6 @@ def started_cluster():
def drop_table(nodes, table_name):
for node in nodes:
node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
time.sleep(1)
def create_table(nodes, table_name, with_storage_policy=False, with_time_column=False,

View File

@ -403,7 +403,6 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine):
node1.query("DROP TABLE {} NO DELAY".format(name_temp))
time.sleep(2)
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod2"}

View File

@ -27,7 +27,6 @@ def started_cluster():
def drop_table(nodes, table_name):
for node in nodes:
node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
time.sleep(1)
# Column TTL works only with wide parts, because it's very expensive to apply it for compact parts
def test_ttl_columns(started_cluster):