Merge pull request #3103 from yandex/dont-fetch-unchanged-mutated-parts

Don't fetch parts not changed by mutations
This commit is contained in:
alexey-milovidov 2018-09-13 04:00:31 +03:00 committed by GitHub
commit fd630179ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 13 deletions

View File

@ -1866,12 +1866,16 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(
return nullptr; return nullptr;
} }
/// Convenience overload: acquires the data parts lock itself and looks up, among parts in the
/// Committed state, the part that corresponds to or contains `part_info`.
/// The result is whatever the three-argument overload returns for DataPartState::Committed.
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const MergeTreePartInfo & part_info)
{
    /// The lock object lives until the function returns, so the whole lookup runs under it.
    DataPartsLock lock(data_parts_mutex);
    return getActiveContainingPart(part_info, DataPartState::Committed, lock);
}
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name) MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{ {
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version); auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
return getActiveContainingPart(part_info);
DataPartsLock data_parts_lock(data_parts_mutex);
return getActiveContainingPart(part_info, DataPartState::Committed, data_parts_lock);
} }

View File

@ -373,6 +373,7 @@ public:
/// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr. /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
DataPartPtr getActiveContainingPart(const String & part_name); DataPartPtr getActiveContainingPart(const String & part_name);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info);
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock); DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock &lock);
/// Returns all parts in specified partition /// Returns all parts in specified partition

View File

@ -100,6 +100,15 @@ struct MinimalisticDataPartChecksums
uint128 hash_of_uncompressed_files {}; uint128 hash_of_uncompressed_files {};
uint128 uncompressed_hash_of_compressed_files {}; uint128 uncompressed_hash_of_compressed_files {};
/// Field-by-field equality of the minimalistic checksums: both file counters and all three hashes
/// must match. `full_checksums` is not part of the comparison.
bool operator==(const MinimalisticDataPartChecksums & other) const
{
    /// Compare in declaration order and bail out on the first mismatch.
    if (!(num_compressed_files == other.num_compressed_files))
        return false;
    if (!(num_uncompressed_files == other.num_uncompressed_files))
        return false;
    if (!(hash_of_all_files == other.hash_of_all_files))
        return false;
    if (!(hash_of_uncompressed_files == other.hash_of_uncompressed_files))
        return false;
    return uncompressed_hash_of_compressed_files == other.uncompressed_hash_of_compressed_files;
}
/// Is set only for old formats /// Is set only for old formats
std::unique_ptr<MergeTreeDataPartChecksums> full_checksums; std::unique_ptr<MergeTreeDataPartChecksums> full_checksums;

View File

@ -2713,7 +2713,9 @@ void StorageReplicatedMergeTree::updateQuorum(const String & part_name)
bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum) bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const String & replica_path, bool to_detached, size_t quorum)
{ {
if (auto part = data.getPartIfExists(part_name, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting})) const auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
if (auto part = data.getPartIfExists(part_info, {MergeTreeDataPart::State::Outdated, MergeTreeDataPart::State::Deleting}))
{ {
LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch"); LOG_DEBUG(log, "Part " << part->getNameWithState() << " should be deleted after previous attempt before fetch");
/// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt. /// Force immediate parts cleanup to delete the part that was left from the previous fetch attempt.
@ -2754,17 +2756,64 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
part_name, part, replaced_parts, nullptr); part_name, part, replaced_parts, nullptr);
}; };
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host")); MergeTreeData::DataPartPtr part_to_clone;
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef()); {
auto [user, password] = context.getInterserverCredentials(); /// If the desired part is a result of a part mutation, try to find the source part and compare
String interserver_scheme = context.getInterserverScheme(); /// its checksums to the checksums of the desired part. If they match, we can just clone the local part.
/// If we have the source part, its part_info will contain covered_part_info.
auto covered_part_info = part_info;
covered_part_info.mutation = 0;
auto source_part = data.getActiveContainingPart(covered_part_info);
if (source_part)
{
MinimalisticDataPartChecksums source_part_checksums;
source_part_checksums.computeTotalChecksums(source_part->checksums);
String desired_checksums_str = getZooKeeper()->get(replica_path + "/parts/" + part_name + "/checksums");
auto desired_checksums = MinimalisticDataPartChecksums::deserializeFrom(desired_checksums_str);
if (source_part_checksums == desired_checksums)
{
LOG_TRACE(log, "Found local part " << source_part->name << " with the same checksums as " << part_name);
part_to_clone = source_part;
}
}
}
std::function<MergeTreeData::MutableDataPartPtr()> get_part;
if (part_to_clone)
{
get_part = [&, part_to_clone]()
{
return data.cloneAndLoadDataPart(part_to_clone, "tmp_clone_", part_info);
};
}
else
{
ReplicatedMergeTreeAddress address(getZooKeeper()->get(replica_path + "/host"));
auto timeouts = ConnectionTimeouts::getHTTPTimeouts(context.getSettingsRef());
auto [user, password] = context.getInterserverCredentials();
String interserver_scheme = context.getInterserverScheme();
get_part = [&, address, timeouts, user, password, interserver_scheme]()
{
if (interserver_scheme != address.scheme)
throw Exception("Interserver schemes are different: '" + interserver_scheme
+ "' != '" + address.scheme + "', can't fetch part from " + address.host,
ErrorCodes::LOGICAL_ERROR);
return fetcher.fetchPart(
part_name, replica_path,
address.host, address.replication_port,
timeouts, user, password, interserver_scheme, to_detached);
};
}
try try
{ {
if (interserver_scheme != address.scheme) part = get_part();
throw Exception("Interserver schemes are different '" + interserver_scheme + "' != '" + address.scheme + "', can't fetch part from " + address.host, ErrorCodes::LOGICAL_ERROR);
part = fetcher.fetchPart(part_name, replica_path, address.host, address.replication_port, timeouts, user, password, interserver_scheme, to_detached);
if (!to_detached) if (!to_detached)
{ {
@ -2809,7 +2858,11 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Strin
ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches); ProfileEvents::increment(ProfileEvents::ReplicatedPartFetches);
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : "")); if (part_to_clone)
LOG_DEBUG(log, "Cloned part " << part_name << " from " << part_to_clone->name << (to_detached ? " (to 'detached' directory)" : ""));
else
LOG_DEBUG(log, "Fetched part " << part_name << " from " << replica_path << (to_detached ? " (to 'detached' directory)" : ""));
return true; return true;
} }

