ClickHouse/src/Storages/MergeTree/DataPartsExchange.cpp

841 lines
32 KiB
C++
Raw Normal View History

#include <Storages/MergeTree/DataPartsExchange.h>
2021-10-15 20:18:20 +00:00
#include <Formats/NativeWriter.h>
2021-07-05 03:32:56 +00:00
#include <Disks/IDiskRemote.h>
#include <Disks/SingleDiskVolume.h>
#include <Disks/createVolume.h>
#include <IO/HTTPCommon.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPServerResponse.h>
2020-04-29 17:14:49 +00:00
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
2021-05-26 20:37:44 +00:00
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/CurrentMetrics.h>
#include <Common/NetException.h>
#include <IO/createReadBufferFromFileBase.h>
2021-10-02 07:13:14 +00:00
#include <base/scope_guard.h>
#include <Poco/Net/HTTPRequest.h>
#include <boost/algorithm/string/join.hpp>
2021-07-05 03:32:56 +00:00
#include <iterator>
#include <regex>
2022-01-30 19:49:48 +00:00
#include <base/sort.h>
2014-07-22 13:49:52 +00:00
2021-04-27 00:05:43 +00:00
namespace fs = std::filesystem;
/// Metrics defined elsewhere; this file holds them (via CurrentMetrics::Increment)
/// while a part is being sent to, or downloaded from, another replica.
namespace CurrentMetrics
{
    extern const Metric ReplicatedFetch;
    extern const Metric ReplicatedSend;
}
2014-07-22 13:49:52 +00:00
namespace DB
{
/// Error codes thrown or inspected in this file (defined elsewhere); kept in alphabetical order.
namespace ErrorCodes
{
    extern const int ABORTED;
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int CANNOT_WRITE_TO_OSTREAM;
    extern const int CHECKSUM_DOESNT_MATCH;
    extern const int CORRUPTED_DATA;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int INCORRECT_PART_TYPE;
    extern const int INSECURE_PATH;
    extern const int LOGICAL_ERROR;
    extern const int NO_SUCH_DATA_PART;
    extern const int S3_ERROR;
    extern const int ZERO_COPY_REPLICATION_ERROR;
}
2016-01-28 01:00:27 +00:00
namespace DataPartsExchange
{
namespace
{
2020-03-09 01:22:33 +00:00
/// Versions of the part exchange protocol. Each version adds one more piece of data to the
/// stream; the sender answers with min(client version, newest version it knows).

/// The total on-disk size of the part is sent first.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
/// Serialized TTL infos follow the size.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
/// The part type is sent as a string.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3;
/// The default compression codec file is included in the list of transferred files.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
/// The part UUID is sent.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
/// Remote filesystem metadata may be sent instead of the data (zero-copy replication).
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
/// The number of projections and the projection parts themselves are sent.
constexpr int REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
// Reserved for ALTER PRIMARY KEY
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 8;
2019-09-04 16:00:20 +00:00
2016-01-28 01:00:27 +00:00
/// Builds the interserver endpoint name for the given node: the fixed
/// "DataPartsExchange:" prefix followed by `node_id`.
std::string getEndpointId(const std::string & node_id)
{
    std::string endpoint_id{"DataPartsExchange:"};
    endpoint_id += node_id;
    return endpoint_id;
}
2020-10-27 13:09:14 +00:00
/// Simple functor for tracking fetch progress in system.replicated_fetches table.
2020-10-26 16:38:35 +00:00
struct ReplicatedFetchReadCallback
{
ReplicatedFetchList::Entry & replicated_fetch_entry;
2020-10-27 15:29:06 +00:00
explicit ReplicatedFetchReadCallback(ReplicatedFetchList::Entry & replicated_fetch_entry_)
2020-10-26 16:38:35 +00:00
: replicated_fetch_entry(replicated_fetch_entry_)
{}
void operator() (size_t bytes_count)
{
2020-10-27 12:24:10 +00:00
replicated_fetch_entry->bytes_read_compressed.store(bytes_count, std::memory_order_relaxed);
2020-10-30 08:52:11 +00:00
/// It's possible when we fetch part from very old clickhouse version
/// which doesn't send total size.
if (replicated_fetch_entry->total_size_bytes_compressed != 0)
{
replicated_fetch_entry->progress.store(
static_cast<double>(bytes_count) / replicated_fetch_entry->total_size_bytes_compressed,
std::memory_order_relaxed);
}
2020-10-26 16:38:35 +00:00
}
};
2016-01-28 01:00:27 +00:00
}
2021-05-26 20:37:44 +00:00
/// Binds the service to its replicated table and creates a logger named after that table.
Service::Service(StorageReplicatedMergeTree & data_)
    : data(data_)
    , log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)"))
{
}
2016-01-28 01:00:27 +00:00
std::string Service::getId(const std::string & node_id) const
{
return getEndpointId(node_id);
2016-01-28 01:00:27 +00:00
}
void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, HTTPServerResponse & response)
2014-07-22 13:49:52 +00:00
{
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
2019-09-06 12:18:56 +00:00
2019-09-04 16:00:20 +00:00
String part_name = params.get("part");
2019-08-26 14:24:29 +00:00
const auto data_settings = data.getSettings();
2019-08-02 20:19:06 +00:00
/// Validation of the input that may come from malicious replica.
MergeTreePartInfo::fromPartName(part_name, data.format_version);
/// We pretend to work as older server version, to be sure that client will correctly process our version
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION))});
2019-09-06 12:18:56 +00:00
2020-05-23 22:24:01 +00:00
LOG_TRACE(log, "Sending part {}", part_name);
MergeTreeData::DataPartPtr part;
auto report_broken_part = [&]()
{
if (part && part->isProjectionPart())
{
data.reportBrokenPart(part->getParentPart()->name);
}
else
{
data.reportBrokenPart(part_name);
}
};
try
{
part = findPart(part_name);
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
2020-04-29 17:14:49 +00:00
writeBinary(part->checksums.getTotalSizeOnDisk(), out);
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
WriteBufferFromOwnString ttl_infos_buffer;
part->ttl_infos.write(ttl_infos_buffer);
writeBinary(ttl_infos_buffer.str(), out);
}
2020-05-14 20:08:15 +00:00
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
writeStringBinary(part->getType().toString(), out);
2020-10-29 16:18:25 +00:00
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
writeUUIDText(part->uuid, out);
2021-07-05 03:32:56 +00:00
String remote_fs_metadata = parse<String>(params.get("remote_fs_metadata", ""));
std::regex re("\\s*,\\s*");
Strings capability(
std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
std::sregex_token_iterator());
2021-07-05 03:32:56 +00:00
if (data_settings->allow_remote_fs_zero_copy_replication &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
2021-06-24 08:25:05 +00:00
{
2021-07-05 03:32:56 +00:00
auto disk = part->volume->getDisk();
2021-08-24 22:24:47 +00:00
auto disk_type = toString(disk->getType());
2021-07-05 03:32:56 +00:00
if (disk->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
{
2021-07-05 03:32:56 +00:00
/// Send metadata if the receiver's capability covers the source disk type.
response.addCookie({"remote_fs_metadata", disk_type});
sendPartFromDiskRemoteMeta(part, out);
return;
}
}
2021-07-05 03:32:56 +00:00
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
{
const auto & projections = part->getProjectionParts();
writeBinary(projections.size(), out);
if (isInMemoryPart(part))
sendPartFromMemory(part, out, projections);
else
sendPartFromDisk(part, out, client_protocol_version, projections);
}
else
{
if (isInMemoryPart(part))
sendPartFromMemory(part, out);
else
2020-12-16 15:31:13 +00:00
sendPartFromDisk(part, out, client_protocol_version);
}
}
catch (const NetException &)
{
/// Network error or error on remote side. No need to enqueue part for check.
throw;
}
catch (const Exception & e)
{
if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
report_broken_part();
throw;
}
catch (...)
{
report_broken_part();
throw;
}
2014-07-22 13:49:52 +00:00
}
/// Sends an in-memory part to `out`.
///
/// For every entry of `projections` the stream gets: projection name, its checksums and its
/// block in Native format. After that come the checksums of the part itself and its block.
/// The size of the part's block is accounted in the sends throttler.
/// Throws LOGICAL_ERROR if the part or one of the projections is not an in-memory part.
void Service::sendPartFromMemory(
    const MergeTreeData::DataPartPtr & part, WriteBuffer & out, const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
{
    auto metadata_snapshot = data.getInMemoryMetadataPtr();

    for (const auto & [projection_name, projection_part] : projections)
    {
        const auto & projection_header = metadata_snapshot->projections.get(projection_name).sample_block;

        auto projection_in_memory = asInMemoryPart(projection_part);
        if (!projection_in_memory)
            throw Exception(
                fmt::format("Projection {} of part {} is not stored in memory", projection_name, part->name),
                ErrorCodes::LOGICAL_ERROR);

        writeStringBinary(projection_name, out);
        projection_part->checksums.write(out);

        NativeWriter projection_writer(out, 0, projection_header);
        projection_writer.write(projection_in_memory->block);
    }

    auto part_in_memory = asInMemoryPart(part);
    if (!part_in_memory)
        throw Exception(fmt::format("Part {} is not stored in memory", part->name), ErrorCodes::LOGICAL_ERROR);

    NativeWriter part_writer(out, 0, metadata_snapshot->getSampleBlock());
    part->checksums.write(out);
    part_writer.write(part_in_memory->block);

    data.getSendsThrottler()->add(part_in_memory->block.bytes());
}
/// Streams the files of an on-disk part to `out` and returns the checksums of what was sent.
///
/// Requested projections (those present in `projections`) are sent first, each as its name
/// followed by a recursive call for the projection part. Then the number of files is written
/// and, for each file, its name, size, content and the hash of the content.
/// The collected checksums are compared with the checksums stored in the part before returning.
///
/// Throws ABORTED if the transfer was cancelled and BAD_SIZE_OF_FILE_IN_DATA_PART if the number
/// of bytes copied differs from the file size reported by the disk.
MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
    const MergeTreeData::DataPartPtr & part,
    WriteBuffer & out,
    int client_protocol_version,
    const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
{
    /// The list of files to send starts from the part's checksums...
    MergeTreeData::DataPart::Checksums files_to_send = part->checksums;

    /// ...plus the files that have no checksum. The default compression codec file is
    /// only sent to clients whose protocol version knows about it.
    const auto files_without_checksums = part->getFileNamesWithoutChecksums();
    const bool client_accepts_codec_file
        = client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION;
    for (const auto & extra_file : files_without_checksums)
    {
        const bool is_codec_file = (extra_file == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME);
        if (client_accepts_codec_file || !is_codec_file)
            files_to_send.files[extra_file] = {};
    }

    auto disk = part->volume->getDisk();
    MergeTreeData::DataPart::Checksums sent_checksums;

    for (const auto & [projection_name, projection_part] : part->getProjectionParts())
    {
        const String projection_entry = projection_name + ".proj";

        // Projection directories are never sent as plain files.
        files_to_send.files.erase(projection_entry);

        auto requested = projections.find(projection_name);
        if (requested != projections.end())
        {
            writeStringBinary(projection_name, out);
            MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(requested->second, out, client_protocol_version);
            sent_checksums.addFile(projection_entry, projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
        }
        else if (part->checksums.has(projection_entry))
        {
            // The projection is not sent; reuse our own checksum so the final comparison passes.
            const auto & own_checksum = part->checksums.files.find(projection_entry)->second;
            sent_checksums.addFile(projection_entry, own_checksum.file_size, own_checksum.file_hash);
        }
    }

    writeBinary(files_to_send.files.size(), out);
    for (const auto & file_entry : files_to_send.files)
    {
        const String & file_name = file_entry.first;
        String path = fs::path(part->getFullRelativePath()) / file_name;

        UInt64 size = disk->getFileSize(path);

        writeStringBinary(file_name, out);
        writeBinary(size, out);

        auto file_in = disk->readFile(path);
        HashingWriteBuffer hashing_out(out);
        copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());

        if (blocker.isCancelled())
            throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

        if (hashing_out.count() != size)
            throw Exception("Unexpected size of file " + path, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

        writePODBinary(hashing_out.getHash(), out);

        /// Files without checksums do not take part in the final comparison.
        if (files_without_checksums.count(file_name) == 0)
            sent_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
    }

    part->checksums.checkEqual(sent_checksums, false);
    return sent_checksums;
}
2021-06-24 08:25:05 +00:00
/// Sends the local metadata files of a part stored on a disk that supports zero-copy
/// replication, instead of the part data.
///
/// Stream layout: the part's unique id, the number of files, then for each file its name,
/// size, content and the hash of the content. Shared data of the part is locked
/// (lockSharedData) before anything is written.
///
/// Throws LOGICAL_ERROR if the disk does not support zero-copy replication, CORRUPTED_DATA if
/// a metadata file is missing or is not a regular file, ABORTED on cancellation and
/// BAD_SIZE_OF_FILE_IN_DATA_PART if fewer or more bytes than the file size were copied.
void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
{
    /// Files to send: everything from the checksum list plus the files that have no checksum.
    MergeTreeData::DataPart::Checksums files_to_send = part->checksums;
    for (const auto & extra_file : part->getFileNamesWithoutChecksums())
        files_to_send.files[extra_file] = {};

    auto disk = part->volume->getDisk();
    if (!disk->supportZeroCopyReplication())
        throw Exception(fmt::format("disk {} doesn't support zero-copy replication", disk->getName()), ErrorCodes::LOGICAL_ERROR);

    part->storage.lockSharedData(*part);

    String part_id = part->getUniqueId();
    writeStringBinary(part_id, out);

    writeBinary(files_to_send.files.size(), out);

    const fs::path part_metadata_dir = fs::path(disk->getPath()) / part->getFullRelativePath();
    for (const auto & file_entry : files_to_send.files)
    {
        const String & file_name = file_entry.first;

        const fs::path metadata = part_metadata_dir / file_name;
        const String metadata_file = metadata.string();

        if (!fs::exists(metadata))
            throw Exception("Remote metadata '" + file_name + "' is not exists", ErrorCodes::CORRUPTED_DATA);
        if (!fs::is_regular_file(metadata))
            throw Exception("Remote metadata '" + file_name + "' is not a file", ErrorCodes::CORRUPTED_DATA);

        UInt64 file_size = fs::file_size(metadata);

        writeStringBinary(file_name, out);
        writeBinary(file_size, out);

        auto file_in = createReadBufferFromFileBase(metadata_file, /* settings= */ {});
        HashingWriteBuffer hashing_out(out);
        copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());

        if (blocker.isCancelled())
            throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);

        if (hashing_out.count() != file_size)
            throw Exception("Unexpected size of file " + metadata_file, ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART);

        writePODBinary(hashing_out.getHash(), out);
    }
}
2016-01-28 01:00:27 +00:00
/// Looks up the part `name` among PreActive, Active and Outdated parts.
/// Throws NO_SUCH_DATA_PART if there is no such part.
MergeTreeData::DataPartPtr Service::findPart(const String & name)
{
    /// It is important to include PreActive and Outdated parts here because remote replicas cannot reliably
    /// determine the local state of the part, so queries for the parts in these states are completely normal.
    auto found_part = data.getPartIfExists(
        name,
        {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});

    if (!found_part)
        throw Exception("No part " + name + " in table", ErrorCodes::NO_SUCH_DATA_PART);

    return found_part;
}
/// Downloads the part `part_name` from another replica and returns it as a new part object.
///
/// The request is sent to `interserver_scheme`://`host`:`port` for the endpoint derived from
/// `replica_path`, authenticated with `user`/`password` when `user` is not empty.
///
/// - `disk`: when set, the part is placed on it; otherwise space is reserved according to the
///   size (and TTL infos, when available) reported by the sender, and the disk of that
///   reservation is used.
/// - `try_zero_copy`: when true and zero-copy replication is allowed by the settings, the types
///   of the suitable disks are advertised in the "remote_fs_metadata" query parameter, and the
///   sender may answer with remote metadata instead of data. If downloading the metadata fails
///   with S3_ERROR or ZERO_COPY_REPLICATION_ERROR, the fetch is repeated without zero-copy.
/// - `to_detached`, `tmp_prefix_`: forwarded to the download routines, which use them to build
///   the download directory.
/// - `tagger_ptr`: forwarded to balancedReservation().
///
/// Throws ABORTED if fetches are cancelled, LOGICAL_ERROR / INCORRECT_PART_TYPE on an
/// inconsistent "remote_fs_metadata" answer, plus whatever the download routines throw.
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    const String & part_name,
    const String & replica_path,
    const String & host,
    int port,
    const ConnectionTimeouts & timeouts,
    const String & user,
    const String & password,
    const String & interserver_scheme,
    ThrottlerPtr throttler,
    bool to_detached,
    const String & tmp_prefix_,
    std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
    bool try_zero_copy,
    DiskPtr disk)
{
    if (blocker.isCancelled())
        throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);

    /// Validation of the input that may come from malicious replica.
    auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
    const auto data_settings = data.getSettings();

    Poco::URI uri;
    uri.setScheme(interserver_scheme);
    uri.setHost(host);
    uri.setPort(port);
    uri.setQueryParameters(
    {
        {"endpoint",                getEndpointId(replica_path)},
        {"part",                    part_name},
        {"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
        {"compress",                "false"}
    });

    /// Disk types for which we are able to accept zero-copy metadata.
    Strings capability;
    if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
    {
        if (!disk)
        {
            Disks disks = data.getDisks();
            for (const auto & data_disk : disks)
                if (data_disk->supportZeroCopyReplication())
                    capability.push_back(toString(data_disk->getType()));
        }
        else if (disk->supportZeroCopyReplication())
        {
            capability.push_back(toString(disk->getType()));
        }
    }

    if (!capability.empty())
    {
        ::sort(capability.begin(), capability.end());
        capability.erase(std::unique(capability.begin(), capability.end()), capability.end());
        const String remote_fs_metadata = boost::algorithm::join(capability, ", ");
        uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
    }
    else
    {
        /// Nothing to advertise, so a zero-copy answer would be unexpected.
        try_zero_copy = false;
    }

    Poco::Net::HTTPBasicCredentials creds{};
    if (!user.empty())
    {
        creds.setUsername(user);
        creds.setPassword(password);
    }

    PooledReadWriteBufferFromHTTP in{
        uri,
        Poco::Net::HTTPRequest::HTTP_POST,
        {},
        timeouts,
        creds,
        DBMS_DEFAULT_BUFFER_SIZE,
        0, /* no redirects */
        data_settings->replicated_max_parallel_fetches_for_host
    };

    int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));

    ReservationPtr reservation;
    size_t sum_files_size = 0;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
    {
        readBinary(sum_files_size, in);
        if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
        {
            /// The TTL infos are always read from the stream, even when `disk` is given.
            IMergeTreeDataPart::TTLInfos ttl_infos;
            String ttl_infos_string;
            readBinary(ttl_infos_string, in);
            ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
            assertString("ttl format version: 1\n", ttl_infos_buffer);
            ttl_infos.read(ttl_infos_buffer);
            if (!disk)
            {
                reservation
                    = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
                if (!reservation)
                    reservation
                        = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
            }
        }
        else if (!disk)
        {
            reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
            if (!reservation)
                reservation = data.reserveSpace(sum_files_size);
        }
    }
    else if (!disk)
    {
        /// We don't know real size of part because sender server version is too old
        reservation = data.makeEmptyReservationOnLargestDisk();
    }

    /// NOTE(review): this relies on every reservation call above either returning a
    /// non-null reservation or throwing - confirm against MergeTreeData.
    if (!disk)
        disk = reservation->getDisk();

    bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
                    && sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);

    /// Senders that do not report the part type are treated as sending "Wide" parts.
    String part_type = "Wide";
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
        readStringBinary(part_type, in);

    UUID part_uuid = UUIDHelpers::Nil;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
        readUUIDText(part_uuid, in);

    String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
    if (!remote_fs_metadata.empty())
    {
        /// The sender decided to use zero-copy; check that the answer is consistent with the request.
        if (!try_zero_copy)
            throw Exception("Got unexpected 'remote_fs_metadata' cookie", ErrorCodes::LOGICAL_ERROR);
        if (std::find(capability.begin(), capability.end(), remote_fs_metadata) == capability.end())
            throw Exception(fmt::format("Got 'remote_fs_metadata' cookie {}, expect one from {}", remote_fs_metadata, fmt::join(capability, ", ")), ErrorCodes::LOGICAL_ERROR);
        if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
            throw Exception(fmt::format("Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version), ErrorCodes::LOGICAL_ERROR);
        if (part_type == "InMemory")
            throw Exception("Got 'remote_fs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE);

        try
        {
            return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
        }
        catch (const Exception & e)
        {
            if (e.code() != ErrorCodes::S3_ERROR && e.code() != ErrorCodes::ZERO_COPY_REPLICATION_ERROR)
                throw;

            /// The exception text is passed as an argument, never as the format string:
            /// it may contain '{' or '}' which would break formatting of the log message.
            LOG_WARNING(log, "{} Will retry fetching part without zero-copy.", e.message());

            /// Try again but without zero-copy
            return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
                user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk);
        }
    }

    /// Register the fetch in the list of replicated fetches and track its progress.
    auto storage_id = data.getStorageID();
    String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
    auto entry = data.getContext()->getReplicatedFetchList().insert(
        storage_id.getDatabaseName(), storage_id.getTableName(),
        part_info.partition_id, part_name, new_part_path,
        replica_path, uri, to_detached, sum_files_size);

    in.setNextCallback(ReplicatedFetchReadCallback(*entry));

    size_t projections = 0;
    if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
        readBinary(projections, in);

    MergeTreeData::DataPart::Checksums checksums;
    return part_type == "InMemory"
        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
}
/// Reads an in-memory part from `in` and builds it as a MergeTreeDataPartInMemory.
///
/// The stream contains `projections` projection parts (name, checksums, block in Native
/// format) followed by the checksums and the block of the part itself. Every block is written
/// through MergedBlockOutputStream with the "NONE" codec, and the checksums of the result are
/// compared with the received ones. The size of every received block is accounted in `throttler`.
/// Throws CORRUPTED_DATA if the checksums cannot be deserialized.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
    const String & part_name,
    const UUID & part_uuid,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    size_t projections,
    ThrottlerPtr throttler)
{
    auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
    MergeTreeData::MutableDataPartPtr new_data_part =
        std::make_shared<MergeTreeDataPartInMemory>(data, part_name, volume);

    for (size_t projection_index = 0; projection_index < projections; ++projection_index)
    {
        String projection_name;
        readStringBinary(projection_name, in);

        MergeTreeData::DataPart::Checksums projection_checksums;
        if (!projection_checksums.read(in))
            throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);

        NativeReader projection_reader(in, 0);
        auto projection_block = projection_reader.read();
        throttler->add(projection_block.bytes());

        /// A projection part gets a fixed part info and an empty partition.
        MergeTreePartInfo projection_part_info("all", 0, 0, 0);
        MergeTreeData::MutableDataPartPtr projection_part =
            std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, projection_part_info, volume, projection_name, new_data_part.get());
        projection_part->is_temp = false;

        const auto projection_columns = projection_block.getNamesAndTypesList();
        projection_part->setColumns(projection_columns);
        projection_part->partition = MergeTreePartition{};
        projection_part->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();

        MergedBlockOutputStream projection_out(
            projection_part,
            metadata_snapshot->projections.get(projection_name).metadata,
            projection_columns,
            {},
            CompressionCodecFactory::instance().get("NONE", {}));

        projection_out.write(projection_block);
        projection_out.writeSuffixAndFinalizePart(projection_part);
        projection_part->checksums.checkEqual(projection_checksums, /* have_uncompressed = */ true);

        new_data_part->addProjectionPart(projection_name, std::move(projection_part));
    }

    MergeTreeData::DataPart::Checksums part_checksums;
    if (!part_checksums.read(in))
        throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);

    NativeReader part_reader(in, 0);
    auto part_block = part_reader.read();
    throttler->add(part_block.bytes());

    const auto part_columns = part_block.getNamesAndTypesList();

    new_data_part->uuid = part_uuid;
    new_data_part->is_temp = true;
    new_data_part->setColumns(part_columns);
    new_data_part->minmax_idx->update(part_block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
    new_data_part->partition.create(metadata_snapshot, part_block, 0, context);

    MergedBlockOutputStream part_out(
        new_data_part,
        metadata_snapshot,
        part_columns,
        {},
        CompressionCodecFactory::instance().get("NONE", {}));

    part_out.write(part_block);
    part_out.writeSuffixAndFinalizePart(new_data_part);
    new_data_part->checksums.checkEqual(part_checksums, /* have_uncompressed = */ true);

    return new_data_part;
}
void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const String & part_download_path,
bool sync,
DiskPtr disk,
PooledReadWriteBufferFromHTTP & in,
2021-05-26 20:37:44 +00:00
MergeTreeData::DataPart::Checksums & checksums,
ThrottlerPtr throttler) const
{
size_t files;
readBinary(files, in);
for (size_t i = 0; i < files; ++i)
{
String file_name;
UInt64 file_size;
readStringBinary(file_name, in);
readBinary(file_size, in);
/// File must be inside "absolute_part_path" directory.
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
2021-05-24 16:03:09 +00:00
String absolute_file_path = fs::weakly_canonical(fs::path(part_download_path) / file_name);
if (!startsWith(absolute_file_path, fs::weakly_canonical(part_download_path).string()))
2020-04-08 08:41:13 +00:00
throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")."
" This may happen if we are trying to download part from malicious replica or logical error.",
ErrorCodes::INSECURE_PATH);
auto file_out = disk->writeFile(fs::path(part_download_path) / file_name);
2020-04-08 08:41:13 +00:00
HashingWriteBuffer hashing_out(*file_out);
2021-05-26 20:37:44 +00:00
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
if (blocker.isCancelled())
{
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
/// performing a poll with a not very large timeout.
/// And now we check it only between read chunks (in the `copyData` function).
2020-04-08 08:41:13 +00:00
disk->removeRecursive(part_download_path);
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
}
2017-06-21 01:24:05 +00:00
MergeTreeDataPartChecksum::uint128 expected_hash;
readPODBinary(expected_hash, in);
if (expected_hash != hashing_out.getHash())
throw Exception("Checksum mismatch for file " + fullPath(disk, (fs::path(part_download_path) / file_name).string()) + " transferred from " + replica_path,
ErrorCodes::CHECKSUM_DOESNT_MATCH);
if (file_name != "checksums.txt" &&
file_name != "columns.txt" &&
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
checksums.addFile(file_name, file_size, expected_hash);
if (sync)
hashing_out.sync();
}
}
/// Downloads an on-disk part (with `projections` projection parts preceding it in the stream)
/// into a temporary directory and returns the part created from that directory.
///
/// The directory is "<relative data path>/[detached/]<tmp prefix><part name>/", where the
/// prefix is `tmp_prefix_` or "tmp-fetch_" when `tmp_prefix_` is empty. A leftover directory
/// with the same name is removed first. Checksums of everything received are accumulated in
/// `checksums` and compared with the checksums loaded from the downloaded part.
///
/// Throws LOGICAL_ERROR if the prefix or the part name is empty or contains '.' or '/'.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    size_t projections,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler)
{
    static const String TMP_PREFIX = "tmp-fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    /// We will remove directory if it's already exists. Make precautions.
    if (tmp_prefix.empty() //-V560
        || part_name.empty()
        || std::string::npos != tmp_prefix.find_first_of("/.")
        || std::string::npos != part_name.find_first_of("/."))
        throw Exception("Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters.", ErrorCodes::LOGICAL_ERROR);

    String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    String part_download_path = data.getRelativeDataPath() + part_relative_path + "/";

    const bool leftover_directory_exists = disk->exists(part_download_path);
    if (leftover_directory_exists)
    {
        LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
            fullPath(disk, part_download_path));
        disk->removeRecursive(part_download_path);
    }

    disk->createDirectories(part_download_path);

    /// Stays empty unless the fsync_part_directory setting is enabled.
    SyncGuardPtr sync_guard;
    if (data.getSettings()->fsync_part_directory)
        sync_guard = disk->getDirectorySyncGuard(part_download_path);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    /// Projection parts come first; each one goes into its own "<name>.proj/" subdirectory
    /// and is represented in the checksums of the part by a single entry.
    for (size_t projection_index = 0; projection_index < projections; ++projection_index)
    {
        String projection_name;
        readStringBinary(projection_name, in);

        const String projection_entry = projection_name + ".proj";
        const String projection_download_path = part_download_path + projection_entry + "/";

        MergeTreeData::DataPart::Checksums projection_checksum;
        disk->createDirectories(projection_download_path);
        downloadBaseOrProjectionPartToDisk(
            replica_path, projection_download_path, sync, disk, in, projection_checksum, throttler);

        checksums.addFile(
            projection_entry, projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
    }

    // Download the base part
    downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);

    assertEOF(in);

    auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
    new_data_part->is_temp = true;
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);
    new_data_part->checksums.checkEqual(checksums, false);
    return new_data_part;
}
2021-06-24 08:25:05 +00:00
/// Downloads the metadata files of a part for zero-copy replication and returns the part
/// created from them.
///
/// The stream contains the unique id of the part, the number of files and, for each file, its
/// name, size, content and the expected hash of the content. Files are written under
/// "<relative data path>/[detached/]<tmp prefix><part name>/" at the full path of `disk`,
/// where the prefix is `tmp_prefix_` or "tmp-fetch_" when `tmp_prefix_` is empty.
/// Shared data of the new part is locked (lockSharedData) before it is returned.
///
/// Throws:
///  - ZERO_COPY_REPLICATION_ERROR if the disk does not support zero-copy replication or does
///    not know the received unique id;
///  - DIRECTORY_ALREADY_EXISTS if the download directory already exists;
///  - INSECURE_PATH if a received file name leads outside of the download directory;
///  - ABORTED if the fetch was cancelled (the download directory is removed first);
///  - CHECKSUM_DOESNT_MATCH if the received hash differs from the computed one.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    ThrottlerPtr throttler)
{
    String part_id;
    readStringBinary(part_id, in);

    if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
    {
        throw Exception(fmt::format("Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName()), ErrorCodes::ZERO_COPY_REPLICATION_ERROR);
    }

    LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
        part_name, part_id, disk->getName());

    static const String TMP_PREFIX = "tmp-fetch_";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    String part_relative_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
    String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / "";

    if (disk->exists(part_download_path))
        throw Exception("Directory " + fullPath(disk, part_download_path) + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    disk->createDirectories(part_download_path);

    /// Canonical form of the directory all files must end up in. The trailing separator
    /// makes the prefix check below reject sibling directories sharing the same name prefix.
    String canonical_part_dir = fs::weakly_canonical(fullPath(disk, part_download_path)).string();
    if (canonical_part_dir.empty() || canonical_part_dir.back() != '/')
        canonical_part_dir += '/';

    size_t files;
    readBinary(files, in);

    auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);

    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        readStringBinary(file_name, in);
        readBinary(file_size, in);

        String data_path = fs::path(part_download_path) / file_name;
        String metadata_file = fullPath(disk, data_path);

        /// File must be inside the download directory.
        /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
        String absolute_file_path = fs::weakly_canonical(metadata_file).string();
        if (!startsWith(absolute_file_path, canonical_part_dir))
            throw Exception("File path (" + absolute_file_path + ") doesn't appear to be inside part path (" + part_download_path + ")."
                " This may happen if we are trying to download part from malicious replica or logical error.",
                ErrorCodes::INSECURE_PATH);

        {
            auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);

            HashingWriteBuffer hashing_out(*file_out);

            copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);

            if (blocker.isCancelled())
            {
                /// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
                /// performing a poll with a not very large timeout.
                /// And now we check it only between read chunks (in the `copyData` function).
                disk->removeSharedRecursive(part_download_path, true);
                throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
            }

            MergeTreeDataPartChecksum::uint128 expected_hash;
            readPODBinary(expected_hash, in);

            if (expected_hash != hashing_out.getHash())
            {
                throw Exception("Checksum mismatch for file " + metadata_file + " transferred from " + replica_path,
                    ErrorCodes::CHECKSUM_DOESNT_MATCH);
            }
        }
    }

    assertEOF(in);

    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
    new_data_part->is_temp = true;
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);

    new_data_part->storage.lockSharedData(*new_data_part);

    return new_data_part;
}
2014-07-22 13:49:52 +00:00
}
2016-01-28 01:00:27 +00:00
}