From 5d19ed04f6d638791f96dad01c8c0bb414e96bc9 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Tue, 29 Sep 2020 16:42:58 +0300 Subject: [PATCH] allow synchronous drop/detach table for Atomic --- src/Databases/DatabaseAtomic.cpp | 36 +++++++++++++++---- src/Databases/DatabaseAtomic.h | 4 ++- src/Interpreters/DatabaseCatalog.cpp | 16 +++++++++ src/Interpreters/DatabaseCatalog.h | 5 +++ src/Interpreters/InterpreterDropQuery.cpp | 13 +++++++ tests/integration/test_merge_tree_s3/test.py | 3 -- tests/integration/test_rename_column/test.py | 1 - tests/integration/test_ttl_move/test.py | 1 - tests/integration/test_ttl_replicated/test.py | 1 - 9 files changed, 66 insertions(+), 14 deletions(-) diff --git a/src/Databases/DatabaseAtomic.cpp b/src/Databases/DatabaseAtomic.cpp index 6ad547898e2..fa4511cd104 100644 --- a/src/Databases/DatabaseAtomic.cpp +++ b/src/Databases/DatabaseAtomic.cpp @@ -83,7 +83,7 @@ void DatabaseAtomic::attachTable(const String & name, const StoragePtr & table, assert(relative_table_path != data_path && !relative_table_path.empty()); DetachedTables not_in_use; std::unique_lock lock(mutex); - not_in_use = cleenupDetachedTables(); + not_in_use = cleanupDetachedTables(); auto table_id = table->getStorageID(); assertDetachedTableNotInUse(table_id.uuid); DatabaseWithDictionaries::attachTableUnlocked(name, table, lock); @@ -97,7 +97,7 @@ StoragePtr DatabaseAtomic::detachTable(const String & name) auto table = DatabaseWithDictionaries::detachTableUnlocked(name, lock); table_name_to_path.erase(name); detached_tables.emplace(table->getStorageID().uuid, table); - not_in_use = cleenupDetachedTables(); + not_in_use = cleanupDetachedTables(); return table; } @@ -263,7 +263,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora if (query.database != database_name) throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database was renamed to `{}`, cannot create table in `{}`", database_name, query.database); - not_in_use = cleenupDetachedTables(); + not_in_use = cleanupDetachedTables(); assertDetachedTableNotInUse(query.uuid); renameNoReplace(table_metadata_tmp_path, table_metadata_path); attachTableUnlocked(query.table, table, lock); /// Should never throw @@ -306,7 +306,7 @@ void DatabaseAtomic::assertDetachedTableNotInUse(const UUID & uuid) ", because it was detached but still used by some query. Retry later.", ErrorCodes::TABLE_ALREADY_EXISTS); } -DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables() +DatabaseAtomic::DetachedTables DatabaseAtomic::cleanupDetachedTables() { DetachedTables not_in_use; auto it = detached_tables.begin(); @@ -324,14 +324,14 @@ DatabaseAtomic::DetachedTables DatabaseAtomic::cleenupDetachedTables() return not_in_use; } -void DatabaseAtomic::assertCanBeDetached(bool cleenup) +void DatabaseAtomic::assertCanBeDetached(bool cleanup) { - if (cleenup) + if (cleanup) { DetachedTables not_in_use; { std::lock_guard lock(mutex); - not_in_use = cleenupDetachedTables(); + not_in_use = cleanupDetachedTables(); } } std::lock_guard lock(mutex); @@ -500,6 +500,28 @@ void DatabaseAtomic::renameDictionaryInMemoryUnlocked(const StorageID & old_name const auto & dict = dynamic_cast(*result.object); dict.updateDictionaryName(new_name); } +void DatabaseAtomic::waitDetachedTableNotInUse(const UUID & uuid) +{ + { + std::lock_guard lock{mutex}; + if (detached_tables.count(uuid) == 0) + return; + } + + /// Table is in use while its shared_ptr counter is greater than 1. + /// We cannot trigger condvar on shared_ptr destruction, so it's busy wait. + while (true) + { + DetachedTables not_in_use; + { + std::lock_guard lock{mutex}; + not_in_use = cleanupDetachedTables(); + if (detached_tables.count(uuid) == 0) + return; + } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } +} } diff --git a/src/Databases/DatabaseAtomic.h b/src/Databases/DatabaseAtomic.h index 02c922f8b91..8fa0d1837e7 100644 --- a/src/Databases/DatabaseAtomic.h +++ b/src/Databases/DatabaseAtomic.h @@ -58,6 +58,8 @@ public: void tryCreateSymlink(const String & table_name, const String & actual_data_path); void tryRemoveSymlink(const String & table_name); + void waitDetachedTableNotInUse(const UUID & uuid); + private: void commitAlterTable(const StorageID & table_id, const String & table_metadata_tmp_path, const String & table_metadata_path) override; void commitCreateTable(const ASTCreateQuery & query, const StoragePtr & table, @@ -65,7 +67,7 @@ private: void assertDetachedTableNotInUse(const UUID & uuid); typedef std::unordered_map DetachedTables; - [[nodiscard]] DetachedTables cleenupDetachedTables(); + [[nodiscard]] DetachedTables cleanupDetachedTables(); void tryCreateMetadataSymlink(); diff --git a/src/Interpreters/DatabaseCatalog.cpp b/src/Interpreters/DatabaseCatalog.cpp index 049341918b9..e58213fe366 100644 --- a/src/Interpreters/DatabaseCatalog.cpp +++ b/src/Interpreters/DatabaseCatalog.cpp @@ -701,6 +701,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr tables_marked_dropped.push_front({table_id, table, dropped_metadata_path, 0}); else tables_marked_dropped.push_back({table_id, table, dropped_metadata_path, drop_time}); + tables_marked_dropped_ids.insert(table_id.uuid); /// If list of dropped tables was empty, start a drop task if (drop_task && tables_marked_dropped.size() == 1) (*drop_task)->schedule(); @@ -742,6 +743,9 @@ void DatabaseCatalog::dropTableDataTask() try { dropTableFinally(table); + std::lock_guard lock(tables_marked_dropped_mutex); + auto removed = tables_marked_dropped_ids.erase(table.table_id.uuid); + assert(removed); } catch (...) { @@ -755,6 +759,8 @@ void DatabaseCatalog::dropTableDataTask() need_reschedule = true; } } + + wait_table_finally_dropped.notify_all(); } /// Do not schedule a task if there is no tables to drop @@ -814,6 +820,16 @@ String DatabaseCatalog::resolveDictionaryName(const String & name) const return toString(db_and_table.second->getStorageID().uuid); } +void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid) +{ + if (uuid == UUIDHelpers::Nil) + return; + std::unique_lock lock{tables_marked_dropped_mutex}; + wait_table_finally_dropped.wait(lock, [&](){ + return tables_marked_dropped_ids.count(uuid) == 0; + }); +} + DDLGuard::DDLGuard(Map & map_, std::shared_mutex & db_mutex_, std::unique_lock guards_lock_, const String & elem) : map(map_), db_mutex(db_mutex_), guards_lock(std::move(guards_lock_)) diff --git a/src/Interpreters/DatabaseCatalog.h b/src/Interpreters/DatabaseCatalog.h index 8ef3ecfe656..7bc6923bde4 100644 --- a/src/Interpreters/DatabaseCatalog.h +++ b/src/Interpreters/DatabaseCatalog.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -179,6 +180,8 @@ public: /// Try convert qualified dictionary name to persistent UUID String resolveDictionaryName(const String & name) const; + void waitTableFinallyDropped(const UUID & uuid); + private: // The global instance of database catalog. unique_ptr is to allow // deferred initialization. Thought I'd use std::optional, but I can't @@ -249,11 +252,13 @@ private: mutable std::mutex ddl_guards_mutex; TablesMarkedAsDropped tables_marked_dropped; + std::unordered_set tables_marked_dropped_ids; mutable std::mutex tables_marked_dropped_mutex; std::unique_ptr drop_task; static constexpr time_t default_drop_delay_sec = 8 * 60; time_t drop_delay_sec = default_drop_delay_sec; + std::condition_variable wait_table_finally_dropped; }; } diff --git a/src/Interpreters/InterpreterDropQuery.cpp b/src/Interpreters/InterpreterDropQuery.cpp index dec6a275872..b6d61b6c969 100644 --- a/src/Interpreters/InterpreterDropQuery.cpp +++ b/src/Interpreters/InterpreterDropQuery.cpp @@ -124,6 +124,19 @@ BlockIO InterpreterDropQuery::executeToTable( } } + table.reset(); + ddl_guard = {}; + if (query.no_delay) + { + if (query.kind == ASTDropQuery::Kind::Drop) + DatabaseCatalog::instance().waitTableFinallyDropped(table_id.uuid); + else if (query.kind == ASTDropQuery::Kind::Detach) + { + if (auto * atomic = typeid_cast(database.get())) + atomic->waitDetachedTableNotInUse(table_id.uuid); + } + } + return {}; } diff --git a/tests/integration/test_merge_tree_s3/test.py b/tests/integration/test_merge_tree_s3/test.py index 7acb8c5fe00..45b3c3c65f0 100644 --- a/tests/integration/test_merge_tree_s3/test.py +++ b/tests/integration/test_merge_tree_s3/test.py @@ -75,7 +75,6 @@ def drop_table(cluster): minio = cluster.minio_client node.query("DROP TABLE IF EXISTS s3_test NO DELAY") - time.sleep(1) try: assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == 0 finally: @@ -317,7 +316,6 @@ def test_move_replace_partition_to_another_table(cluster): minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD * 2 + FILES_OVERHEAD_PER_PART_WIDE * 4 node.query("DROP TABLE s3_clone NO DELAY") - time.sleep(1) assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)" assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)" # Data should remain in S3 @@ -330,7 +328,6 @@ def test_move_replace_partition_to_another_table(cluster): list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4 node.query("DROP TABLE s3_test NO DELAY") - time.sleep(1) # Backup data should remain in S3. assert len(list(minio.list_objects(cluster.minio_bucket, 'data/'))) == FILES_OVERHEAD_PER_PART_WIDE * 4 diff --git a/tests/integration/test_rename_column/test.py b/tests/integration/test_rename_column/test.py index 51921c3385c..b7ab8a75ba5 100644 --- a/tests/integration/test_rename_column/test.py +++ b/tests/integration/test_rename_column/test.py @@ -37,7 +37,6 @@ def started_cluster(): def drop_table(nodes, table_name): for node in nodes: node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name)) - time.sleep(1) def create_table(nodes, table_name, with_storage_policy=False, with_time_column=False, diff --git a/tests/integration/test_ttl_move/test.py b/tests/integration/test_ttl_move/test.py index 377ee0e5d75..59705f7b3dd 100644 --- a/tests/integration/test_ttl_move/test.py +++ b/tests/integration/test_ttl_move/test.py @@ -403,7 +403,6 @@ def test_moves_to_disk_eventually_work(started_cluster, name, engine): node1.query("DROP TABLE {} NO DELAY".format(name_temp)) - time.sleep(2) used_disks = get_used_disks_for_table(node1, name) assert set(used_disks) == {"jbod2"} diff --git a/tests/integration/test_ttl_replicated/test.py b/tests/integration/test_ttl_replicated/test.py index 878db2da11f..13fb779a6e6 100644 --- a/tests/integration/test_ttl_replicated/test.py +++ b/tests/integration/test_ttl_replicated/test.py @@ -27,7 +27,6 @@ def started_cluster(): def drop_table(nodes, table_name): for node in nodes: node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name)) - time.sleep(1) # Column TTL works only with wide parts, because it's very expensive to apply it for compact parts def test_ttl_columns(started_cluster):