Merge pull request #63848 from MikhailBurdukov/lazy_database_cleanup

Remove data from all disks after DROP with Lazy database.

Commit: 40e3527d36
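A Lazy database unloads idle tables after its expiration timeout, and before this change DROP TABLE removed table data only from the local data path. A dropped table whose data lived on another disk (for example an S3 disk) could therefore leave objects behind. A minimal sketch of the scenario, assuming the cluster, node, and s3 disk set up by the integration test added below (the helper name is hypothetical):

import time

# Hypothetical repro helper; `cluster`, "node", and the "s3" disk come from
# the integration test added in this PR.
def repro_orphaned_data(cluster):
    node = cluster.instances["node"]
    node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")  # unload idle tables after 2 s
    node.query("CREATE TABLE lazy.table (id UInt64) ENGINE=Log SETTINGS disk = 's3'")
    node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
    time.sleep(4)  # exceed the expiration so the Lazy engine unloads the table
    node.query("DROP TABLE lazy.table SYNC")  # before this fix, the S3 objects could survive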
src/Databases/DatabaseOnDisk.cpp

@@ -5,6 +5,7 @@
#include <span>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabaseOrdinary.h>
#include <Disks/IDisk.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
@@ -326,31 +327,36 @@ void DatabaseOnDisk::dropTable(ContextPtr local_context, const String & table_name

    StoragePtr table = detachTable(local_context, table_name);

    /// This is possible for Lazy database.
    if (!table)
        return;

    bool renamed = false;
    try
    {
        fs::rename(table_metadata_path, table_metadata_path_drop);
        renamed = true;
        // The table might be not loaded for Lazy database engine.
        if (table)
        {
            table->drop();
            table->is_dropped = true;

            fs::path table_data_dir(local_context->getPath() + table_data_path_relative);
            if (fs::exists(table_data_dir))
                (void)fs::remove_all(table_data_dir);
        }
    }
    catch (...)
    {
        LOG_WARNING(log, getCurrentExceptionMessageAndPattern(/* with_stacktrace */ true));
        if (table)
            attachTable(local_context, table_name, table, table_data_path_relative);
        if (renamed)
            fs::rename(table_metadata_path_drop, table_metadata_path);
        throw;
    }

    for (const auto & [disk_name, disk] : getContext()->getDisksMap())
    {
        if (disk->isReadOnly() || !disk->exists(table_data_path_relative))
            continue;

        LOG_INFO(log, "Removing data directory from disk {} with path {} for dropped table {} ", disk_name, table_data_path_relative, table_name);
        disk->removeRecursive(table_data_path_relative);
    }

    (void)fs::remove(table_metadata_path_drop);
}
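The decisive change is the loop over getContext()->getDisksMap(): instead of relying on fs::remove_all of local_context->getPath() + table_data_path_relative, which only covers the local data path, the drop now visits every configured disk, skips read-only disks, and removes the table's relative data path wherever it exists. A test-side sketch of the same idea (the helper name is an assumption; system.disks and exec_in_container are used the same way as in the test below):

import os

def assert_dropped_on_all_disks(node, table_data_path_relative):
    # Mirror the C++ loop: enumerate configured disks via system.disks and
    # verify the table's relative data path is gone on each of them.
    for line in node.query("SELECT name, path FROM system.disks").splitlines():
        name, path = line.split("\t")
        out = node.exec_in_container(
            ["bash", "-c", f"ls {os.path.join(path, table_data_path_relative)} 2>/dev/null || true"],
            user="root",
        )
        assert out == "", f"orphaned data on disk {name}"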
tests/integration/test_lazy_database/__init__.py (new, empty file)

tests/integration/test_lazy_database/configs/storage_policy.xml (new file)

@@ -0,0 +1,12 @@
<clickhouse>
    <storage_configuration>
        <disks>
            <s3>
                <type>s3</type>
                <endpoint>http://minio1:9001/root/data/</endpoint>
                <access_key_id>minio</access_key_id>
                <secret_access_key>minio123</secret_access_key>
            </s3>
        </disks>
    </storage_configuration>
</clickhouse>
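This instance config registers a MinIO-backed s3 disk alongside the implicit default disk; the test is parametrized over both. A quick sanity check of the started node, as a sketch (it assumes no other disks are configured):

def check_disks(node):
    # The implicit local disk plus the s3 disk from the config above.
    assert node.query("SELECT name FROM system.disks ORDER BY name") == "default\ns3\n"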
tests/integration/test_lazy_database/test.py (new file, 88 lines)

@@ -0,0 +1,88 @@
import logging
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster


@pytest.fixture(scope="module")
def cluster():
    try:
        cluster = ClickHouseCluster(__file__)
        cluster.add_instance(
            "node",
            main_configs=["configs/storage_policy.xml"],
            with_minio=True,
        )
        logging.info("Starting cluster...")
        cluster.start()
        logging.info("Cluster started")

        yield cluster
    finally:
        cluster.shutdown()


def assert_objects_count(cluster, objects_count, path="data/"):
    minio = cluster.minio_client
    s3_objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
    if objects_count != len(s3_objects):
        for s3_object in s3_objects:
            object_meta = minio.stat_object(cluster.minio_bucket, s3_object.object_name)
            logging.info("Existing S3 object: %s", str(object_meta))
    assert objects_count == len(s3_objects)
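assert_objects_count fails loudly: when the count differs, it first logs the metadata of every surviving object, so orphaned-data failures can be diagnosed straight from the test log. Typical use, as in the test below:

# After DROP TABLE ... SYNC of a table on the s3 disk, nothing may remain
# under the bucket's data/ prefix.
assert_objects_count(cluster, 0)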
def list_of_files_on_ch_disk(node, disk, path):
    disk_path = node.query(
        f"SELECT path FROM system.disks WHERE name='{disk}'"
    ).splitlines()[0]
    return node.exec_in_container(
        ["bash", "-c", f"ls {os.path.join(disk_path, path)}"], user="root"
    )
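list_of_files_on_ch_disk resolves the disk's root directory from system.disks and runs ls on the given relative path inside the container. The test expects an empty listing, i.e. the lazy database's directory remains in place but contains no table data after the drop:

# Sketch of the expected post-DROP state on the local disk.
assert list_of_files_on_ch_disk(node, "default", "data/lazy/") == ""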
@pytest.mark.parametrize(
    "engine",
    [
        pytest.param("Log"),
    ],
)
@pytest.mark.parametrize(
    "disk,check_s3",
    [
        pytest.param("default", False),
        pytest.param("s3", True),
    ],
)
@pytest.mark.parametrize(
    "delay",
    [
        pytest.param(0),
        pytest.param(4),
    ],
)
def test_drop_table(cluster, engine, disk, check_s3, delay):
    node = cluster.instances["node"]

    node.query("DROP DATABASE IF EXISTS lazy")
    node.query("CREATE DATABASE lazy ENGINE=Lazy(2)")
    node.query(
        "CREATE TABLE lazy.table (id UInt64) ENGINE={} SETTINGS disk = '{}'".format(
            engine,
            disk,
        )
    )

    node.query("INSERT INTO lazy.table SELECT number FROM numbers(10)")
    assert node.query("SELECT count(*) FROM lazy.table") == "10\n"
    if delay:
        time.sleep(delay)
    node.query("DROP TABLE lazy.table SYNC")

    if check_s3:
        # There mustn't be any orphaned data
        assert_objects_count(cluster, 0)

    # Local data must be removed
    assert list_of_files_on_ch_disk(node, disk, "data/lazy/") == ""
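Note how the parameters interact: CREATE DATABASE lazy ENGINE=Lazy(2) tells the Lazy engine to unload tables idle for more than 2 seconds, so delay=0 drops a table that is still loaded, while delay=4 drops one the engine has already unloaded, which is exactly the case that used to orphan data. In both cases, on both disks, the data directory must be empty afterwards, and for the s3 disk the MinIO bucket must contain no leftover objects.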