From 5b2b8d38fa50fd8b1f195c5890be8c103ad65b61 Mon Sep 17 00:00:00 2001 From: Pavel Kovalenko Date: Wed, 8 Apr 2020 11:41:13 +0300 Subject: [PATCH] Download part through disk interface. --- src/Storages/MergeTree/DataPartsExchange.cpp | 29 ++++++++++---------- src/Storages/MergeTree/MergeTreeData.h | 3 ++ 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 6373c85a15d..4e40d4a5977 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -258,19 +258,20 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( size_t files; readBinary(files, in); + auto disk = reservation->getDisk(); + static const String TMP_PREFIX = "tmp_fetch_"; String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_; - String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name; - String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString(); - Poco::File part_file(absolute_part_path); + String part_relative_path = String(to_detached ? 
"detached/" : "") + tmp_prefix + part_name; + String part_download_path = data.getRelativeDataPath() + part_relative_path + "/"; - if (part_file.exists()) - throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS); + if (disk->exists(part_download_path)) + throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS); CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch}; - part_file.createDirectory(); + disk->createDirectories(part_download_path); MergeTreeData::DataPart::Checksums checksums; for (size_t i = 0; i < files; ++i) @@ -283,21 +284,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( /// File must be inside "absolute_part_path" directory. /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path. - String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString(); - if (!startsWith(absolute_file_path, absolute_part_path)) - throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")." + String absolute_file_path = Poco::Path(part_download_path + file_name).absolute().toString(); + if (!startsWith(absolute_file_path, Poco::Path(part_download_path).absolute().toString())) + throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")." 
" This may happen if we are trying to download part from malicious replica or logical error.", ErrorCodes::INSECURE_PATH); - WriteBufferFromFile file_out(absolute_file_path); - HashingWriteBuffer hashing_out(file_out); + auto file_out = disk->writeFile(part_download_path + file_name); + HashingWriteBuffer hashing_out(*file_out); copyData(in, hashing_out, file_size, blocker.getCounter()); if (blocker.isCancelled()) { /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout. /// And now we check it only between read chunks (in the `copyData` function). - part_file.remove(true); + disk->removeRecursive(part_download_path); throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED); } @@ -305,7 +306,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( readPODBinary(expected_hash, in); if (expected_hash != hashing_out.getHash()) - throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path, + throw Exception("Checksum mismatch for file " + fullPath(disk, part_download_path + file_name) + " transferred from " + replica_path, ErrorCodes::CHECKSUM_DOESNT_MATCH); if (file_name != "checksums.txt" && @@ -315,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( assertEOF(in); - MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), relative_part_path); + MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path); new_data_part->is_temp = true; new_data_part->modification_time = time(nullptr); new_data_part->loadColumnsChecksumsIndexes(true, false); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 125a90d26e0..eb2a0dd8774 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -625,6 +625,9 @@ public: 
return storage_settings.get(); } + /// Get table path relative to the disk, for use with disk interface methods + String getRelativeDataPath() const { return relative_data_path; } + /// Get table path on disk String getFullPathOnDisk(const DiskPtr & disk) const;