diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 38340905e45..ea7a52876ed 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1866,12 +1866,16 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart( return nullptr; } +MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info) +{ + DataPartsLock data_parts_lock(data_parts_mutex); + return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock); +} + MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) { auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); - - DataPartsLock data_parts_lock(data_parts_mutex); - return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock); + return getActiveContainingPart(part_info); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.h b/dbms/src/Storages/MergeTree/MergeTreeData.h index 2e51cfe09e1..09aff9dae1d 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.h +++ b/dbms/src/Storages/MergeTree/MergeTreeData.h @@ -373,6 +373,7 @@ public: /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr. DataPartPtr getActiveContainingPart(const String & part_name); + DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info); DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock); /// Returns all parts in specified partition diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataPartChecksum.h b/dbms/src/Storages/MergeTree/MergeTreeDataPartChecksum.h index ba4f9b88a66..9e6412b7bec 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataPartChecksum.h +++ b/dbms/src/Storages/MergeTree/MergeTreeDataPartChecksum.h @@ -100,6 +100,15 @@ struct MinimalisticDataPartChecksums uint128 hash_of_uncompressed_files {}; uint128 uncompressed_hash_of_compressed_files {}; + bool operator==(const MinimalisticDataPartChecksums & other) const + { + return num_compressed_files == other.num_compressed_files + && num_uncompressed_files == other.num_uncompressed_files + && hash_of_all_files == other.hash_of_all_files + && hash_of_uncompressed_files == other.hash_of_uncompressed_files + && uncompressed_hash_of_compressed_files == other.uncompressed_hash_of_compressed_files; + } + /// Is set only for old formats std::unique_ptr full_checksums; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index fdcc67ff09d..c79c8495e18 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -2713,7 +2713,9 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name) bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum) { - if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting})) + const auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version); + + if (auto part = data.getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting})) { LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. @@ -2754,17 +2756,64 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin part_name, part, replaced_parts, nullptr); }; - ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); - auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); - auto [user, password] = context.getInterserverCredentials(); - String interserver_scheme = context.getInterserverScheme(); + MergeTreeData::DataPartPtr part_to_clone; + { + /// If the desired part is a result of a part mutation, try to find the source part and compare + /// its checksums to the checksums of the desired part. If they match, we can just clone the local part. + + /// If we have the source part, its part_info will contain covered_part_info. + auto covered_part_info = part_info; + covered_part_info.mutation = 0; + auto source_part = data.getActiveContainingPart(covered_part_info); + + if (source_part) + { + MinimalisticDataPartChecksums source_part_checksums; + source_part_checksums.computeTotalChecksums(source_part->checksums); + + String desired_checksums_str = getZooKeeper()->get(replica_path + "/parts/" + part_name + "/checksums"); + auto desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str); + if (source_part_checksums == desired_checksums) + { + LOG_TRACE(log, "Found local part " << source_part->name << " with the same checksums as " << part_name); + part_to_clone = source_part; + } + } + + } + + std::function get_part; + if (part_to_clone) + { + get_part = [&, part_to_clone]() + { + return data.cloneAndLoadDataPart(part_to_clone, "tmp_clone_", part_info); + }; + } + else + { + ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); + auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); + auto [user, password] = context.getInterserverCredentials(); + String interserver_scheme = context.getInterserverScheme(); + + get_part = [&, address, timeouts, user, password, interserver_scheme]() + { + if (interserver_scheme != address.scheme) + throw Exception("Interserver schemes are different: '" + interserver_scheme + + "' != '" + address.scheme + "', can't fetch part from " + address.host, + ErrorCodes::LOGICAL_ERROR); + + return fetcher.fetchPart( + part_name, replica_path, + address.host, address.replication_port, + timeouts, user, password, interserver_scheme, to_detached); + }; + } try { - if (interserver_scheme != address.scheme) - throw Exception("Interserver schemes are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR); - - part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, interserver_scheme, to_detached); + part = get_part(); if (!to_detached) { @@ -2809,7 +2858,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches); - LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : "")); + if (part_to_clone) + LOG_DEBUG(log, "Cloned part " << part_name << " from " << part_to_clone->name << (to_detached ? " (to 'detached' directory)" : "")); + else + LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : "")); + return true; } diff --git a/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.reference b/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.reference new file mode 100644 index 00000000000..f226e9d3654 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.reference @@ -0,0 +1,8 @@ +*** Check data after fetch of merged part *** +all_0_2_1 1 +all_0_2_1 2 +all_0_2_1 3 +*** Check data after fetch/clone of mutated part *** +all_0_2_1_3 1 +all_0_2_1_3 2 +all_0_2_1_3 3 diff --git a/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.sql b/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.sql new file mode 100644 index 00000000000..d7b485d46f0 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00715_fetch_merged_or_mutated_part_zookeeper.sql @@ -0,0 +1,42 @@ +DROP TABLE IF EXISTS test.fetches_r1; +DROP TABLE IF EXISTS test.fetches_r2; + +CREATE TABLE test.fetches_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r1') ORDER BY x; +CREATE TABLE test.fetches_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r2') ORDER BY x + SETTINGS prefer_fetch_merged_part_time_threshold=0, + prefer_fetch_merged_part_size_threshold=0; + +INSERT INTO test.fetches_r1 VALUES (1); +INSERT INTO test.fetches_r1 VALUES (2); +INSERT INTO test.fetches_r1 VALUES (3); + +SYSTEM SYNC REPLICA test.fetches_r2; + +DETACH TABLE test.fetches_r2; + +SET replication_alter_partitions_sync=0; +OPTIMIZE TABLE test.fetches_r1 PARTITION tuple() FINAL; +SYSTEM SYNC REPLICA test.fetches_r1; + +-- After attach replica r2 should fetch the merged part from r1. +ATTACH TABLE test.fetches_r2; +SYSTEM SYNC REPLICA test.fetches_r2; + +SELECT '*** Check data after fetch of merged part ***'; +SELECT _part, * FROM test.fetches_r2 ORDER BY x; + +DETACH TABLE test.fetches_r2; + +-- Add mutation that doesn't change data. +ALTER TABLE test.fetches_r1 DELETE WHERE x = 0; +SYSTEM SYNC REPLICA test.fetches_r1; + +-- After attach replica r2 should compare checksums for mutated part and clone the local part. +ATTACH TABLE test.fetches_r2; +SYSTEM SYNC REPLICA test.fetches_r2; + +SELECT '*** Check data after fetch/clone of mutated part ***'; +SELECT _part, * FROM test.fetches_r2 ORDER BY x; + +DROP TABLE test.fetches_r1; +DROP TABLE test.fetches_r2;