From f50d46fb8cd0700925f8ccacd4cc698d79390ed8 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Mon, 3 Oct 2022 22:12:43 +0200
Subject: [PATCH] Revert "Merge pull request #41832 from ClickHouse/make_copy_instead_of_hardlink"

This reverts commit 0097f15ee7399fc84dc3d878d59d86c975691dd8, reversing
changes made to 4e422b804644188e5a33122a6a8d5c5fc636b496.
---
 .../MergeTree/DataPartStorageOnDisk.cpp       |   6 +-
 .../MergeTree/DataPartStorageOnDisk.h         |   3 +-
 src/Storages/MergeTree/IDataPartStorage.h     |   7 +-
 src/Storages/MergeTree/IMergeTreeDataPart.cpp |   9 +-
 src/Storages/MergeTree/MergeTreeData.cpp      |  12 +-
 src/Storages/MergeTree/MergeTreeData.h        |   2 +-
 src/Storages/MergeTree/MutateTask.cpp         |  37 +---
 src/Storages/MergeTree/localBackup.cpp        |  13 +-
 src/Storages/MergeTree/localBackup.h          |   2 +-
 src/Storages/StorageMergeTree.cpp             |   4 +-
 src/Storages/StorageReplicatedMergeTree.cpp   |   9 +-
 tests/integration/helpers/cluster.py          |  41 +----
 .../test_s3_zero_copy_replication/test.py     | 159 ++----------------
 13 files changed, 37 insertions(+), 267 deletions(-)

diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
index e2a2f3f793f..b002b3dd1c6 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.cpp
@@ -742,14 +742,12 @@ DataPartStoragePtr DataPartStorageOnDisk::freeze(
     const std::string & dir_path,
     bool make_source_readonly,
     std::function<void(const DiskPtr &)> save_metadata_callback,
-    bool copy_instead_of_hardlink,
-    const NameSet & files_to_copy_instead_of_hardlinks) const
-
+    bool copy_instead_of_hardlink) const
 {
     auto disk = volume->getDisk();
     disk->createDirectories(to);

-    localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks);
+    localBackup(disk, getRelativePath(), fs::path(to) / dir_path, make_source_readonly, {}, copy_instead_of_hardlink);

     if (save_metadata_callback)
         save_metadata_callback(disk);
diff --git a/src/Storages/MergeTree/DataPartStorageOnDisk.h b/src/Storages/MergeTree/DataPartStorageOnDisk.h
index adf1b78cdfb..51b557767d4 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDisk.h
+++ b/src/Storages/MergeTree/DataPartStorageOnDisk.h
@@ -100,8 +100,7 @@ public:
         const std::string & dir_path,
         bool make_source_readonly,
         std::function<void(const DiskPtr &)> save_metadata_callback,
-        bool copy_instead_of_hardlink,
-        const NameSet & files_to_copy_instead_of_hardlinks) const override;
+        bool copy_instead_of_hardlink) const override;

     DataPartStoragePtr clone(
         const std::string & to,
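Note on the freeze() change above: the dropped files_to_copy_instead_of_hardlinks argument let a caller hardlink a part's files into the frozen directory while physically copying a chosen few, so the copies get new inodes (and, on object storage, new blobs). A minimal Python sketch of that semantics, with illustrative paths and a flat part directory, not the actual ClickHouse API:

    import os
    import shutil

    def freeze_part(src_dir, dst_dir, files_to_copy_instead_of_hardlinks=frozenset()):
        # Illustrative stand-in for IDataPartStorage::freeze: hardlink every
        # file of a part, except the listed ones, which are copied instead.
        os.makedirs(dst_dir, exist_ok=True)
        for name in os.listdir(src_dir):
            src, dst = os.path.join(src_dir, name), os.path.join(dst_dir, name)
            if name in files_to_copy_instead_of_hardlinks:
                shutil.copyfile(src, dst)  # new inode / new blob
            else:
                os.link(src, dst)          # shared inode / shared blob

After this revert the call sites behave like freeze_part(src, dst) with an empty copy set: everything is hardlinked, or, with copy_instead_of_hardlink set, everything is copied.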
diff --git a/src/Storages/MergeTree/IDataPartStorage.h b/src/Storages/MergeTree/IDataPartStorage.h
index 17af6dd2909..bd449d46075 100644
--- a/src/Storages/MergeTree/IDataPartStorage.h
+++ b/src/Storages/MergeTree/IDataPartStorage.h
@@ -192,17 +192,12 @@ public:

     /// Creates hardlinks into 'to/dir_path' for every file in data part.
     /// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.
-    /// Some files can be copied instead of hardlinked. This is due to details of the zero-copy replication
-    /// implementation, which relies on the paths of some blobs in S3. For example, if we want to hardlink
-    /// the whole part during a mutation, we shouldn't hardlink checksums.txt, because otherwise the
-    /// zero-copy locks for different parts would end up on the same path in ZooKeeper.
     virtual std::shared_ptr<IDataPartStorage> freeze(
         const std::string & to,
         const std::string & dir_path,
         bool make_source_readonly,
         std::function<void(const DiskPtr &)> save_metadata_callback,
-        bool copy_instead_of_hardlink,
-        const NameSet & files_to_copy_instead_of_hardlinks) const = 0;
+        bool copy_instead_of_hardlink) const = 0;

     /// Make a full copy of a data part into 'to/dir_path' (possibly to a different disk).
     virtual std::shared_ptr<IDataPartStorage> clone(
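The comment removed above is the heart of this revert. Under zero-copy replication, the ZooKeeper lock path is derived from the blob path of checksums.txt, not from the part name, so hardlinking checksums.txt makes two parts share one lock. A small sketch of that collision; the path scheme follows the example in the removed MutateTask.cpp comments below, while the helper itself is hypothetical:

    def zero_copy_lock_path(table_id, blob_path, replica):
        # The lock is keyed by the blob path; the part name never appears in it.
        blob_id = blob_path.strip("/").replace("/", "_")
        return f"/zero_copy/{table_id}/{blob_id}/{replica}"

    blob = "/s3/blobs/shjfgsaasdasdasdasdasdas"  # blob id taken from the comment's example

    lock_for_source  = zero_copy_lock_path("tbl_id", blob, "replica_name")  # all_0_0_0
    lock_for_mutated = zero_copy_lock_path("tbl_id", blob, "replica_name")  # all_0_0_0_1, hardlinked checksums.txt
    assert lock_for_source == lock_for_mutated  # collision: two parts, one lock node

Copying checksums.txt instead gives the mutated part a fresh blob path and therefore a distinct lock.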
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 46323f12305..e4c54b933da 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -1516,19 +1516,12 @@ void IMergeTreeDataPart::renameToDetached(const String & prefix, DataPartStorage

 void IMergeTreeDataPart::makeCloneInDetached(const String & prefix, const StorageMetadataPtr & /*metadata_snapshot*/) const
 {
-    auto storage_settings = storage.getSettings();
-
-    /// In case of zero-copy replication we copy the directory instead of hardlinking it,
-    /// because hardlink tracking doesn't work for detached parts.
-    bool copy_instead_of_hardlink = isStoredOnRemoteDiskWithZeroCopySupport() && storage.supportsReplication() && storage_settings->allow_remote_fs_zero_copy_replication;
-
     data_part_storage->freeze(
         storage.relative_data_path,
         getRelativePathForDetachedPart(prefix),
         /*make_source_readonly*/ true,
         {},
-        copy_instead_of_hardlink,
-        {});
+        /*copy_instead_of_hardlink*/ false);
 }

 DataPartStoragePtr IMergeTreeDataPart::makeCloneOnDisk(const DiskPtr & disk, const String & directory_name) const
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index ea7b87a5a58..01ce4e8ed6c 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -6177,8 +6177,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scoped_lock> MergeTreeData::cloneAn
     const StorageMetadataPtr & metadata_snapshot,
     const MergeTreeTransactionPtr & txn,
     HardlinkedFiles * hardlinked_files,
-    bool copy_instead_of_hardlink,
-    const NameSet & files_to_copy_instead_of_hardlinks)
+    bool copy_instead_of_hardlink)
 {
     /// Check that the storage policy contains the disk where the src_part is located.
     bool does_storage_policy_allow_same_disk = false;
@@ -6222,7 +6221,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scoped_lock> MergeTreeData::cloneAn
         std::string(fs::path(src_part_storage->getFullRootPath()) / tmp_dst_part_name),
         with_copy);

-    auto dst_part_storage = src_part_storage->freeze(relative_data_path, tmp_dst_part_name, /* make_source_readonly */ false, {}, copy_instead_of_hardlink, files_to_copy_instead_of_hardlinks);
+    auto dst_part_storage = src_part_storage->freeze(relative_data_path, tmp_dst_part_name, /* make_source_readonly */ false, {}, /* copy_instead_of_hardlink */ copy_instead_of_hardlink);

     auto dst_data_part = createPart(dst_part_name, dst_part_info, dst_part_storage);

@@ -6233,9 +6232,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scoped_lock> MergeTreeData::cloneAn
         for (auto it = src_part->data_part_storage->iterate(); it->isValid(); it->next())
         {
-            if (!files_to_copy_instead_of_hardlinks.contains(it->name())
-                && it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME
-                && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
+            if (it->name() != IMergeTreeDataPart::DELETE_ON_DESTROY_MARKER_FILE_NAME && it->name() != IMergeTreeDataPart::TXN_VERSION_METADATA_FILE_NAME)
                 hardlinked_files->hardlinks_from_source_part.insert(it->name());
         }
     }
@@ -6410,8 +6407,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
             part->data_part_storage->getPartDirectory(),
             /*make_source_readonly*/ true,
             callback,
-            /*copy_instead_of_hardlink*/ false,
-            {});
+            /*copy_instead_of_hardlink*/ false);

         part->is_frozen.store(true, std::memory_order_relaxed);
         result.push_back(PartitionCommandResultInfo{
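The cloneAndLoadDataPartOnSameDisk hunk above also reverts the bookkeeping of which files were hardlinked from the source part. As a sketch; the two constant values match the marker-file names used in ClickHouse sources at the time, but treat them as illustrative:

    DELETE_ON_DESTROY_MARKER_FILE_NAME = "delete-on-destroy.txt"  # assumed value
    TXN_VERSION_METADATA_FILE_NAME = "txn_version.txt"            # assumed value

    def hardlinks_from_source_part(part_files, files_to_copy_instead_of_hardlinks=frozenset()):
        # Before the revert, copied files were excluded from the hardlink set;
        # after it, only the two marker files are skipped.
        skip = {DELETE_ON_DESTROY_MARKER_FILE_NAME, TXN_VERSION_METADATA_FILE_NAME}
        return {name for name in part_files
                if name not in skip and name not in files_to_copy_instead_of_hardlinks}

    # hardlinks_from_source_part(["checksums.txt", "data.bin", "delete-on-destroy.txt"])
    # -> {"checksums.txt", "data.bin"}

This set is what zero-copy replication later consults to decide which blobs are shared with the source part.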
diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h
index c3a70a9893b..a38499b786c 100644
--- a/src/Storages/MergeTree/MergeTreeData.h
+++ b/src/Storages/MergeTree/MergeTreeData.h
@@ -790,7 +790,7 @@ public:
         const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
         const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot,
         const MergeTreeTransactionPtr & txn, HardlinkedFiles * hardlinked_files,
-        bool copy_instead_of_hardlink, const NameSet & files_to_copy_instead_of_hardlinks);
+        bool copy_instead_of_hardlink);

     virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;
diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp
index 9f3c3100349..384a649a174 100644
--- a/src/Storages/MergeTree/MutateTask.cpp
+++ b/src/Storages/MergeTree/MutateTask.cpp
@@ -1507,24 +1507,8 @@ bool MutateTask::prepare()
     if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
         ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
     {
-        NameSet files_to_copy_instead_of_hardlinks;
-        auto settings_ptr = ctx->data->getSettings();
-        /// In zero-copy replication the checksums file path in S3 (the blob path) is used for the zero-copy locks in ZooKeeper.
-        /// If we hardlink the checksums file, we get the same blob path, and two different parts (the source and the new
-        /// mutated part) will use the same locks in ZooKeeper. To avoid this we copy checksums.txt to generate a new blob path.
-        /// Example:
-        /// part: all_0_0_0/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        /// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        ///                   ^ the part name doesn't participate in the lock path
-        /// In case of a full hardlink we would have:
-        /// part: all_0_0_0_1/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        /// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        /// So we need to copy the file to get a new name.
-        bool copy_checksumns = ctx->data->supportsReplication() && settings_ptr->allow_remote_fs_zero_copy_replication && ctx->source_part->isStoredOnRemoteDiskWithZeroCopySupport();
-        if (copy_checksumns)
-            files_to_copy_instead_of_hardlinks.insert(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
-
         LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
-        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false, files_to_copy_instead_of_hardlinks);
+        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
         ctx->temporary_directory_lock = std::move(lock);
         promise.set_value(std::move(part));
         return false;
@@ -1637,24 +1621,7 @@ bool MutateTask::prepare()
         LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {} (optimized)", ctx->source_part->name, ctx->future_part->part_info.mutation);
         /// new_data_part is not used here, another part is created instead (see the comment above)
         ctx->temporary_directory_lock = {};
-
-        /// In zero-copy replication the checksums file path in S3 (the blob path) is used for the zero-copy locks in ZooKeeper.
-        /// If we hardlink the checksums file, we get the same blob path, and two different parts (the source and the new
-        /// mutated part) will use the same locks in ZooKeeper. To avoid this we copy checksums.txt to generate a new blob path.
-        /// Example:
-        /// part: all_0_0_0/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        /// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        ///                   ^ the part name doesn't participate in the lock path
-        /// In case of a full hardlink we would have:
-        /// part: all_0_0_0_1/checksums.txt -> /s3/blobs/shjfgsaasdasdasdasdasdas
-        /// locks path in zk: /zero_copy/tbl_id/s3_blobs_shjfgsaasdasdasdasdasdas/replica_name
-        /// So we need to copy the file to get a new name.
-        NameSet files_to_copy_instead_of_hardlinks;
-        auto settings_ptr = ctx->data->getSettings();
-        bool copy_checksumns = ctx->data->supportsReplication() && settings_ptr->allow_remote_fs_zero_copy_replication && ctx->source_part->isStoredOnRemoteDiskWithZeroCopySupport();
-        if (copy_checksumns)
-            files_to_copy_instead_of_hardlinks.insert(IMergeTreeDataPart::FILE_FOR_REFERENCES_CHECK);
-
-        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false, files_to_copy_instead_of_hardlinks);
+        auto [part, lock] = ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_mut_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false);
         ctx->temporary_directory_lock = std::move(lock);
         promise.set_value(std::move(part));
         return false;
diff --git a/src/Storages/MergeTree/localBackup.cpp b/src/Storages/MergeTree/localBackup.cpp
index 3559eff1f6b..10aff144914 100644
--- a/src/Storages/MergeTree/localBackup.cpp
+++ b/src/Storages/MergeTree/localBackup.cpp
@@ -19,7 +19,7 @@ namespace
 void localBackupImpl(
     const DiskPtr & disk, const String & source_path, const String & destination_path,
     bool make_source_readonly, size_t level,
-    std::optional<size_t> max_level, const NameSet & files_to_copy_instead_of_hardlinks)
+    std::optional<size_t> max_level)
 {
     if (max_level && level > *max_level)
         return;
@@ -38,14 +38,11 @@ void localBackupImpl(
         {
             if (make_source_readonly)
                 disk->setReadOnly(source);
-            if (files_to_copy_instead_of_hardlinks.contains(it->name()))
-                disk->copyFile(source, *disk, destination);
-            else
-                disk->createHardLink(source, destination);
+            disk->createHardLink(source, destination);
         }
         else
         {
-            localBackupImpl(disk, source, destination, make_source_readonly, level + 1, max_level, files_to_copy_instead_of_hardlinks);
+            localBackupImpl(disk, source, destination, make_source_readonly, level + 1, max_level);
         }
     }
 }
@@ -89,7 +86,7 @@ private:
 void localBackup(
     const DiskPtr & disk, const String & source_path, const String & destination_path,
     bool make_source_readonly,
-    std::optional<size_t> max_level, bool copy_instead_of_hardlinks, const NameSet & files_to_copy_intead_of_hardlinks)
+    std::optional<size_t> max_level, bool copy_instead_of_hardlinks)
 {
     if (disk->exists(destination_path) && !disk->isDirectoryEmpty(destination_path))
     {
@@ -112,7 +109,7 @@ void localBackup(
         if (copy_instead_of_hardlinks)
             disk->copyDirectoryContent(source_path, disk, destination_path);
         else
-            localBackupImpl(disk, source_path, destination_path, make_source_readonly, 0, max_level, files_to_copy_intead_of_hardlinks);
+            localBackupImpl(disk, source_path, destination_path, make_source_readonly, 0, max_level);
     }
     catch (const DB::ErrnoException & e)
     {
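For reference, the post-revert shape of localBackupImpl in the hunk above, as a runnable Python sketch; os-level calls stand in for the IDisk interface, and max_level limits recursion depth exactly as the header comment below describes:

    import os

    def local_backup_impl(source_path, destination_path, make_source_readonly=True,
                          level=0, max_level=None):
        # Recurse through a part directory, hardlinking files into the backup;
        # stop descending once level exceeds max_level.
        if max_level is not None and level > max_level:
            return
        os.makedirs(destination_path, exist_ok=True)
        for name in os.listdir(source_path):
            source = os.path.join(source_path, name)
            destination = os.path.join(destination_path, name)
            if os.path.isfile(source):
                if make_source_readonly:
                    os.chmod(source, 0o444)   # freeze the source file
                os.link(source, destination)  # hardlink only; the per-file copy branch is gone
            else:
                local_backup_impl(source, destination, make_source_readonly,
                                  level + 1, max_level)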
diff --git a/src/Storages/MergeTree/localBackup.h b/src/Storages/MergeTree/localBackup.h
index 74b188daff6..2eac45ac6a3 100644
--- a/src/Storages/MergeTree/localBackup.h
+++ b/src/Storages/MergeTree/localBackup.h
@@ -20,6 +20,6 @@ namespace DB
  * If max_level is specified, then only files whose depth relative to source_path
  * is less than or equal to max_level will be copied.
  * So, if max_level=0, only direct file children are copied.
  */
-    void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false, const NameSet & files_to_copy_intead_of_hardlinks = {});
+void localBackup(const DiskPtr & disk, const String & source_path, const String & destination_path, bool make_source_readonly = true, std::optional<size_t> max_level = {}, bool copy_instead_of_hardlinks = false);

 }
diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp
index aea853b6c39..e4062734352 100644
--- a/src/Storages/StorageMergeTree.cpp
+++ b/src/Storages/StorageMergeTree.cpp
@@ -1569,7 +1569,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
             Int64 temp_index = insert_increment.get();
             MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);

-            auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false, {});
+            auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
             dst_parts.emplace_back(std::move(dst_part));
             dst_parts_locks.emplace_back(std::move(part_lock));
         }
@@ -1664,7 +1664,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
             Int64 temp_index = insert_increment.get();
             MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);

-            auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false, {});
+            auto [dst_part, part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction(), {}, false);
             dst_parts.emplace_back(std::move(dst_part));
             dst_parts_locks.emplace_back(std::move(part_lock));
         }
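In the replicated code below, callers still decide between a hardlink clone and a full copy per part; only the per-file list is gone. The decision the next hunks keep is essentially the following; this is a paraphrase of the copy_instead_of_hardlink expression in replacePartitionFrom / movePartitionToTable, not a ClickHouse API:

    def should_copy_instead_of_hardlink(allow_remote_fs_zero_copy_replication,
                                        part_on_zero_copy_remote_disk):
        # On a zero-copy remote disk, a hardlinked clone would share blobs (and
        # thus ZooKeeper lock paths) with the source part, so clone by copying.
        return allow_remote_fs_zero_copy_replication and part_on_zero_copy_remote_disk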
diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp
index 3ce20fff239..beb32f043da 100644
--- a/src/Storages/StorageReplicatedMergeTree.cpp
+++ b/src/Storages/StorageReplicatedMergeTree.cpp
@@ -2164,7 +2164,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
                 throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);

             auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
-                part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &part_desc->hardlinked_files, false, {});
+                part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &part_desc->hardlinked_files, false);
             part_desc->res_part = std::move(res_part);
             part_desc->temporary_part_lock = std::move(temporary_part_lock);
         }
@@ -3914,7 +3914,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
         {
             get_part = [&, part_to_clone]()
             {
-                auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false, {});
+                auto [cloned_part, lock] = cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, false);
                 part_to_clone_lock = std::move(lock);
                 return cloned_part;
             };
@@ -6495,7 +6495,8 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
                 bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
                     && src_part->isStoredOnRemoteDiskWithZeroCopySupport();

-                auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink, {});
+                auto [dst_part, part_lock] = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);
+
                 src_parts.emplace_back(src_part);
                 dst_parts.emplace_back(dst_part);
                 dst_parts_locks.emplace_back(std::move(part_lock));
@@ -6725,7 +6726,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
                 bool copy_instead_of_hardlink = storage_settings_ptr->allow_remote_fs_zero_copy_replication
                     && src_part->isStoredOnRemoteDiskWithZeroCopySupport();

-                auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink, {});
+                auto [dst_part, dst_part_lock] = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, NO_TRANSACTION_PTR, &hardlinked_files, copy_instead_of_hardlink);

                 src_parts.emplace_back(src_part);
                 dst_parts.emplace_back(dst_part);
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index c987ca292c1..e2cad8436a8 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -816,6 +816,7 @@ class ClickHouseCluster:
             env_variables[f"keeper_config_dir{i}"] = configs_dir
             env_variables[f"keeper_db_dir{i}"] = coordination_dir
             self.zookeeper_dirs_to_create += [logs_dir, configs_dir, coordination_dir]
+            logging.debug(f"DEBUG KEEPER: {self.zookeeper_dirs_to_create}")

         self.with_zookeeper = True
         self.base_cmd.extend(["--file", keeper_docker_compose_path])
@@ -4107,9 +4108,6 @@ class ClickHouseInstance:
     def get_backuped_s3_objects(self, disk, backup_name):
         path = f"/var/lib/clickhouse/disks/{disk}/shadow/{backup_name}/store"
         self.wait_for_path_exists(path, 10)
-        return self.get_s3_objects(path)
-
-    def get_s3_objects(self, path):
         command = [
             "find",
             path,
             "-type",
             "f",
             "-exec",
             "grep",
             "-o",
             "r[01]\\{64\\}-file-[[:lower:]]\\{32\\}",
             "{}",
             ";",
         ]
-
         return self.exec_in_container(command).split("\n")

-    def get_s3_data_objects(self, path):
-        command = [
-            "find",
-            path,
-            "-type",
-            "f",
-            "-name",
-            "*.bin",
-            "-exec",
-            "grep",
-            "-o",
-            "r[01]\\{64\\}-file-[[:lower:]]\\{32\\}",
-            "{}",
-            ";",
-        ]
-        return self.exec_in_container(command).split("\n")
-
-    def get_table_objects(self, table, database=None):
-        objects = []
-        database_query = ""
-        if database:
-            database_query = f"AND database='{database}'"
-        data_paths = self.query(
-            f"""
-            SELECT arrayJoin(data_paths)
-            FROM system.tables
-            WHERE name='{table}'
-            {database_query}
-            """
-        )
-        paths = data_paths.split("\n")
-        for path in paths:
-            if path:
-                objects = objects + self.get_s3_data_objects(path)
-        return objects
-

 class ClickHouseKiller(object):
     def __init__(self, clickhouse_node):
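The helpers removed above located a table's S3 objects by grepping remote-disk metadata files for object references; the find/grep invocation in the kept helper uses the same pattern. A Python equivalent of that match; the regex mirrors the BRE r[01]\{64\}-file-[[:lower:]]\{32\} from the hunk, while the function name is illustrative:

    import re

    # Remote-disk metadata files reference blobs by keys of this shape.
    OBJECT_KEY_RE = re.compile(r"r[01]{64}-file-[a-z]{32}")

    def s3_objects_in_metadata(metadata_text):
        # Collecting these keys lets a test assert that the underlying
        # blobs survive (or disappear) across DETACH/ATTACH/DROP.
        return OBJECT_KEY_RE.findall(metadata_text)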
diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index 1c559312105..860b83d4ed1 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -8,12 +8,11 @@ from helpers.cluster import ClickHouseCluster
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())

-cluster = ClickHouseCluster(__file__)
-

 @pytest.fixture(scope="module")
-def started_cluster():
+def cluster():
     try:
+        cluster = ClickHouseCluster(__file__)
         cluster.add_instance(
             "node1",
             main_configs=["configs/config.d/s3.xml"],
@@ -97,7 +96,7 @@ def wait_for_active_parts(node, num_expected_parts, table_name, timeout=30):
 # Result of `get_large_objects_count` can be changed in other tests, so run this case at the beginning
 @pytest.mark.order(0)
 @pytest.mark.parametrize("policy", ["s3"])
-def test_s3_zero_copy_replication(started_cluster, policy):
+def test_s3_zero_copy_replication(cluster, policy):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]

@@ -154,7 +153,7 @@ def test_s3_zero_copy_replication(started_cluster, policy):


 @pytest.mark.skip(reason="Test is flaky (and never was stable)")
-def test_s3_zero_copy_on_hybrid_storage(started_cluster):
+def test_s3_zero_copy_on_hybrid_storage(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]

@@ -269,9 +268,7 @@ def insert_large_data(node, table):
         ("tiered_copy", True, 3),
     ],
 )
-def test_s3_zero_copy_with_ttl_move(
-    started_cluster, storage_policy, large_data, iterations
-):
+def test_s3_zero_copy_with_ttl_move(cluster, storage_policy, large_data, iterations):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]

@@ -336,7 +333,7 @@ def test_s3_zero_copy_with_ttl_move(
         (True, 3),
     ],
 )
-def test_s3_zero_copy_with_ttl_delete(started_cluster, large_data, iterations):
+def test_s3_zero_copy_with_ttl_delete(cluster, large_data, iterations):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]

@@ -418,22 +415,6 @@ def wait_mutations(node, table, seconds):
     assert mutations == "0\n"


-def wait_for_clean_old_parts(node, table, seconds):
-    time.sleep(1)
-    while seconds > 0:
-        seconds -= 1
-        parts = node.query(
-            f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0"
-        )
-        if parts == "0\n":
-            return
-        time.sleep(1)
-    parts = node.query(
-        f"SELECT count() FROM system.parts WHERE table='{table}' AND active=0"
-    )
-    assert parts == "0\n"
-
-
 def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]
@@ -454,8 +435,6 @@ def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):

     node1.query("INSERT INTO unfreeze_test VALUES (0)")

-    wait_for_active_parts(node2, 1, "unfreeze_test")
-
     node1.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup1'")
     node2.query("ALTER TABLE unfreeze_test FREEZE WITH NAME 'freeze_backup2'")
     wait_mutations(node1, "unfreeze_test", 10)
@@ -493,11 +472,11 @@ def s3_zero_copy_unfreeze_base(cluster, unfreeze_query_template):
     node2.query("DROP TABLE IF EXISTS unfreeze_test NO DELAY")


-def test_s3_zero_copy_unfreeze_alter(started_cluster):
+def test_s3_zero_copy_unfreeze_alter(cluster):
     s3_zero_copy_unfreeze_base(cluster, "ALTER TABLE unfreeze_test UNFREEZE WITH NAME")


-def test_s3_zero_copy_unfreeze_system(started_cluster):
+def test_s3_zero_copy_unfreeze_system(cluster):
     s3_zero_copy_unfreeze_base(cluster, "SYSTEM UNFREEZE WITH NAME")
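The fixture hunk above reverts the tests to a module-scoped fixture that is itself named cluster. Condensed sketch of that shape; only node1 is shown, and the with_minio/with_zookeeper keywords are assumed from the test's needs rather than visible in the hunk:

    import pytest
    from helpers.cluster import ClickHouseCluster

    @pytest.fixture(scope="module")
    def cluster():
        try:
            cluster = ClickHouseCluster(__file__)  # local name shadows the fixture, as in the diff
            cluster.add_instance(
                "node1",
                main_configs=["configs/config.d/s3.xml"],
                with_minio=True,       # assumed
                with_zookeeper=True,   # assumed
            )
            cluster.start()
            yield cluster
        finally:
            cluster.shutdown()

Each reverted test then takes cluster as its fixture argument instead of started_cluster.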
@@ -586,17 +565,17 @@ def s3_zero_copy_drop_detached(cluster, unfreeze_query_template):
     check_objects_not_exisis(cluster, objects1)


-def test_s3_zero_copy_drop_detached_alter(started_cluster):
+def test_s3_zero_copy_drop_detached_alter(cluster):
     s3_zero_copy_drop_detached(
         cluster, "ALTER TABLE drop_detached_test UNFREEZE WITH NAME"
     )


-def test_s3_zero_copy_drop_detached_system(started_cluster):
+def test_s3_zero_copy_drop_detached_system(cluster):
     s3_zero_copy_drop_detached(cluster, "SYSTEM UNFREEZE WITH NAME")


-def test_s3_zero_copy_concurrent_merge(started_cluster):
+def test_s3_zero_copy_concurrent_merge(cluster):
     node1 = cluster.instances["node1"]
     node2 = cluster.instances["node2"]

@@ -641,119 +620,3 @@ def test_s3_zero_copy_concurrent_merge(cluster):

     for node in (node1, node2):
         assert node.query("select sum(id) from concurrent_merge").strip() == "1600"
-
-
-def test_s3_zero_copy_keeps_data_after_mutation(started_cluster):
-    node1 = cluster.instances["node1"]
-    node2 = cluster.instances["node2"]
-
-    node1.query("DROP TABLE IF EXISTS zero_copy_mutation NO DELAY")
-    node2.query("DROP TABLE IF EXISTS zero_copy_mutation NO DELAY")
-
-    node1.query(
-        """
-        CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}')
-        ORDER BY id
-        PARTITION BY (id % 4)
-        SETTINGS storage_policy='s3',
-        old_parts_lifetime=1000
-        """
-    )
-
-    node2.query(
-        """
-        CREATE TABLE zero_copy_mutation (id UInt64, value1 String, value2 String, value3 String)
-        ENGINE=ReplicatedMergeTree('/clickhouse/tables/zero_copy_mutation', '{replica}')
-        ORDER BY id
-        PARTITION BY (id % 4)
-        SETTINGS storage_policy='s3',
-        old_parts_lifetime=1000
-        """
-    )
-
-    node1.query(
-        """
-        INSERT INTO zero_copy_mutation
-        SELECT * FROM generateRandom('id UInt64, value1 String, value2 String, value3 String') limit 1000000
-        """
-    )
-
-    wait_for_active_parts(node2, 4, "zero_copy_mutation")
-
-    objects1 = node1.get_table_objects("zero_copy_mutation")
-    check_objects_exisis(cluster, objects1)
-
-    node1.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        ADD COLUMN valueX String MATERIALIZED value1
-        """
-    )
-
-    node1.query(
-        """
-        ALTER TABLE zero_copy_mutation
-        MATERIALIZE COLUMN valueX
-        """
-    )
-
-    wait_mutations(node1, "zero_copy_mutation", 10)
-    wait_mutations(node2, "zero_copy_mutation", 10)
-
-    # If the bug is present, at least one node has metadata with incorrect ref_count values.
-    # It may be either node, depending on the mutation execution order.
-    # We could try to find that node, but that would require knowledge of the internal
-    # metadata structure, which can change in the future, so we don't look for it here.
-    # With the bug present, the test may still occasionally succeed.
- nodeX = node1 - nodeY = node2 - - objectsY = nodeY.get_table_objects("zero_copy_mutation") - check_objects_exisis(cluster, objectsY) - - nodeX.query( - """ - ALTER TABLE zero_copy_mutation - DETACH PARTITION '0' - """ - ) - - nodeX.query( - """ - ALTER TABLE zero_copy_mutation - ATTACH PARTITION '0' - """ - ) - - wait_mutations(node1, "zero_copy_mutation", 10) - wait_mutations(node2, "zero_copy_mutation", 10) - - nodeX.query( - """ - DROP TABLE zero_copy_mutation SYNC - """ - ) - - # time to remove objects - time.sleep(10) - - nodeY.query( - """ - SELECT count() FROM zero_copy_mutation - WHERE value3 LIKE '%ab%' - """ - ) - - check_objects_exisis(cluster, objectsY) - - nodeY.query( - """ - DROP TABLE zero_copy_mutation SYNC - """ - ) - - # time to remove objects - time.sleep(10) - - check_objects_not_exisis(cluster, objectsY)