mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 09:02:00 +00:00
Download part trough disk interface.
This commit is contained in:
parent
385e8839dc
commit
5b2b8d38fa
@ -258,19 +258,20 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
|
||||
size_t files;
|
||||
readBinary(files, in);
|
||||
|
||||
auto disk = reservation->getDisk();
|
||||
|
||||
static const String TMP_PREFIX = "tmp_fetch_";
|
||||
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
||||
|
||||
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
|
||||
String absolute_part_path = Poco::Path(data.getFullPathOnDisk(reservation->getDisk()) + relative_part_path + "/").absolute().toString();
|
||||
Poco::File part_file(absolute_part_path);
|
||||
String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
|
||||
String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";
|
||||
|
||||
if (part_file.exists())
|
||||
throw Exception("Directory " + absolute_part_path + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
|
||||
if (disk->exists(part_download_path))
|
||||
throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
|
||||
|
||||
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
|
||||
|
||||
part_file.createDirectory();
|
||||
disk->createDirectories(part_download_path);
|
||||
|
||||
MergeTreeData::DataPart::Checksums checksums;
|
||||
for (size_t i = 0; i < files; ++i)
|
||||
@ -283,21 +284,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
|
||||
|
||||
/// File must be inside "absolute_part_path" directory.
|
||||
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
|
||||
String absolute_file_path = Poco::Path(absolute_part_path + file_name).absolute().toString();
|
||||
if (!startsWith(absolute_file_path, absolute_part_path))
|
||||
throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + absolute_part_path + ")."
|
||||
String absolute_file_path = Poco::Path(part_download_path + file_name).absolute().toString();
|
||||
if (!startsWith(absolute_file_path, part_download_path))
|
||||
throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")."
|
||||
" This may happen if we are trying to download part from malicious replica or logical error.",
|
||||
ErrorCodes::INSECURE_PATH);
|
||||
|
||||
WriteBufferFromFile file_out(absolute_file_path);
|
||||
HashingWriteBuffer hashing_out(file_out);
|
||||
auto file_out = disk->writeFile(part_download_path + file_name);
|
||||
HashingWriteBuffer hashing_out(*file_out);
|
||||
copyData(in, hashing_out, file_size, blocker.getCounter());
|
||||
|
||||
if (blocker.isCancelled())
|
||||
{
|
||||
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout.
|
||||
/// And now we check it only between read chunks (in the `copyData` function).
|
||||
part_file.remove(true);
|
||||
disk->removeRecursive(part_download_path);
|
||||
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
|
||||
}
|
||||
|
||||
@ -305,7 +306,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
|
||||
readPODBinary(expected_hash, in);
|
||||
|
||||
if (expected_hash != hashing_out.getHash())
|
||||
throw Exception("Checksum mismatch for file " + absolute_part_path + file_name + " transferred from " + replica_path,
|
||||
throw Exception("Checksum mismatch for file " + fullPath(disk, part_download_path + file_name) + " transferred from " + replica_path,
|
||||
ErrorCodes::CHECKSUM_DOESNT_MATCH);
|
||||
|
||||
if (file_name != "checksums.txt" &&
|
||||
@ -315,7 +316,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart(
|
||||
|
||||
assertEOF(in);
|
||||
|
||||
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), relative_part_path);
|
||||
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, reservation->getDisk(), part_relative_path);
|
||||
new_data_part->is_temp = true;
|
||||
new_data_part->modification_time = time(nullptr);
|
||||
new_data_part->loadColumnsChecksumsIndexes(true, false);
|
||||
|
@ -625,6 +625,9 @@ public:
|
||||
return storage_settings.get();
|
||||
}
|
||||
|
||||
/// Get relative table path
|
||||
String getRelativeDataPath() const { return relative_data_path; }
|
||||
|
||||
/// Get table path on disk
|
||||
String getFullPathOnDisk(const DiskPtr & disk) const;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user