Remove data for Distributed tables (blocks from async INSERTs) on DROP TABLE

This commit is contained in:
Azat Khuzhin 2020-07-16 23:35:23 +03:00
parent 10e12c32cb
commit 6ea1b19476
3 changed files with 36 additions and 1 deletions

View File

@ -618,6 +618,26 @@ void StorageDistributed::shutdown()
std::lock_guard lock(cluster_nodes_mutex);
cluster_nodes_data.clear();
}
/// Removes everything stored under relative_data_path (the pending blocks of
/// async INSERTs) from every disk of the volume when the table is dropped.
///
/// Throws LOGICAL_ERROR if cluster_nodes_data is not empty, i.e. if drop() is
/// called before shutdown() has cleared it.
void StorageDistributed::drop()
{
    /// shutdown() is expected to have run already and emptied
    /// cluster_nodes_data. For the same reason truncate() cannot be reused
    /// here: it walks cluster_nodes_data, which has already been cleaned.
    const bool nodes_still_registered = !cluster_nodes_data.empty();
    if (nodes_still_registered)
    {
        throw Exception("drop called before shutdown", ErrorCodes::LOGICAL_ERROR);
    }

    /// A Distributed table w/o sharding_key does not allow INSERTs,
    /// so there is nothing on the filesystem to remove.
    if (!relative_data_path.empty())
    {
        LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on DROP TABLE");

        const auto target_disks = volume->getDisks();
        for (const auto & target_disk : target_disks)
        {
            target_disk->removeRecursive(relative_data_path);
        }

        LOG_DEBUG(log, "Removed");
    }
}
Strings StorageDistributed::getDataPaths() const
{
@ -636,11 +656,15 @@ void StorageDistributed::truncate(const ASTPtr &, const StorageMetadataPtr &, co
{
std::lock_guard lock(cluster_nodes_mutex);
LOG_DEBUG(log, "Removing pending blocks for async INSERT from filesystem on TRUNCATE TABLE");
for (auto it = cluster_nodes_data.begin(); it != cluster_nodes_data.end();)
{
it->second.shutdownAndDropAllData();
it = cluster_nodes_data.erase(it);
}
LOG_DEBUG(log, "Removed");
}
StoragePolicyPtr StorageDistributed::getStoragePolicy() const

View File

@ -95,6 +95,7 @@ public:
void startup() override;
void shutdown() override;
void drop() override;
Strings getDataPaths() const override;

View File

@ -29,7 +29,7 @@ def _files_in_dist_mon(node, root, table):
'find /{root}/data/test/{table}/default@127%2E0%2E0%2E2:9000 -maxdepth 1 -type f 2>/dev/null | wc -l'.format(root=root, table=table)
]).split('\n')[0])
def test_different_versions(start_cluster):
def test_insert(start_cluster):
node.query('CREATE TABLE test.foo (key Int) Engine=Memory()')
node.query("""
CREATE TABLE test.dist_foo (key Int)
@ -64,3 +64,13 @@ def test_different_versions(start_cluster):
assert node.query('SELECT count() FROM test.dist2_foo') == '300\n'
node.query('SYSTEM FLUSH DISTRIBUTED test.dist2_foo')
assert node.query('SELECT count() FROM test.dist2_foo') == '400\n'
#
# DROP
#
node.query('DROP TABLE test.dist2_foo')
for disk in ['disk1', 'disk2']:
node.exec_in_container([
'bash', '-c',
'test ! -e /{}/data/test/dist2_foo'.format(disk)
])