From 6ea1b19476b9dc2331d79793c59e21640a4610a7 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Thu, 16 Jul 2020 23:35:23 +0300 Subject: [PATCH] Remove data for Distributed tables (blocks from async INSERTs) on DROP TABLE --- src/Storages/StorageDistributed.cpp | 24 +++++++++++++++++++ src/Storages/StorageDistributed.h | 1 + .../test.py | 12 +++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 37703f9a719..ab47eb6f8e0 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -618,6 +618,26 @@ void StorageDistributed::shutdown() std::lock_guard lock(cluster_nodes_mutex); cluster_nodes_data.clear(); } +void StorageDistributed::drop() +{ + // shutdown() should be already called + // and by the same reason we cannot use truncate() here, since + // cluster_nodes_data already cleaned + if (!cluster_nodes_data.empty()) + throw Exception("drop called before shutdown", ErrorCodes::LOGICAL_ERROR); + + // Distributed table w/o sharding_key does not allows INSERTs + if (relative_data_path.empty()) + return; + + LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE"); + + auto disks = volume->getDisks(); + for (const auto & disk : disks) + disk->removeRecursive(relative_data_path); + + LOG_DEBUG(log, "Removed"); +} Strings StorageDistributed::getDataPaths() const { @@ -636,11 +656,15 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co { std::lock_guard lock(cluster_nodes_mutex); + LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE"); + for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();) { it->second.shutdownAndDropAllData(); it = cluster_nodes_data.erase(it); } + + LOG_DEBUG(log, "Removed"); } StoragePolicyPtr StorageDistributed::getStoragePolicy() const diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index ea60c481de8..c2277b23f11 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -95,6 +95,7 @@ public: void startup() override; void shutdown() override; + void drop() override; Strings getDataPaths() const override; diff --git a/tests/integration/test_distributed_storage_configuration/test.py b/tests/integration/test_distributed_storage_configuration/test.py index 019db11d5a5..8dfaab659cb 100644 --- a/tests/integration/test_distributed_storage_configuration/test.py +++ b/tests/integration/test_distributed_storage_configuration/test.py @@ -29,7 +29,7 @@ def _files_in_dist_mon(node, root, table): 'find /{root}/data/test/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f 2>/dev/null | wc -l'.format(root=root, table=table) ]).split('\n')[0]) -def test_different_versions(start_cluster): +def test_insert(start_cluster): node.query('CREATE TABLE test.foo (key Int) Engine=Memory()') node.query(""" CREATE TABLE test.dist_foo (key Int) @@ -64,3 +64,13 @@ def test_different_versions(start_cluster): assert node.query('SELECT count() FROM test.dist2_foo') == '300\n' node.query('SYSTEM FLUSH DISTRIBUTED test.dist2_foo') assert node.query('SELECT count() FROM test.dist2_foo') == '400\n' + + # + # DROP + # + node.query('DROP TABLE test.dist2_foo') + for disk in ['disk1', 'disk2']: + node.exec_in_container([ + 'bash', '-c', + 'test ! -e /{}/data/test/dist2_foo'.format(disk) + ])