View File

@ -0,0 +1,8 @@
*** Check data after fetch of merged part ***
all_0_2_1 1
all_0_2_1 2
all_0_2_1 3
*** Check data after fetch/clone of mutated part ***
all_0_2_1_3 1
all_0_2_1_3 2
all_0_2_1_3 3

View File

@ -0,0 +1,42 @@
-- Test scenario: a replica that was detached while a merge / a no-op mutation happened on the
-- other replica must end up with the resulting part after it is attached again.
-- Clean up leftovers from previous runs.
DROP TABLE IF EXISTS test.fetches_r1;
DROP TABLE IF EXISTS test.fetches_r2;
-- Two replicas ('r1' and 'r2') registered under the same ZooKeeper path.
-- r2 gets zero prefer_fetch_merged_part_* thresholds (presumably so that it always prefers
-- fetching a merged part over merging locally -- confirm against the settings documentation).
CREATE TABLE test.fetches_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r1') ORDER BY x;
CREATE TABLE test.fetches_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/fetches', 'r2') ORDER BY x
SETTINGS prefer_fetch_merged_part_time_threshold=0,
prefer_fetch_merged_part_size_threshold=0;
-- Three separate single-row inserts into r1.
INSERT INTO test.fetches_r1 VALUES (1);
INSERT INTO test.fetches_r1 VALUES (2);
INSERT INTO test.fetches_r1 VALUES (3);
-- Let r2 catch up with the inserts, then take it offline before the merge.
SYSTEM SYNC REPLICA test.fetches_r2;
DETACH TABLE test.fetches_r2;
-- NOTE(review): presumably set to 0 so that OPTIMIZE / ALTER below do not wait for the detached replica r2 -- confirm.
SET replication_alter_partitions_sync=0;
-- Merge everything on r1 while r2 is detached.
OPTIMIZE TABLE test.fetches_r1 PARTITION tuple() FINAL;
SYSTEM SYNC REPLICA test.fetches_r1;
-- After attach replica r2 should fetch the merged part from r1.
ATTACH TABLE test.fetches_r2;
SYSTEM SYNC REPLICA test.fetches_r2;
SELECT '*** Check data after fetch of merged part ***';
-- _part is selected so that the output also shows which part the rows are read from.
SELECT _part, * FROM test.fetches_r2 ORDER BY x;
DETACH TABLE test.fetches_r2;
-- Add mutation that doesn't change data.
-- (None of the inserted values 1, 2, 3 matches x = 0, so no row is deleted.)
ALTER TABLE test.fetches_r1 DELETE WHERE x = 0;
SYSTEM SYNC REPLICA test.fetches_r1;
-- After attach replica r2 should compare checksums for mutated part and clone the local part.
ATTACH TABLE test.fetches_r2;
SYSTEM SYNC REPLICA test.fetches_r2;
SELECT '*** Check data after fetch/clone of mutated part ***';
SELECT _part, * FROM test.fetches_r2 ORDER BY x;
-- Final cleanup.
DROP TABLE test.fetches_r1;
DROP TABLE test.fetches_r2;