diff --git a/src/Disks/IDiskRemote.cpp b/src/Disks/IDiskRemote.cpp
index 637035920b1..398e617196d 100644
--- a/src/Disks/IDiskRemote.cpp
+++ b/src/Disks/IDiskRemote.cpp
@@ -344,17 +344,6 @@ void IDiskRemote::replaceFile(const String & from_path, const String & to_path)
 }
 
 
-void IDiskRemote::removeFileIfExists(const String & path)
-{
-    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
-    if (fs::exists(fs::path(metadata_path) / path))
-    {
-        removeMeta(path, fs_paths_keeper);
-        removeFromRemoteFS(fs_paths_keeper);
-    }
-}
-
-
 void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
 {
     RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
@@ -364,6 +353,18 @@ void IDiskRemote::removeSharedFile(const String & path, bool keep_in_remote_fs)
 }
 
 
+void IDiskRemote::removeSharedFileIfExists(const String & path, bool keep_in_remote_fs)
+{
+    RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
+    if (fs::exists(fs::path(metadata_path) / path))
+    {
+        removeMeta(path, fs_paths_keeper);
+        if (!keep_in_remote_fs)
+            removeFromRemoteFS(fs_paths_keeper);
+    }
+}
+
+
 void IDiskRemote::removeSharedRecursive(const String & path, bool keep_in_remote_fs)
 {
     RemoteFSPathKeeperPtr fs_paths_keeper = createFSPathKeeper();
diff --git a/src/Disks/IDiskRemote.h b/src/Disks/IDiskRemote.h
index 3b5c82c8b9e..80b01c3c949 100644
--- a/src/Disks/IDiskRemote.h
+++ b/src/Disks/IDiskRemote.h
@@ -83,12 +83,14 @@ public:
 
     void removeFile(const String & path) override { removeSharedFile(path, false); }
 
-    void removeFileIfExists(const String & path) override;
+    void removeFileIfExists(const String & path) override { removeSharedFileIfExists(path, false); }
 
     void removeRecursive(const String & path) override { removeSharedRecursive(path, false); }
 
     void removeSharedFile(const String & path, bool keep_in_remote_fs) override;
 
+    void removeSharedFileIfExists(const String & path, bool keep_in_remote_fs) override;
+
     void removeSharedRecursive(const String & path, bool keep_in_remote_fs) override;
 
     void listFiles(const String & path, std::vector<String> & file_names) override;
diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp
index ae1947fdb22..e30da82416d 100644
--- a/src/Storages/MergeTree/DataPartsExchange.cpp
+++ b/src/Storages/MergeTree/DataPartsExchange.cpp
@@ -43,7 +43,6 @@ namespace ErrorCodes
     extern const int S3_ERROR;
     extern const int INCORRECT_PART_TYPE;
     extern const int ZERO_COPY_REPLICATION_ERROR;
-    extern const int NO_RESERVATIONS_PROVIDED;
 }
 
 namespace DataPartsExchange
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index b163ad7a42e..70e72c85e79 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1103,7 +1103,7 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
 
 std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
 {
-    /// NOTE: It's needed for S3 zero-copy replication
+    /// NOTE: It's needed for zero-copy replication
     if (force_keep_shared_data)
         return true;
 
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index e01cc12fd15..0f7cbac7ae9 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -2751,15 +2751,12 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
 
             original_active_part->force_keep_shared_data = false;
 
-            auto original_disk_type = original_active_part->volume->getDisk()->getType();
-            if (original_disk_type == DiskType::Type::S3 || original_disk_type == DiskType::Type::HDFS)
+            if (original_active_part->volume->getDisk()->supportZeroCopyReplication() &&
+                part_copy->volume->getDisk()->supportZeroCopyReplication() &&
+                original_active_part->getUniqueId() == part_copy->getUniqueId())
             {
-                if (part_copy->volume->getDisk()->getType() == original_disk_type
-                    && original_active_part->getUniqueId() == part_copy->getUniqueId())
-                {
-                    /// May be when several volumes use the same S3 storage
-                    original_active_part->force_keep_shared_data = true;
-                }
+                /// This may happen when several volumes use the same S3/HDFS storage
+                original_active_part->force_keep_shared_data = true;
             }
 
             modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
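
The IDiskRemote change above is the core of the refactor: removeFileIfExists becomes a thin wrapper over the new removeSharedFileIfExists, whose keep_in_remote_fs flag controls whether only the local metadata is dropped or the remote objects are deleted as well. That flag is what makes zero-copy replication safe: several replicas reference the same remote blobs, so a replica that drops a part must be able to remove its own metadata without destroying data the other replicas still use. A minimal Python model of that contract (class and method names here are illustrative, not the ClickHouse API):

```python
# Toy model of IDiskRemote::removeSharedFileIfExists semantics.
# Local "metadata" files map to keys of blobs in shared remote storage.
class ToyRemoteDisk:
    def __init__(self, remote_storage):
        self.metadata = {}                     # local path -> set of blob keys
        self.remote_storage = remote_storage   # shared across replicas

    def remove_shared_file_if_exists(self, path, keep_in_remote_fs):
        blobs = self.metadata.pop(path, None)
        if blobs is None:
            return                             # mirrors the fs::exists() guard
        if not keep_in_remote_fs:
            for key in blobs:                  # mirrors removeFromRemoteFS()
                self.remote_storage.discard(key)

    def remove_file_if_exists(self, path):
        # Mirrors the new one-line override in IDiskRemote.h.
        self.remove_shared_file_if_exists(path, keep_in_remote_fs=False)

shared = {"blob_a", "blob_b"}
replica1 = ToyRemoteDisk(shared)
replica2 = ToyRemoteDisk(shared)
replica1.metadata["part/data.bin"] = {"blob_a"}
replica2.metadata["part/data.bin"] = {"blob_a"}

# replica1 drops the part, but another replica still shares the blob:
replica1.remove_shared_file_if_exists("part/data.bin", keep_in_remote_fs=True)
assert "blob_a" in shared
# The last owner deletes for real:
replica2.remove_shared_file_if_exists("part/data.bin", keep_in_remote_fs=False)
assert "blob_a" not in shared
```

Deciding who passes keep_in_remote_fs=True stays with the caller; the disk layer itself is deliberately dumb, which is also why MergeTreeData::swapActivePart above can switch from a hard-coded S3/HDFS type check to the generic supportZeroCopyReplication() predicate.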
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index c389b9616f8..4129942eb92 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -7310,17 +7310,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
     if (!zookeeper)
         return best_replica;
 
-    String zero_copy;
-    if (disk_type == DiskType::Type::S3)
-    {
-        zero_copy = "zero_copy_s3";
-    }
-    else if (disk_type == DiskType::Type::HDFS)
-    {
-        zero_copy = "zero_copy_hdfs";
-    }
-    else
-        return best_replica;
+    String zero_copy = fmt::format("zero_copy_{}", DiskType::toString(disk_type));
 
     String zookeeper_part_node = fs::path(zookeeper_path) / zero_copy / "shared" / part.name;
 
     Strings ids;
diff --git a/tests/integration/README.md b/tests/integration/README.md
index cf2acd19db5..8c353658705 100644
--- a/tests/integration/README.md
+++ b/tests/integration/README.md
@@ -34,6 +34,7 @@ sudo -H pip install \
     protobuf \
    psycopg2-binary \
     pymongo \
+    pytz \
     pytest \
     pytest-timeout \
     redis \
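
In getSharedDataReplica, the per-disk if/else chain is folded into a single naming rule: the ZooKeeper subtree for a zero-copy disk is zero_copy_<disk type>, so S3 yields zero_copy_s3, HDFS yields zero_copy_hdfs, and any future disk type with a string form is covered without touching this function. One behavioral difference is worth noting: the old code returned best_replica early for unsupported disk types, while the new code formats a name for whatever type it receives, so callers presumably reach this point only for zero-copy-capable disks. A small sketch of the equivalence (plain Python; zookeeper_path and part name values are hypothetical):

```python
# Old behaviour: one branch per supported disk type.
def zero_copy_node_old(disk_type):
    if disk_type == "s3":
        return "zero_copy_s3"
    if disk_type == "hdfs":
        return "zero_copy_hdfs"
    return None  # every other type bailed out early

# New behaviour: derive the node name from the type's string form,
# mirroring fmt::format("zero_copy_{}", DiskType::toString(disk_type)).
def zero_copy_node_new(disk_type):
    return f"zero_copy_{disk_type}"

for disk_type in ("s3", "hdfs"):
    assert zero_copy_node_old(disk_type) == zero_copy_node_new(disk_type)

# The part node is then assembled the same way as before:
zookeeper_path = "/clickhouse/tables/0/test"   # hypothetical
part_name = "all_0_0_0"                        # hypothetical
print(f"{zookeeper_path}/{zero_copy_node_new('hdfs')}/shared/{part_name}")
# -> /clickhouse/tables/0/test/zero_copy_hdfs/shared/all_0_0_0
```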
diff --git a/tests/integration/test_replicated_merge_tree_hdfs_zero_copy/test.py b/tests/integration/test_replicated_merge_tree_hdfs_zero_copy/test.py
index 9bf9e08fb2a..da1eb20a5c9 100644
--- a/tests/integration/test_replicated_merge_tree_hdfs_zero_copy/test.py
+++ b/tests/integration/test_replicated_merge_tree_hdfs_zero_copy/test.py
@@ -49,6 +49,19 @@ def cluster():
         cluster.shutdown()
 
 
+@pytest.fixture(autouse=True)
+def cleanup_after_test(cluster):
+    try:
+        yield
+    finally:
+        for instance in cluster.instances.values():
+            instance.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+            instance.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
+
+
 def test_hdfs_zero_copy_replication_insert(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
@@ -75,8 +88,6 @@ def test_hdfs_zero_copy_replication_insert(cluster):
     wait_for_hdfs_objects(cluster, "/clickhouse1", SHARDS * FILES_OVERHEAD_PER_TABLE + FILES_OVERHEAD_PER_PART_COMPACT)
 
-    node1.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS hdfs_test NO DELAY")
 
 
 @pytest.mark.parametrize(
     ("storage_policy", "init_objects"),
@@ -88,7 +99,6 @@ def test_hdfs_zero_copy_replication_single_move(cluster, storage_policy, init_objects):
     node1 = cluster.instances["node1"]
 
-    node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
     node1.query(
         Template("""
             CREATE TABLE single_node_move_test (dt DateTime, id Int64)
@@ -110,8 +120,6 @@ def test_hdfs_zero_copy_replication_single_move(cluster, storage_policy, init_ob
     node1.query("ALTER TABLE single_node_move_test MOVE PARTITION ID 'all' TO VOLUME 'main'")
     assert node1.query("SELECT id FROM single_node_move_test ORDER BY dt FORMAT Values") == "(10),(11)"
 
-    node1.query("DROP TABLE IF EXISTS single_node_move_test NO DELAY")
-
 
 @pytest.mark.parametrize(
     ("storage_policy", "init_objects"),
@@ -123,8 +131,6 @@ def test_hdfs_zero_copy_replication_move(cluster, storage_policy, init_objects):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
-    node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
     node1.query(
         Template("""
             CREATE TABLE move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -151,9 +157,6 @@ def test_hdfs_zero_copy_replication_move(cluster, storage_policy, init_objects):
     assert node2.query("SELECT id FROM move_test ORDER BY dt FORMAT Values") == "(10),(11)"
     wait_for_hdfs_objects(cluster, "/clickhouse1", init_objects + FILES_OVERHEAD_PER_PART_COMPACT)
 
-    node1.query("DROP TABLE IF EXISTS move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS move_test NO DELAY")
-
 
 @pytest.mark.parametrize(
     ("storage_policy"), ["hybrid", "tiered", "tiered_copy"]
@@ -162,8 +165,6 @@ def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
-    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
     node1.query(
         Template("""
             CREATE TABLE ttl_move_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -185,17 +186,11 @@ def test_hdfs_zero_copy_with_ttl_move(cluster, storage_policy):
     assert node1.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
     assert node2.query("SELECT id FROM ttl_move_test ORDER BY id FORMAT Values") == "(10),(11)"
 
-    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
-
 
 
 def test_hdfs_zero_copy_with_ttl_delete(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
 
-    node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-
     node1.query(
         """
             CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (dt DateTime, id Int64)
@@ -216,6 +211,3 @@ def test_hdfs_zero_copy_with_ttl_delete(cluster):
     assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
     assert node1.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
     assert node2.query("SELECT id FROM ttl_delete_test ORDER BY id FORMAT Values") == "(11)"
-
-    node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
-    node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
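
The test changes are one refactor applied five times: per-test DROP TABLE bookkeeping moves into a single autouse fixture. Because the fixture yields first and cleans up in a finally block, the tables are dropped even when an assertion fails halfway through a test, which the old trailing DROP calls could not guarantee; it also makes the defensive DROPs at the start of each test unnecessary, since every test now begins from a clean slate. A stripped-down, self-contained sketch of the same pattern (the fake cluster and table name are stand-ins for the real fixtures in test.py):

```python
import pytest

class FakeInstance:
    def query(self, sql):
        print("query:", sql)

class FakeCluster:
    instances = {"node1": FakeInstance(), "node2": FakeInstance()}

@pytest.fixture(scope="module")
def cluster():
    yield FakeCluster()  # stand-in for the real ClickHouse cluster fixture

@pytest.fixture(autouse=True)
def cleanup_after_test(cluster):
    try:
        yield  # the test body runs here
    finally:
        # Runs even when the test fails, unlike DROP TABLE calls
        # placed at the end of each test function.
        for instance in cluster.instances.values():
            instance.query("DROP TABLE IF EXISTS demo_table NO DELAY")

def test_example(cluster):
    assert len(cluster.instances) == 2
```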