From c3212f1c4677a905b41a9228cb675f5c172c803a Mon Sep 17 00:00:00 2001 From: Anton Ivashkin Date: Thu, 8 Apr 2021 21:27:56 +0300 Subject: [PATCH] Fix Zero-Copy replication with several S3 volumes (issue 22679) --- src/Storages/MergeTree/DataPartsExchange.cpp | 27 ++++++- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 14 +++- src/Storages/MergeTree/IMergeTreeDataPart.h | 28 ++++---- src/Storages/MergeTree/MergeTreeData.cpp | 17 ++++- .../configs/config.d/s3.xml | 32 +++++++++ .../test_s3_zero_copy_replication/test.py | 72 +++++++++++++++++-- 6 files changed, 167 insertions(+), 23 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 862a3088f89..98f0423d334 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -410,9 +410,34 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( size_t sum_files_size = 0; readBinary(sum_files_size, in); IMergeTreeDataPart::TTLInfos ttl_infos; - /// Skip ttl infos, not required for S3 metadata String ttl_infos_string; readBinary(ttl_infos_string, in); + ReadBufferFromString ttl_infos_buffer(ttl_infos_string); + assertString("ttl format version: 1\n", ttl_infos_buffer); + ttl_infos.read(ttl_infos_buffer); + + ReservationPtr reservation + = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true); + if (!reservation) + reservation + = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true); + if (reservation) + { + DiskPtr disk = reservation->getDisk(); + if (disk && disk->getType() == DiskType::Type::S3) + { + for (const auto & d : disks_s3) + { + if (d->getPath() == disk->getPath()) + { + Disks disks_tmp = { disk }; + disks_s3.swap(disks_tmp); + break; + } + } + } + } + String part_type = "Wide"; readStringBinary(part_type, in); if (part_type == "InMemory") diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 72ce05e7aab..2683357af3c 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -189,6 +189,9 @@ static void incrementStateMetric(IMergeTreeDataPart::State state) case IMergeTreeDataPart::State::DeleteOnDestroy: CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy); return; + case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3: + CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy); + return; } } @@ -214,6 +217,9 @@ static void decrementStateMetric(IMergeTreeDataPart::State state) case IMergeTreeDataPart::State::DeleteOnDestroy: CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy); return; + case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3: + CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy); + return; } } @@ -393,7 +399,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns) void IMergeTreeDataPart::removeIfNeeded() { - if (state == State::DeleteOnDestroy || is_temp) + if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3 || is_temp) { try { @@ -416,9 +422,9 @@ void IMergeTreeDataPart::removeIfNeeded() } } - remove(false); + remove(state == State::DeleteOnDestroyKeepS3); - if (state == State::DeleteOnDestroy) + if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3) { LOG_TRACE(storage.log, "Removed part from old location {}", path); } @@ -463,6 +469,8 @@ String 
IMergeTreeDataPart::stateToString(IMergeTreeDataPart::State state)
         return "Deleting";
     case State::DeleteOnDestroy:
         return "DeleteOnDestroy";
+    case State::DeleteOnDestroyKeepS3:
+        return "DeleteOnDestroyKeepS3";
     }
 
     __builtin_unreachable();
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 03f6564788a..57738f8ea2d 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -202,22 +202,24 @@ public:
      * Part state should be modified under data_parts mutex.
      *
      * Possible state transitions:
-     * Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
-     * Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
-     * Precommitted -> Committed: we successfully committed a part to active dataset
-     * Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
-     * Outdated -> Deleting: a cleaner selected this part for deletion
-     * Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion
-     * Committed -> DeleteOnDestroy if part was moved to another disk
+     * Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
+     * Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
+     * Precommitted -> Committed: we successfully committed a part to active dataset
+     * Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
+     * Outdated -> Deleting: a cleaner selected this part for deletion
+     * Deleting -> Outdated: if a ZooKeeper error occurred during the deletion, we will retry deletion
+     * Committed -> DeleteOnDestroy if part was moved to another disk
+     * Committed -> DeleteOnDestroyKeepS3 if part was moved to another disk but its data on S3 is shared and must be kept
      */
     enum class State
     {
-        Temporary,       /// the part is generating now, it is not in data_parts list
-        PreCommitted,    /// the part is in data_parts, but not used for SELECTs
-        Committed,       /// active data part, used by current and upcoming SELECTs
-        Outdated,        /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
-        Deleting,        /// not active data part with identity refcounter, it is deleting right now by a cleaner
-        DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
+        Temporary,             /// the part is generating now, it is not in data_parts list
+        PreCommitted,          /// the part is in data_parts, but not used for SELECTs
+        Committed,             /// active data part, used by current and upcoming SELECTs
+        Outdated,              /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
+        Deleting,              /// not active data part with identity refcounter, it is deleting right now by a cleaner
+        DeleteOnDestroy,       /// part was moved to another disk and should be deleted in own destructor
+        DeleteOnDestroyKeepS3, /// same as DeleteOnDestroy but the shared data on S3 should be kept
     };
 
     using TTLInfo = MergeTreeDataPartTTLInfo;
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index d61de13b604..0f142e07f7d 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -2611,7 +2611,22 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
     if (active_part_it == data_parts_by_info.end())
         throw Exception("Cannot swap part '" + part_copy->name + "', no such active part.", ErrorCodes::NO_SUCH_DATA_PART);
 
-    modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
+    /// We do not check allow_s3_zero_copy_replication here because data may still be shared
+    /// after allow_s3_zero_copy_replication has been turned on and off again.
+    bool keep_s3 = false;
+
+    if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
+    {
+        if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
+            && original_active_part->getUniqueId() == part_copy->getUniqueId())
+        { /// May happen when several volumes use the same S3 storage
+            keep_s3 = true;
+        }
+        else
+            keep_s3 = !unlockSharedData(*original_active_part);
+    }
+
+    modifyPartState(original_active_part, keep_s3 ? DataPartState::DeleteOnDestroyKeepS3 : DataPartState::DeleteOnDestroy);
 
     data_parts_indexes.erase(active_part_it);
     auto part_it = data_parts_indexes.insert(part_copy).first;
diff --git a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
index 88eb49d9f17..ee990ee9012 100644
--- a/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
+++ b/tests/integration/test_s3_zero_copy_replication/configs/config.d/s3.xml
@@ -8,6 +8,18 @@
                 <access_key_id>minio</access_key_id>
                 <secret_access_key>minio123</secret_access_key>
             </s31>
+            <s31_again>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+            </s31_again>
+            <s32>
+                <type>s3</type>
+                <endpoint>http://minio1:9001/root/data2/</endpoint>
+                <access_key_id>minio</access_key_id>
+                <secret_access_key>minio123</secret_access_key>
+            </s32>
         </disks>
         <policies>
             <s3>
@@ -27,6 +39,26 @@
                     </external>
                 </volumes>
             </hybrid>
+            <tiered>
+                <volumes>
+                    <main>
+                        <disk>s31</disk>
+                    </main>
+                    <external>
+                        <disk>s32</disk>
+                    </external>
+                </volumes>
+            </tiered>
+            <tiered_copy>
+                <volumes>
+                    <main>
+                        <disk>s31</disk>
+                    </main>
+                    <external>
+                        <disk>s31_again</disk>
+                    </external>
+                </volumes>
+            </tiered_copy>
         </policies>
     </storage_configuration>
 
diff --git a/tests/integration/test_s3_zero_copy_replication/test.py b/tests/integration/test_s3_zero_copy_replication/test.py
index 5bc30ab1d6b..423bf3f8c03 100644
--- a/tests/integration/test_s3_zero_copy_replication/test.py
+++ b/tests/integration/test_s3_zero_copy_replication/test.py
@@ -27,10 +27,10 @@ def cluster():
         cluster.shutdown()
 
 
-def get_large_objects_count(cluster, size=100):
+def get_large_objects_count(cluster, folder='data', size=100):
     minio = cluster.minio_client
     counter = 0
-    for obj in minio.list_objects(cluster.minio_bucket, 'data/'):
+    for obj in minio.list_objects(cluster.minio_bucket, '{}/'.format(folder)):
         if obj.size >= size:
             counter = counter + 1
     return counter
@@ -69,7 +69,7 @@ def test_s3_zero_copy_replication(cluster, policy):
     # Based on version 20.x - two parts
     assert get_large_objects_count(cluster) == 2
 
-    node1.query("OPTIMIZE TABLE s3_test")
+    node1.query("OPTIMIZE TABLE s3_test FINAL")
 
     time.sleep(1)
 
@@ -119,7 +119,7 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
     assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','default')"
 
     # Total objects in S3
-    s3_objects = get_large_objects_count(cluster, 0)
+    s3_objects = get_large_objects_count(cluster, size=0)
 
     node2.query("ALTER TABLE hybrid_test MOVE PARTITION ID 'all' TO DISK 's31'")
 
@@ -127,10 +127,72 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
     assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
 
     # Check that after moving partition on node2 no new obects on s3
-    assert get_large_objects_count(cluster, 0) == s3_objects
+    assert get_large_objects_count(cluster, size=0) == s3_objects
 
     assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
     assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
 
     node1.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
     node2.query("DROP TABLE IF EXISTS hybrid_test NO DELAY")
+
+
+@pytest.mark.parametrize(
+    "storage_policy", ["tiered", "tiered_copy"]
+)
+def test_s3_zero_copy_with_ttl_move(cluster, storage_policy):
+    node1 = cluster.instances["node1"]
+    node2 = cluster.instances["node2"]
+
+    for i in range(10):
+        node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+        node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+
+        node1.query(
+            """
+            CREATE TABLE ttl_move_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
+            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_move_test', '{}')
+            ORDER BY d
+            TTL d1 + INTERVAL 2 DAY TO VOLUME 'external'
+            SETTINGS storage_policy='{}'
+            """
+            .format('{replica}', storage_policy)
+        )
+
+        node1.query("INSERT INTO ttl_move_test VALUES (10, now() - INTERVAL 3 DAY)")
+        node1.query("INSERT INTO ttl_move_test VALUES (11, now() - INTERVAL 1 DAY)")
+
+        assert node1.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
+        assert node2.query("SELECT count() FROM ttl_move_test FORMAT Values") == "(2)"
+        assert node1.query("SELECT d FROM ttl_move_test ORDER BY d1 FORMAT Values") == "(10),(11)"
+        assert node2.query("SELECT d FROM ttl_move_test ORDER BY d1 FORMAT Values") == "(10),(11)"
+
+    node1.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+    node2.query("DROP TABLE IF EXISTS ttl_move_test NO DELAY")
+
+
+def test_s3_zero_copy_with_ttl_delete(cluster):
+    node1 = cluster.instances["node1"]
+    node2 = cluster.instances["node2"]
+
+    for i in range(10):
+        node1.query(
+            """
+            CREATE TABLE ttl_delete_test ON CLUSTER test_cluster (d UInt64, d1 DateTime)
+            ENGINE=ReplicatedMergeTree('/clickhouse/tables/ttl_delete_test', '{}')
+            ORDER BY d
+            TTL d1 + INTERVAL 2 DAY
+            SETTINGS storage_policy='tiered'
+            """
+            .format('{replica}')
+        )
+
+        node1.query("INSERT INTO ttl_delete_test VALUES (10, now() - INTERVAL 3 DAY)")
+        node1.query("INSERT INTO ttl_delete_test VALUES (11, now() - INTERVAL 1 DAY)")
+
+        assert node1.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
+        assert node2.query("SELECT count() FROM ttl_delete_test FORMAT Values") == "(1)"
+        assert node1.query("SELECT d FROM ttl_delete_test ORDER BY d1 FORMAT Values") == "(11)"
+        assert node2.query("SELECT d FROM ttl_delete_test ORDER BY d1 FORMAT Values") == "(11)"
+
+        node1.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
+        node2.query("DROP TABLE IF EXISTS ttl_delete_test NO DELAY")
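
Note on manual verification: the new TTL tests assert only on query results. When checking a zero-copy move by hand it can also help to watch where the parts physically land. A minimal sketch of such a helper, assuming the node fixtures and the `time` import already present in test.py (the function name and timeout are illustrative, not part of the patch):

    def wait_for_part_on_disk(node, table, disk, timeout=60):
        # Poll system.parts until at least one active part of `table` is stored
        # on `disk`, e.g. on 's32' once the background move triggered by
        # "TTL ... TO VOLUME 'external'" has picked up the expired row.
        deadline = time.time() + timeout
        while time.time() < deadline:
            count = node.query(
                "SELECT count() FROM system.parts"
                " WHERE table='{}' AND active AND disk_name='{}'".format(table, disk)
            )
            if int(count) > 0:
                return True
            time.sleep(1)
        return False

With the 'tiered' policy the expired row should eventually report a part on disk 's32', while with 'tiered_copy' both volumes point at the same S3 endpoint, which is exactly the case the keep_s3 / DeleteOnDestroyKeepS3 logic in swapActivePart() protects.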