2017-04-01 09:19:00 +00:00
|
|
|
#include <Storages/MergeTree/DataPartsExchange.h>
|
2021-02-19 12:51:26 +00:00
|
|
|
|
2021-10-15 20:18:20 +00:00
|
|
|
#include <Formats/NativeWriter.h>
|
2021-02-19 12:51:26 +00:00
|
|
|
#include <Disks/SingleDiskVolume.h>
|
|
|
|
#include <Disks/createVolume.h>
|
|
|
|
#include <IO/HTTPCommon.h>
|
|
|
|
#include <Server/HTTP/HTMLForm.h>
|
|
|
|
#include <Server/HTTP/HTTPServerResponse.h>
|
2020-04-29 17:14:49 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
|
|
|
|
#include <Storages/MergeTree/MergedBlockOutputStream.h>
|
2021-02-19 12:51:26 +00:00
|
|
|
#include <Storages/MergeTree/ReplicatedFetchList.h>
|
2021-05-26 20:37:44 +00:00
|
|
|
#include <Storages/StorageReplicatedMergeTree.h>
|
2017-04-01 09:19:00 +00:00
|
|
|
#include <Common/CurrentMetrics.h>
|
|
|
|
#include <Common/NetException.h>
|
2022-04-21 19:19:13 +00:00
|
|
|
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
|
2022-03-08 17:05:55 +00:00
|
|
|
#include <Disks/IO/createReadBufferFromFileBase.h>
|
2021-10-02 07:13:14 +00:00
|
|
|
#include <base/scope_guard.h>
|
2017-04-11 14:13:19 +00:00
|
|
|
#include <Poco/Net/HTTPRequest.h>
|
2021-11-18 18:07:35 +00:00
|
|
|
#include <boost/algorithm/string/join.hpp>
|
2021-07-05 03:32:56 +00:00
|
|
|
#include <iterator>
|
|
|
|
#include <regex>
|
2022-01-30 19:49:48 +00:00
|
|
|
#include <base/sort.h>
|
|
|
|
|
2014-07-22 13:49:52 +00:00
|
|
|
|
2021-04-27 00:05:43 +00:00
|
|
|
namespace fs = std::filesystem;
|
|
|
|
|
2016-10-24 04:06:27 +00:00
|
|
|
namespace CurrentMetrics
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const Metric ReplicatedSend;
|
2021-11-10 18:15:27 +00:00
|
|
|
extern const Metric ReplicatedFetch;
|
2016-10-24 04:06:27 +00:00
|
|
|
}
|
|
|
|
|
2014-07-22 13:49:52 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
namespace ErrorCodes
|
|
|
|
{
|
2020-02-25 18:02:41 +00:00
|
|
|
extern const int DIRECTORY_ALREADY_EXISTS;
|
|
|
|
extern const int NO_SUCH_DATA_PART;
|
2017-04-01 07:20:54 +00:00
|
|
|
extern const int ABORTED;
|
|
|
|
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
|
2017-08-09 13:31:13 +00:00
|
|
|
extern const int CANNOT_WRITE_TO_OSTREAM;
|
2018-11-22 21:19:58 +00:00
|
|
|
extern const int CHECKSUM_DOESNT_MATCH;
|
2019-07-31 18:21:13 +00:00
|
|
|
extern const int INSECURE_PATH;
|
2020-05-05 01:27:31 +00:00
|
|
|
extern const int CORRUPTED_DATA;
|
|
|
|
extern const int LOGICAL_ERROR;
|
2020-10-08 15:45:10 +00:00
|
|
|
extern const int S3_ERROR;
|
2021-02-26 09:48:57 +00:00
|
|
|
extern const int INCORRECT_PART_TYPE;
|
2021-07-05 03:32:56 +00:00
|
|
|
extern const int ZERO_COPY_REPLICATION_ERROR;
|
2016-01-11 21:46:36 +00:00
|
|
|
}
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
namespace DataPartsExchange
|
|
|
|
{
|
|
|
|
|
|
|
|
namespace
|
|
|
|
{
|
2020-03-09 01:22:33 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1;
|
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2;
|
2020-05-14 20:08:15 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3;
|
2020-08-26 15:29:46 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4;
|
2020-10-15 16:17:16 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5;
|
2021-07-05 03:32:56 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6;
|
2021-02-10 14:12:49 +00:00
|
|
|
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7;
|
2021-08-27 12:17:58 +00:00
|
|
|
// Reserved for ALTER PRIMARY KEY
|
|
|
|
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 8;
|
2020-01-30 10:21:40 +00:00
|
|
|
|
2019-09-04 16:00:20 +00:00
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
std::string getEndpointId(const std::string & node_id)
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return "DataPartsExchange:" + node_id;
|
2016-01-28 01:00:27 +00:00
|
|
|
}
|
|
|
|
|
2020-10-27 13:09:14 +00:00
|
|
|
/// Simple functor for tracking fetch progress in system.replicated_fetches table.
|
2020-10-26 16:38:35 +00:00
|
|
|
struct ReplicatedFetchReadCallback
|
|
|
|
{
|
|
|
|
ReplicatedFetchList::Entry & replicated_fetch_entry;
|
|
|
|
|
2020-10-27 15:29:06 +00:00
|
|
|
explicit ReplicatedFetchReadCallback(ReplicatedFetchList::Entry & replicated_fetch_entry_)
|
2020-10-26 16:38:35 +00:00
|
|
|
: replicated_fetch_entry(replicated_fetch_entry_)
|
|
|
|
{}
|
|
|
|
|
|
|
|
void operator() (size_t bytes_count)
|
|
|
|
{
|
2020-10-27 12:24:10 +00:00
|
|
|
replicated_fetch_entry->bytes_read_compressed.store(bytes_count, std::memory_order_relaxed);
|
2020-10-30 08:52:11 +00:00
|
|
|
|
|
|
|
/// It's possible when we fetch part from very old clickhouse version
|
|
|
|
/// which doesn't send total size.
|
|
|
|
if (replicated_fetch_entry->total_size_bytes_compressed != 0)
|
|
|
|
{
|
|
|
|
replicated_fetch_entry->progress.store(
|
|
|
|
static_cast<double>(bytes_count) / replicated_fetch_entry->total_size_bytes_compressed,
|
|
|
|
std::memory_order_relaxed);
|
|
|
|
}
|
2020-10-26 16:38:35 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
}
|
|
|
|
|
2021-05-26 20:37:44 +00:00
|
|
|
|
|
|
|
Service::Service(StorageReplicatedMergeTree & data_) :
|
|
|
|
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
std::string Service::getId(const std::string & node_id) const
|
|
|
|
{
|
2017-04-01 07:20:54 +00:00
|
|
|
return getEndpointId(node_id);
|
2016-01-28 01:00:27 +00:00
|
|
|
}
|
2016-01-11 21:46:36 +00:00
|
|
|
|
2021-02-19 12:51:26 +00:00
|
|
|
void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, HTTPServerResponse & response)
|
2014-07-22 13:49:52 +00:00
|
|
|
{
|
2020-02-27 10:43:38 +00:00
|
|
|
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
|
2019-09-06 12:18:56 +00:00
|
|
|
|
2019-09-04 16:00:20 +00:00
|
|
|
String part_name = params.get("part");
|
2019-05-12 14:57:23 +00:00
|
|
|
|
2019-08-26 14:24:29 +00:00
|
|
|
const auto data_settings = data.getSettings();
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2019-08-02 20:19:06 +00:00
|
|
|
/// Validation of the input that may come from malicious replica.
|
|
|
|
MergeTreePartInfo::fromPartName(part_name, data.format_version);
|
|
|
|
|
2020-02-27 10:43:38 +00:00
|
|
|
/// We pretend to work as older server version, to be sure that client will correctly process our version
|
2021-02-10 14:12:49 +00:00
|
|
|
response.addCookie({"server_protocol_version", toString(std::min(client_protocol_version, REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION))});
|
2019-09-06 12:18:56 +00:00
|
|
|
|
2020-05-23 22:24:01 +00:00
|
|
|
LOG_TRACE(log, "Sending part {}", part_name);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreeData::DataPartPtr part;
|
|
|
|
|
|
|
|
auto report_broken_part = [&]()
|
|
|
|
{
|
|
|
|
if (part && part->isProjectionPart())
|
|
|
|
{
|
2022-07-18 21:37:07 +00:00
|
|
|
auto parent_part = part->getParentPart()->shared_from_this();
|
|
|
|
data.reportBrokenPart(parent_part);
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
2022-07-18 21:37:07 +00:00
|
|
|
else if (part)
|
|
|
|
data.reportBrokenPart(part);
|
2021-02-10 14:12:49 +00:00
|
|
|
else
|
2022-07-18 21:37:07 +00:00
|
|
|
LOG_TRACE(log, "Part {} was not found, do not report it as broken", part_name);
|
2021-02-10 14:12:49 +00:00
|
|
|
};
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
try
|
|
|
|
{
|
2021-02-10 14:12:49 +00:00
|
|
|
part = findPart(part_name);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedSend};
|
|
|
|
|
2022-06-20 18:18:17 +00:00
|
|
|
if (part->data_part_storage->isStoredOnRemoteDisk())
|
2022-02-25 17:44:13 +00:00
|
|
|
{
|
|
|
|
UInt64 revision = parse<UInt64>(params.get("disk_revision", "0"));
|
|
|
|
if (revision)
|
2022-06-20 18:18:17 +00:00
|
|
|
part->data_part_storage->syncRevision(revision);
|
|
|
|
revision = part->data_part_storage->getRevision();
|
2022-02-25 17:44:13 +00:00
|
|
|
if (revision)
|
|
|
|
response.addCookie({"disk_revision", toString(revision)});
|
|
|
|
}
|
|
|
|
|
2020-02-27 10:43:38 +00:00
|
|
|
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
|
2020-04-29 17:14:49 +00:00
|
|
|
writeBinary(part->checksums.getTotalSizeOnDisk(), out);
|
2019-05-12 14:57:23 +00:00
|
|
|
|
2020-02-27 10:43:38 +00:00
|
|
|
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
|
2020-01-30 10:21:40 +00:00
|
|
|
{
|
|
|
|
WriteBufferFromOwnString ttl_infos_buffer;
|
|
|
|
part->ttl_infos.write(ttl_infos_buffer);
|
|
|
|
writeBinary(ttl_infos_buffer.str(), out);
|
|
|
|
}
|
|
|
|
|
2020-05-14 20:08:15 +00:00
|
|
|
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
|
|
|
|
writeStringBinary(part->getType().toString(), out);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2020-10-29 16:18:25 +00:00
|
|
|
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
|
|
|
|
writeUUIDText(part->uuid, out);
|
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
String remote_fs_metadata = parse<String>(params.get("remote_fs_metadata", ""));
|
|
|
|
std::regex re("\\s*,\\s*");
|
|
|
|
Strings capability(
|
|
|
|
std::sregex_token_iterator(remote_fs_metadata.begin(), remote_fs_metadata.end(), re, -1),
|
|
|
|
std::sregex_token_iterator());
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
if (data_settings->allow_remote_fs_zero_copy_replication &&
|
Fix fetch of in-memory part with allow_remote_fs_zero_copy_replication
CI founds the following error during trying to fetch in-memory part [1]:
2022.07.13 08:06:54.231033 [ 141093 ] {} <Error> InterserverIOHTTPHandler: Code: 107. DB::ErrnoException: Cannot open file /var/lib/clickhouse/disks/s3/store/886/88635b40-e4e3-4fe6-a0a0-1b6755463358/all_0_0_0/data.bin, errno: 2, strerror: No such file or directory. (FILE_DOESNT_EXIST), Stack trace (when copying this message, always include the lines below):
<details>
<summary>stacktrace</summary>
2022.07.13 08:06:54.101825 [ 189342 ] {8dbd11b3-f38a-4d5d-9ded-148987adb71d} <Debug> executeQuery: (from [::1]:54570) (comment: 01643_replicated_merge_tree_fsync_smoke.sql) select 'memory in_memory_parts_insert_sync'; (stage: Complete)
2022.07.13 08:06:54.131309 [ 691 ] {} <Debug> test_26u6kx.rep_fsync_r2 (39c3823c-22e5-4c05-9dec-cdffd8872c40): Fetching part all_0_0_0 from /clickhouse/tables/test_26u6kx/rep_fsync/replicas/r1
2022.07.13 08:06:54.231033 [ 141093 ] {} <Error> InterserverIOHTTPHandler: Code: 107. DB::ErrnoException: Cannot open file /var/lib/clickhouse/disks/s3/store/886/88635b40-e4e3-4fe6-a0a0-1b6755463358/all_0_0_0/data.bin, errno: 2, strerror: No such file or directory. (FILE_DOESNT_EXIST), Stack trace (when copying this message, always include the lines below):
0. DB::Exception::Exception() @ 0xba0191a in /usr/bin/clickhouse
1. DB::throwFromErrnoWithPath() @ 0xba029ca in /usr/bin/clickhouse
2. DB::OpenedFile::open() const @ 0x156e7fb0 in /usr/bin/clickhouse
3. DB::OpenedFile::getFD() const @ 0x156e8003 in /usr/bin/clickhouse
4. DB::ReadBufferFromFilePReadWithDescriptorsCache::ReadBufferFromFilePReadWithDescriptorsCache() @ 0x156e5f23 in /usr/bin/clickhouse
5. ? @ 0x156e53f0 in /usr/bin/clickhouse
6. DB::createReadBufferFromFileBase() @ 0x156e52b5 in /usr/bin/clickhouse
7. DB::DiskLocal::readFile() const @ 0x15e45ea8 in /usr/bin/clickhouse
8. DB::MetadataStorageFromDisk::readFileToString() const @ 0x15e6ab8b in /usr/bin/clickhouse
9. DB::MetadataStorageFromDisk::readMetadataUnlocked() const @ 0x15e6cdeb in /usr/bin/clickhouse
10. DB::MetadataStorageFromDisk::getSerializedMetadata() const @ 0x15e6cfc4 in /usr/bin/clickhouse
11. DB::DiskObjectStorage::getSerializedMetadata() const @ 0x15e19e2e in /usr/bin/clickhouse
12. DB::DiskDecorator::getSerializedMetadata() const @ 0x15e54ed1 in /usr/bin/clickhouse
13. DB::DiskDecorator::getSerializedMetadata() const @ 0x15e54ed1 in /usr/bin/clickhouse
14. DB::DataPartsExchange::Service::sendPartFromDiskRemoteMeta() @ 0x1700bb9e in /usr/bin/clickhouse
15. DB::DataPartsExchange::Service::processQuery(DB::HTMLForm const&, DB::ReadBuffer&, DB::WriteBuffer&, DB::HTTPServerResponse&) @ 0x1700a649 in /usr/bin/clickhouse
16. DB::InterserverIOHTTPHandler::processQuery(DB::HTTPServerRequest&, DB::HTTPServerResponse&, DB::InterserverIOHTTPHandler::Output&) @ 0x17433c53 in /usr/bin/clickhouse
17. DB::InterserverIOHTTPHandler::handleRequest(DB::HTTPServerRequest&, DB::HTTPServerResponse&) @ 0x174344f1 in /usr/bin/clickhouse
18. DB::HTTPServerConnection::run() @ 0x1768714d in /usr/bin/clickhouse
19. Poco::Net::TCPServerConnection::start() @ 0x1a398093 in /usr/bin/clickhouse
20. Poco::Net::TCPServerDispatcher::run() @ 0x1a399411 in /usr/bin/clickhouse
21. Poco::PooledThread::run() @ 0x1a54b7bb in /usr/bin/clickhouse
22. Poco::ThreadImpl::runnableEntry(void*) @ 0x1a548ec0 in /usr/bin/clickhouse
23. ? @ 0x7fdf1c204609 in ?
24. clone @ 0x7fdf1c129133 in ?
(version 22.7.1.1781 (official build))
</details>
[1]: https://s3.amazonaws.com/clickhouse-test-reports/0/8b6e31cc615ca52c80724b6e5097777cb9514f07/stateless_tests__release__s3_storage__actions_.html
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-07-14 08:38:46 +00:00
|
|
|
/// In memory data part does not have metadata yet.
|
|
|
|
!isInMemoryPart(part) &&
|
2021-07-05 03:32:56 +00:00
|
|
|
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
|
2021-06-24 08:25:05 +00:00
|
|
|
{
|
2022-04-21 19:19:13 +00:00
|
|
|
auto disk_type = part->data_part_storage->getDiskType();
|
|
|
|
if (part->data_part_storage->supportZeroCopyReplication() && std::find(capability.begin(), capability.end(), disk_type) != capability.end())
|
2021-02-10 14:12:49 +00:00
|
|
|
{
|
2021-07-05 03:32:56 +00:00
|
|
|
/// Send metadata if the receiver's capability covers the source disk type.
|
|
|
|
response.addCookie({"remote_fs_metadata", disk_type});
|
|
|
|
sendPartFromDiskRemoteMeta(part, out);
|
|
|
|
return;
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
|
|
|
|
if (client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
|
2021-02-10 14:12:49 +00:00
|
|
|
{
|
|
|
|
const auto & projections = part->getProjectionParts();
|
|
|
|
writeBinary(projections.size(), out);
|
|
|
|
if (isInMemoryPart(part))
|
|
|
|
sendPartFromMemory(part, out, projections);
|
|
|
|
else
|
|
|
|
sendPartFromDisk(part, out, client_protocol_version, projections);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
if (isInMemoryPart(part))
|
|
|
|
sendPartFromMemory(part, out);
|
2020-10-08 15:45:10 +00:00
|
|
|
else
|
2020-12-16 15:31:13 +00:00
|
|
|
sendPartFromDisk(part, out, client_protocol_version);
|
2020-08-26 15:29:46 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2018-08-10 04:02:56 +00:00
|
|
|
catch (const NetException &)
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2018-03-23 16:33:51 +00:00
|
|
|
/// Network error or error on remote side. No need to enqueue part for check.
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
2017-08-09 13:31:13 +00:00
|
|
|
if (e.code() != ErrorCodes::ABORTED && e.code() != ErrorCodes::CANNOT_WRITE_TO_OSTREAM)
|
2021-02-10 14:12:49 +00:00
|
|
|
report_broken_part();
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
2021-02-10 14:12:49 +00:00
|
|
|
report_broken_part();
|
2017-04-01 07:20:54 +00:00
|
|
|
throw;
|
|
|
|
}
|
2014-07-22 13:49:52 +00:00
|
|
|
}
|
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
void Service::sendPartFromMemory(
|
|
|
|
const MergeTreeData::DataPartPtr & part, WriteBuffer & out, const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
|
2020-04-29 17:14:49 +00:00
|
|
|
{
|
2020-06-26 11:30:23 +00:00
|
|
|
auto metadata_snapshot = data.getInMemoryMetadataPtr();
|
2021-02-10 14:12:49 +00:00
|
|
|
for (const auto & [name, projection] : projections)
|
|
|
|
{
|
|
|
|
auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
|
|
|
|
auto part_in_memory = asInMemoryPart(projection);
|
|
|
|
if (!part_in_memory)
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection {} of part {} is not stored in memory", name, part->name);
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
writeStringBinary(name, out);
|
|
|
|
projection->checksums.write(out);
|
2021-10-08 17:21:19 +00:00
|
|
|
NativeWriter block_out(out, 0, projection_sample_block);
|
2021-02-10 14:12:49 +00:00
|
|
|
block_out.write(part_in_memory->block);
|
|
|
|
}
|
|
|
|
|
2020-06-05 20:47:46 +00:00
|
|
|
auto part_in_memory = asInMemoryPart(part);
|
2020-04-29 17:14:49 +00:00
|
|
|
if (!part_in_memory)
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} is not stored in memory", part->name);
|
2020-05-05 01:27:31 +00:00
|
|
|
|
2021-10-08 17:21:19 +00:00
|
|
|
NativeWriter block_out(out, 0, metadata_snapshot->getSampleBlock());
|
2020-05-05 01:27:31 +00:00
|
|
|
part->checksums.write(out);
|
2020-04-29 17:14:49 +00:00
|
|
|
block_out.write(part_in_memory->block);
|
2021-05-26 20:37:44 +00:00
|
|
|
|
|
|
|
data.getSendsThrottler()->add(part_in_memory->block.bytes());
|
2020-04-29 17:14:49 +00:00
|
|
|
}
|
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
|
|
|
|
const MergeTreeData::DataPartPtr & part,
|
|
|
|
WriteBuffer & out,
|
|
|
|
int client_protocol_version,
|
|
|
|
const std::map<String, std::shared_ptr<IMergeTreeDataPart>> & projections)
|
2020-04-29 17:14:49 +00:00
|
|
|
{
|
|
|
|
/// We'll take a list of files from the list of checksums.
|
|
|
|
MergeTreeData::DataPart::Checksums checksums = part->checksums;
|
|
|
|
/// Add files that are not in the checksum list.
|
2020-08-26 15:29:46 +00:00
|
|
|
auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
|
|
|
|
for (const auto & file_name : file_names_without_checksums)
|
|
|
|
{
|
2020-10-15 16:17:16 +00:00
|
|
|
if (client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION && file_name == IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
|
2020-08-26 15:29:46 +00:00
|
|
|
continue;
|
2020-10-15 16:17:16 +00:00
|
|
|
|
2020-08-26 15:29:46 +00:00
|
|
|
checksums.files[file_name] = {};
|
|
|
|
}
|
2020-04-29 17:14:49 +00:00
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
//auto disk = part->volume->getDisk();
|
2020-04-29 17:14:49 +00:00
|
|
|
MergeTreeData::DataPart::Checksums data_checksums;
|
2021-02-10 14:12:49 +00:00
|
|
|
for (const auto & [name, projection] : part->getProjectionParts())
|
|
|
|
{
|
|
|
|
// Get rid of projection files
|
|
|
|
checksums.files.erase(name + ".proj");
|
|
|
|
auto it = projections.find(name);
|
|
|
|
if (it != projections.end())
|
|
|
|
{
|
|
|
|
writeStringBinary(name, out);
|
|
|
|
MergeTreeData::DataPart::Checksums projection_checksum = sendPartFromDisk(it->second, out, client_protocol_version);
|
|
|
|
data_checksums.addFile(name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
|
|
|
|
}
|
|
|
|
else if (part->checksums.has(name + ".proj"))
|
|
|
|
{
|
|
|
|
// We don't send this projection, just add out checksum to bypass the following check
|
|
|
|
const auto & our_checksum = part->checksums.files.find(name + ".proj")->second;
|
|
|
|
data_checksums.addFile(name + ".proj", our_checksum.file_size, our_checksum.file_hash);
|
|
|
|
}
|
|
|
|
}
|
2020-04-29 17:14:49 +00:00
|
|
|
|
|
|
|
writeBinary(checksums.files.size(), out);
|
|
|
|
for (const auto & it : checksums.files)
|
|
|
|
{
|
|
|
|
String file_name = it.first;
|
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
UInt64 size = part->data_part_storage->getFileSize(file_name);
|
2020-04-29 17:14:49 +00:00
|
|
|
|
|
|
|
writeStringBinary(it.first, out);
|
|
|
|
writeBinary(size, out);
|
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
auto file_in = part->data_part_storage->readFile(file_name, {}, std::nullopt, std::nullopt);
|
2020-04-29 17:14:49 +00:00
|
|
|
HashingWriteBuffer hashing_out(out);
|
2021-05-26 20:37:44 +00:00
|
|
|
copyDataWithThrottler(*file_in, hashing_out, blocker.getCounter(), data.getSendsThrottler());
|
2020-04-29 17:14:49 +00:00
|
|
|
|
|
|
|
if (blocker.isCancelled())
|
|
|
|
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
|
|
|
|
|
|
|
|
if (hashing_out.count() != size)
|
2022-04-21 19:19:13 +00:00
|
|
|
throw Exception(
|
2022-04-26 19:08:00 +00:00
|
|
|
ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART,
|
|
|
|
"Unexpected size of file {}, expected {} got {}",
|
2022-06-20 18:18:17 +00:00
|
|
|
std::string(fs::path(part->data_part_storage->getRelativePath()) / file_name),
|
2022-04-21 19:19:13 +00:00
|
|
|
hashing_out.count(), size);
|
2020-04-29 17:14:49 +00:00
|
|
|
|
|
|
|
writePODBinary(hashing_out.getHash(), out);
|
|
|
|
|
2022-04-18 10:18:43 +00:00
|
|
|
if (!file_names_without_checksums.contains(file_name))
|
2020-04-29 17:14:49 +00:00
|
|
|
data_checksums.addFile(file_name, hashing_out.count(), hashing_out.getHash());
|
|
|
|
}
|
|
|
|
|
|
|
|
part->checksums.checkEqual(data_checksums, false);
|
2021-02-10 14:12:49 +00:00
|
|
|
return data_checksums;
|
2020-04-29 17:14:49 +00:00
|
|
|
}
|
|
|
|
|
2021-06-24 08:25:05 +00:00
|
|
|
void Service::sendPartFromDiskRemoteMeta(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
|
2020-10-08 15:45:10 +00:00
|
|
|
{
|
2022-04-22 16:58:09 +00:00
|
|
|
const auto * data_part_storage_on_disk = dynamic_cast<const DataPartStorageOnDisk *>(part->data_part_storage.get());
|
2022-04-21 19:19:13 +00:00
|
|
|
if (!data_part_storage_on_disk)
|
2022-06-20 18:18:17 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage '{}' doesn't support zero-copy replication", part->data_part_storage->getDiskName());
|
2022-04-21 19:19:13 +00:00
|
|
|
|
2022-06-20 18:18:17 +00:00
|
|
|
if (!data_part_storage_on_disk->supportZeroCopyReplication())
|
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk '{}' doesn't support zero-copy replication", data_part_storage_on_disk->getDiskName());
|
2022-02-14 19:19:49 +00:00
|
|
|
|
2020-10-08 15:45:10 +00:00
|
|
|
/// We'll take a list of files from the list of checksums.
|
|
|
|
MergeTreeData::DataPart::Checksums checksums = part->checksums;
|
|
|
|
/// Add files that are not in the checksum list.
|
|
|
|
auto file_names_without_checksums = part->getFileNamesWithoutChecksums();
|
|
|
|
for (const auto & file_name : file_names_without_checksums)
|
|
|
|
checksums.files[file_name] = {};
|
|
|
|
|
2022-02-14 19:19:49 +00:00
|
|
|
std::vector<std::string> paths;
|
|
|
|
paths.reserve(checksums.files.size());
|
|
|
|
for (const auto & it : checksums.files)
|
2022-06-20 18:18:17 +00:00
|
|
|
paths.push_back(fs::path(part->data_part_storage->getRelativePath()) / it.first);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-02-15 09:11:50 +00:00
|
|
|
/// Serialized metadatadatas with zero ref counts.
|
2022-06-20 18:18:17 +00:00
|
|
|
auto metadatas = data_part_storage_on_disk->getSerializedMetadata(paths);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-06-30 12:12:45 +00:00
|
|
|
String part_id = data_part_storage_on_disk->getUniqueId();
|
2020-11-03 08:58:26 +00:00
|
|
|
writeStringBinary(part_id, out);
|
|
|
|
|
2020-10-08 15:45:10 +00:00
|
|
|
writeBinary(checksums.files.size(), out);
|
|
|
|
for (const auto & it : checksums.files)
|
|
|
|
{
|
2022-02-14 19:19:49 +00:00
|
|
|
const String & file_name = it.first;
|
2022-06-20 18:18:17 +00:00
|
|
|
String file_path_prefix = fs::path(part->data_part_storage->getRelativePath()) / file_name;
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-02-15 09:11:50 +00:00
|
|
|
/// Just some additional checks
|
2022-06-20 18:18:17 +00:00
|
|
|
String metadata_file_path = fs::path(data_part_storage_on_disk->getDiskPath()) / file_path_prefix;
|
2022-02-15 09:11:50 +00:00
|
|
|
fs::path metadata(metadata_file_path);
|
2021-04-27 00:05:43 +00:00
|
|
|
if (!fs::exists(metadata))
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' is not exists", file_name);
|
2021-04-27 00:05:43 +00:00
|
|
|
if (!fs::is_regular_file(metadata))
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::CORRUPTED_DATA, "Remote metadata '{}' is not a file", file_name);
|
2022-02-14 19:19:49 +00:00
|
|
|
|
2022-02-15 09:11:50 +00:00
|
|
|
/// Actual metadata send
|
2022-02-14 19:19:49 +00:00
|
|
|
auto metadata_str = metadatas[file_path_prefix];
|
|
|
|
UInt64 file_size = metadata_str.size();
|
|
|
|
ReadBufferFromString buf(metadata_str);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
|
|
|
writeStringBinary(it.first, out);
|
|
|
|
writeBinary(file_size, out);
|
|
|
|
|
|
|
|
HashingWriteBuffer hashing_out(out);
|
2022-02-14 19:19:49 +00:00
|
|
|
copyDataWithThrottler(buf, hashing_out, blocker.getCounter(), data.getSendsThrottler());
|
2020-10-08 15:45:10 +00:00
|
|
|
if (blocker.isCancelled())
|
|
|
|
throw Exception("Transferring part to replica was cancelled", ErrorCodes::ABORTED);
|
|
|
|
|
|
|
|
if (hashing_out.count() != file_size)
|
2022-02-15 09:11:50 +00:00
|
|
|
throw Exception(ErrorCodes::BAD_SIZE_OF_FILE_IN_DATA_PART, "Unexpected size of file {}", metadata_file_path);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
|
|
|
writePODBinary(hashing_out.getHash(), out);
|
2020-10-22 09:32:05 +00:00
|
|
|
}
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
|
|
|
|
2016-01-28 01:00:27 +00:00
|
|
|
MergeTreeData::DataPartPtr Service::findPart(const String & name)
|
|
|
|
{
|
2021-12-30 14:27:22 +00:00
|
|
|
/// It is important to include PreActive and Outdated parts here because remote replicas cannot reliably
|
2017-12-18 17:26:46 +00:00
|
|
|
/// determine the local state of the part, so queries for the parts in these states are completely normal.
|
|
|
|
auto part = data.getPartIfExists(
|
2021-12-30 14:27:22 +00:00
|
|
|
name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
|
2017-04-01 07:20:54 +00:00
|
|
|
if (part)
|
|
|
|
return part;
|
2017-10-06 15:17:14 +00:00
|
|
|
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
|
2016-01-28 01:00:27 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
|
2020-06-26 11:30:23 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2021-05-21 16:14:01 +00:00
|
|
|
ContextPtr context,
|
2017-04-01 07:20:54 +00:00
|
|
|
const String & part_name,
|
|
|
|
const String & replica_path,
|
|
|
|
const String & host,
|
|
|
|
int port,
|
2017-12-27 17:58:52 +00:00
|
|
|
const ConnectionTimeouts & timeouts,
|
2018-07-26 15:10:57 +00:00
|
|
|
const String & user,
|
|
|
|
const String & password,
|
2018-07-30 18:32:21 +00:00
|
|
|
const String & interserver_scheme,
|
2021-05-26 20:37:44 +00:00
|
|
|
ThrottlerPtr throttler,
|
2018-05-21 13:49:54 +00:00
|
|
|
bool to_detached,
|
2020-10-08 15:45:10 +00:00
|
|
|
const String & tmp_prefix_,
|
2021-03-12 09:58:32 +00:00
|
|
|
std::optional<CurrentlySubmergingEmergingTagger> * tagger_ptr,
|
2021-06-24 08:25:05 +00:00
|
|
|
bool try_zero_copy,
|
2021-07-05 03:32:56 +00:00
|
|
|
DiskPtr disk)
|
2014-07-22 13:49:52 +00:00
|
|
|
{
|
2020-06-16 02:14:53 +00:00
|
|
|
if (blocker.isCancelled())
|
|
|
|
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
|
|
|
|
|
2022-08-09 16:44:51 +00:00
|
|
|
/// It should be "tmp-fetch_" and not "tmp_fetch_", because we can fetch part to detached/,
|
|
|
|
/// but detached part name prefix should not contain underscore.
|
|
|
|
static const String TMP_PREFIX = "tmp-fetch_";
|
|
|
|
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
|
|
|
|
String part_dir = tmp_prefix + part_name;
|
|
|
|
auto temporary_directory_lock = data.getTemporaryPartDirectoryHolder(part_dir);
|
|
|
|
|
2019-08-02 20:19:06 +00:00
|
|
|
/// Validation of the input that may come from malicious replica.
|
2020-10-26 16:38:35 +00:00
|
|
|
auto part_info = MergeTreePartInfo::fromPartName(part_name, data.format_version);
|
2019-08-26 14:24:29 +00:00
|
|
|
const auto data_settings = data.getSettings();
|
2019-08-02 20:19:06 +00:00
|
|
|
|
2017-04-06 18:32:00 +00:00
|
|
|
Poco::URI uri;
|
2018-07-30 18:32:21 +00:00
|
|
|
uri.setScheme(interserver_scheme);
|
2017-04-06 18:32:00 +00:00
|
|
|
uri.setHost(host);
|
|
|
|
uri.setPort(port);
|
|
|
|
uri.setQueryParameters(
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2019-09-06 12:18:56 +00:00
|
|
|
{"endpoint", getEndpointId(replica_path)},
|
|
|
|
{"part", part_name},
|
2021-02-10 14:12:49 +00:00
|
|
|
{"client_protocol_version", toString(REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)},
|
2019-09-06 12:18:56 +00:00
|
|
|
{"compress", "false"}
|
2017-11-17 20:42:03 +00:00
|
|
|
});
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2022-02-25 17:44:13 +00:00
|
|
|
if (disk)
|
|
|
|
{
|
|
|
|
UInt64 revision = disk->getRevision();
|
|
|
|
if (revision)
|
|
|
|
uri.addQueryParameter("disk_revision", toString(revision));
|
|
|
|
}
|
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
Strings capability;
|
|
|
|
if (try_zero_copy && data_settings->allow_remote_fs_zero_copy_replication)
|
2020-10-08 15:45:10 +00:00
|
|
|
{
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!disk)
|
2021-01-14 16:26:56 +00:00
|
|
|
{
|
2021-08-24 23:05:55 +00:00
|
|
|
Disks disks = data.getDisks();
|
|
|
|
for (const auto & data_disk : disks)
|
|
|
|
if (data_disk->supportZeroCopyReplication())
|
2022-08-19 14:58:30 +00:00
|
|
|
capability.push_back(toString(data_disk->getDataSourceDescription().type));
|
2021-06-24 08:25:05 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
else if (disk->supportZeroCopyReplication())
|
2021-06-24 08:25:05 +00:00
|
|
|
{
|
2022-08-19 14:58:30 +00:00
|
|
|
capability.push_back(toString(disk->getDataSourceDescription().type));
|
2021-01-14 16:26:56 +00:00
|
|
|
}
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!capability.empty())
|
2020-10-08 15:45:10 +00:00
|
|
|
{
|
2022-01-30 19:49:48 +00:00
|
|
|
::sort(capability.begin(), capability.end());
|
2021-08-24 23:05:55 +00:00
|
|
|
capability.erase(std::unique(capability.begin(), capability.end()), capability.end());
|
2021-07-05 03:32:56 +00:00
|
|
|
const String & remote_fs_metadata = boost::algorithm::join(capability, ", ");
|
|
|
|
uri.addQueryParameter("remote_fs_metadata", remote_fs_metadata);
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
else
|
2021-06-24 08:25:05 +00:00
|
|
|
{
|
2021-07-05 03:32:56 +00:00
|
|
|
try_zero_copy = false;
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
|
|
|
|
2018-07-26 15:10:57 +00:00
|
|
|
Poco::Net::HTTPBasicCredentials creds{};
|
|
|
|
if (!user.empty())
|
|
|
|
{
|
|
|
|
creds.setUsername(user);
|
|
|
|
creds.setPassword(password);
|
|
|
|
}
|
|
|
|
|
2022-05-27 11:13:36 +00:00
|
|
|
std::unique_ptr<PooledReadWriteBufferFromHTTP> in = std::make_unique<PooledReadWriteBufferFromHTTP>(
|
2019-09-06 12:18:56 +00:00
|
|
|
uri,
|
2018-11-16 13:15:17 +00:00
|
|
|
Poco::Net::HTTPRequest::HTTP_POST,
|
2022-05-27 11:13:36 +00:00
|
|
|
nullptr,
|
2018-11-16 13:15:17 +00:00
|
|
|
timeouts,
|
|
|
|
creds,
|
|
|
|
DBMS_DEFAULT_BUFFER_SIZE,
|
2019-09-19 07:33:54 +00:00
|
|
|
0, /* no redirects */
|
2022-05-27 11:13:36 +00:00
|
|
|
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2022-05-27 11:13:36 +00:00
|
|
|
int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
|
2019-09-09 12:28:28 +00:00
|
|
|
|
2019-11-27 09:39:44 +00:00
|
|
|
ReservationPtr reservation;
|
2020-06-26 21:55:48 +00:00
|
|
|
size_t sum_files_size = 0;
|
2020-02-27 10:43:38 +00:00
|
|
|
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
|
2019-09-06 12:18:56 +00:00
|
|
|
{
|
2022-05-27 11:13:36 +00:00
|
|
|
readBinary(sum_files_size, *in);
|
2020-02-27 10:43:38 +00:00
|
|
|
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
|
2020-01-30 10:21:40 +00:00
|
|
|
{
|
2020-02-20 12:36:55 +00:00
|
|
|
IMergeTreeDataPart::TTLInfos ttl_infos;
|
2020-01-30 10:21:40 +00:00
|
|
|
String ttl_infos_string;
|
2022-05-27 11:13:36 +00:00
|
|
|
readBinary(ttl_infos_string, *in);
|
2020-01-30 10:21:40 +00:00
|
|
|
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
|
|
|
|
assertString("ttl format version: 1\n", ttl_infos_buffer);
|
|
|
|
ttl_infos.read(ttl_infos_buffer);
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!disk)
|
|
|
|
{
|
2021-02-18 08:50:31 +00:00
|
|
|
reservation
|
2021-07-05 03:32:56 +00:00
|
|
|
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
|
|
|
|
if (!reservation)
|
|
|
|
reservation
|
|
|
|
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
|
|
|
|
}
|
2020-01-30 10:21:40 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
else if (!disk)
|
2021-02-18 08:50:31 +00:00
|
|
|
{
|
|
|
|
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
|
|
|
|
if (!reservation)
|
|
|
|
reservation = data.reserveSpace(sum_files_size);
|
|
|
|
}
|
2019-09-06 12:18:56 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
else if (!disk)
|
2019-09-06 12:18:56 +00:00
|
|
|
{
|
2019-09-06 15:09:20 +00:00
|
|
|
/// We don't know real size of part because sender server version is too old
|
|
|
|
reservation = data.makeEmptyReservationOnLargestDisk();
|
2019-09-06 12:18:56 +00:00
|
|
|
}
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!disk)
|
|
|
|
disk = reservation->getDisk();
|
2019-09-06 12:18:56 +00:00
|
|
|
|
2022-05-27 11:44:29 +00:00
|
|
|
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
|
2022-02-25 17:44:13 +00:00
|
|
|
if (revision)
|
|
|
|
disk->syncRevision(revision);
|
|
|
|
|
2020-07-02 23:41:37 +00:00
|
|
|
bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch
|
|
|
|
&& sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch);
|
2020-06-26 21:55:48 +00:00
|
|
|
|
2020-05-14 20:08:15 +00:00
|
|
|
String part_type = "Wide";
|
2020-06-26 11:38:37 +00:00
|
|
|
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
|
2022-05-27 11:13:36 +00:00
|
|
|
readStringBinary(part_type, *in);
|
2020-05-14 20:08:15 +00:00
|
|
|
|
2020-11-02 14:38:18 +00:00
|
|
|
UUID part_uuid = UUIDHelpers::Nil;
|
2020-10-29 16:18:25 +00:00
|
|
|
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
|
2022-05-27 11:13:36 +00:00
|
|
|
readUUIDText(part_uuid, *in);
|
2020-10-29 16:18:25 +00:00
|
|
|
|
2022-05-27 11:13:36 +00:00
|
|
|
String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!remote_fs_metadata.empty())
|
|
|
|
{
|
|
|
|
if (!try_zero_copy)
|
|
|
|
throw Exception("Got unexpected 'remote_fs_metadata' cookie", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
if (std::find(capability.begin(), capability.end(), remote_fs_metadata) == capability.end())
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie {}, expect one from {}", remote_fs_metadata, fmt::join(capability, ", "));
|
2021-07-05 03:32:56 +00:00
|
|
|
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version);
|
2021-07-05 03:32:56 +00:00
|
|
|
if (part_type == "InMemory")
|
|
|
|
throw Exception("Got 'remote_fs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE);
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2022-08-09 16:44:51 +00:00
|
|
|
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix, disk, *in, throttler);
|
2021-07-05 03:32:56 +00:00
|
|
|
}
|
|
|
|
catch (const Exception & e)
|
|
|
|
{
|
|
|
|
if (e.code() != ErrorCodes::S3_ERROR && e.code() != ErrorCodes::ZERO_COPY_REPLICATION_ERROR)
|
|
|
|
throw;
|
2022-04-22 17:18:18 +00:00
|
|
|
|
Use fmt::runtime() for LOG_* for non constexpr
Here is oneliner:
$ gg 'LOG_\(DEBUG\|TRACE\|INFO\|TEST\|WARNING\|ERROR\|FATAL\)([^,]*, [a-zA-Z]' -- :*.cpp :*.h | cut -d: -f1 | sort -u | xargs -r sed -E -i 's#(LOG_[A-Z]*)\(([^,]*), ([A-Za-z][^,)]*)#\1(\2, fmt::runtime(\3)#'
Note, that I tried to do this with coccinelle (tool for semantic
patchin), but it cannot parse C++:
$ cat fmt.cocci
@@
expression log;
expression var;
@@
-LOG_DEBUG(log, var)
+LOG_DEBUG(log, fmt::runtime(var))
I've also tried to use some macros/templates magic to do this implicitly
in logger_useful.h, but I failed to do so, and apparently it is not
possible for now.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
v2: manual fixes
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-02-01 09:10:27 +00:00
|
|
|
LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
|
2022-05-27 11:13:36 +00:00
|
|
|
|
|
|
|
/// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
|
|
|
|
/// on http pool.
|
|
|
|
try
|
|
|
|
{
|
|
|
|
in.reset();
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
tryLogCurrentException(log);
|
|
|
|
}
|
|
|
|
|
2022-08-10 13:48:56 +00:00
|
|
|
temporary_directory_lock = {};
|
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
/// Try again but without zero-copy
|
|
|
|
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
|
2022-08-09 16:44:51 +00:00
|
|
|
user, password, interserver_scheme, throttler, to_detached, tmp_prefix, nullptr, false, disk);
|
2021-07-05 03:32:56 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-10-27 12:47:42 +00:00
|
|
|
auto storage_id = data.getStorageID();
|
2021-07-05 03:32:56 +00:00
|
|
|
String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
|
2021-04-10 23:33:54 +00:00
|
|
|
auto entry = data.getContext()->getReplicatedFetchList().insert(
|
2020-10-27 12:47:42 +00:00
|
|
|
storage_id.getDatabaseName(), storage_id.getTableName(),
|
|
|
|
part_info.partition_id, part_name, new_part_path,
|
|
|
|
replica_path, uri, to_detached, sum_files_size);
|
|
|
|
|
2022-05-27 11:13:36 +00:00
|
|
|
in->setNextCallback(ReplicatedFetchReadCallback(*entry));
|
2020-10-27 12:47:42 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
size_t projections = 0;
|
|
|
|
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
|
2022-05-27 11:13:36 +00:00
|
|
|
readBinary(projections, *in);
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
MergeTreeData::DataPart::Checksums checksums;
|
|
|
|
return part_type == "InMemory"
|
2022-05-27 11:13:36 +00:00
|
|
|
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
|
2022-08-09 16:44:51 +00:00
|
|
|
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix, sync, disk, *in, projections, checksums, throttler);
|
2020-04-29 17:14:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
|
|
|
|
const String & part_name,
|
2020-10-29 16:18:25 +00:00
|
|
|
const UUID & part_uuid,
|
2020-06-26 11:30:23 +00:00
|
|
|
const StorageMetadataPtr & metadata_snapshot,
|
2021-05-21 16:14:01 +00:00
|
|
|
ContextPtr context,
|
2021-07-05 03:32:56 +00:00
|
|
|
DiskPtr disk,
|
2021-02-10 14:12:49 +00:00
|
|
|
PooledReadWriteBufferFromHTTP & in,
|
2021-05-26 20:37:44 +00:00
|
|
|
size_t projections,
|
|
|
|
ThrottlerPtr throttler)
|
2020-04-29 17:14:49 +00:00
|
|
|
{
|
2021-07-05 03:32:56 +00:00
|
|
|
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
|
2022-04-21 19:19:13 +00:00
|
|
|
|
|
|
|
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
|
|
|
|
volume,
|
|
|
|
data.getRelativeDataPath(),
|
|
|
|
part_name);
|
|
|
|
|
|
|
|
auto data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
|
|
|
|
volume,
|
|
|
|
data.getRelativeDataPath(),
|
|
|
|
part_name);
|
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr new_data_part =
|
2022-04-21 19:19:13 +00:00
|
|
|
std::make_shared<MergeTreeDataPartInMemory>(data, part_name, data_part_storage);
|
2022-02-15 15:00:45 +00:00
|
|
|
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
for (auto i = 0ul; i < projections; ++i)
|
|
|
|
{
|
|
|
|
String projection_name;
|
|
|
|
readStringBinary(projection_name, in);
|
|
|
|
MergeTreeData::DataPart::Checksums checksums;
|
|
|
|
if (!checksums.read(in))
|
|
|
|
throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);
|
|
|
|
|
2021-10-08 17:21:19 +00:00
|
|
|
NativeReader block_in(in, 0);
|
2021-02-10 14:12:49 +00:00
|
|
|
auto block = block_in.read();
|
2021-05-26 20:37:44 +00:00
|
|
|
throttler->add(block.bytes());
|
2021-02-10 14:12:49 +00:00
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
|
|
|
|
auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
|
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreePartInfo new_part_info("all", 0, 0, 0);
|
|
|
|
MergeTreeData::MutableDataPartPtr new_projection_part =
|
2022-04-21 19:19:13 +00:00
|
|
|
std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
new_projection_part->is_temp = false;
|
2022-07-27 14:05:16 +00:00
|
|
|
new_projection_part->setColumns(block.getNamesAndTypesList(), {});
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreePartition partition{};
|
|
|
|
new_projection_part->partition = std::move(partition);
|
2021-09-16 21:19:58 +00:00
|
|
|
new_projection_part->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
MergedBlockOutputStream part_out(
|
|
|
|
new_projection_part,
|
2022-04-21 19:19:13 +00:00
|
|
|
projection_part_storage_builder,
|
2021-02-10 14:12:49 +00:00
|
|
|
metadata_snapshot->projections.get(projection_name).metadata,
|
|
|
|
block.getNamesAndTypesList(),
|
|
|
|
{},
|
2022-02-14 19:50:08 +00:00
|
|
|
CompressionCodecFactory::instance().get("NONE", {}),
|
2022-03-16 19:16:26 +00:00
|
|
|
NO_TRANSACTION_PTR);
|
2021-05-14 21:45:13 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
part_out.write(block);
|
2022-02-08 08:01:26 +00:00
|
|
|
part_out.finalizePart(new_projection_part, false);
|
2021-02-10 14:12:49 +00:00
|
|
|
new_projection_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
|
|
|
|
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
|
|
|
|
}
|
|
|
|
|
2020-05-05 01:27:31 +00:00
|
|
|
MergeTreeData::DataPart::Checksums checksums;
|
|
|
|
if (!checksums.read(in))
|
|
|
|
throw Exception("Cannot deserialize checksums", ErrorCodes::CORRUPTED_DATA);
|
|
|
|
|
2021-10-08 17:21:19 +00:00
|
|
|
NativeReader block_in(in, 0);
|
2020-06-03 18:59:18 +00:00
|
|
|
auto block = block_in.read();
|
2021-05-26 20:37:44 +00:00
|
|
|
throttler->add(block.bytes());
|
2020-06-03 13:27:54 +00:00
|
|
|
|
2020-10-29 16:18:25 +00:00
|
|
|
new_data_part->uuid = part_uuid;
|
2020-04-29 17:14:49 +00:00
|
|
|
new_data_part->is_temp = true;
|
2022-07-27 14:05:16 +00:00
|
|
|
new_data_part->setColumns(block.getNamesAndTypesList(), {});
|
2021-09-16 21:19:58 +00:00
|
|
|
new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
|
2021-05-21 16:14:01 +00:00
|
|
|
new_data_part->partition.create(metadata_snapshot, block, 0, context);
|
2020-04-29 17:14:49 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergedBlockOutputStream part_out(
|
2022-04-21 19:19:13 +00:00
|
|
|
new_data_part, data_part_storage_builder, metadata_snapshot, block.getNamesAndTypesList(), {},
|
2022-03-16 19:16:26 +00:00
|
|
|
CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR);
|
2021-04-15 21:47:11 +00:00
|
|
|
|
2020-04-29 17:14:49 +00:00
|
|
|
part_out.write(block);
|
2022-02-08 08:01:26 +00:00
|
|
|
part_out.finalizePart(new_data_part, false);
|
2020-05-05 01:27:31 +00:00
|
|
|
new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
|
2020-04-29 17:14:49 +00:00
|
|
|
|
|
|
|
return new_data_part;
|
2019-05-12 14:57:23 +00:00
|
|
|
}
|
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
void Fetcher::downloadBaseOrProjectionPartToDisk(
|
2019-05-12 14:57:23 +00:00
|
|
|
const String & replica_path,
|
2022-04-21 19:19:13 +00:00
|
|
|
DataPartStorageBuilderPtr & data_part_storage_builder,
|
2020-06-26 21:55:48 +00:00
|
|
|
bool sync,
|
2021-02-10 14:12:49 +00:00
|
|
|
PooledReadWriteBufferFromHTTP & in,
|
2021-05-26 20:37:44 +00:00
|
|
|
MergeTreeData::DataPart::Checksums & checksums,
|
|
|
|
ThrottlerPtr throttler) const
|
2019-05-12 14:57:23 +00:00
|
|
|
{
|
|
|
|
size_t files;
|
|
|
|
readBinary(files, in);
|
|
|
|
|
2017-04-01 07:20:54 +00:00
|
|
|
for (size_t i = 0; i < files; ++i)
|
|
|
|
{
|
|
|
|
String file_name;
|
|
|
|
UInt64 file_size;
|
|
|
|
|
|
|
|
readStringBinary(file_name, in);
|
|
|
|
readBinary(file_size, in);
|
|
|
|
|
2019-07-31 18:21:13 +00:00
|
|
|
/// File must be inside "absolute_part_path" directory.
|
|
|
|
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
|
2022-06-20 18:18:17 +00:00
|
|
|
String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage_builder->getRelativePath()) / file_name);
|
|
|
|
if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage_builder->getRelativePath()).string()))
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::INSECURE_PATH,
|
|
|
|
"File path ({}) doesn't appear to be inside part path ({}). "
|
|
|
|
"This may happen if we are trying to download part from malicious replica or logical error.",
|
2022-06-20 18:18:17 +00:00
|
|
|
absolute_file_path, data_part_storage_builder->getRelativePath());
|
2019-07-31 18:21:13 +00:00
|
|
|
|
2022-08-08 13:32:49 +00:00
|
|
|
auto file_out = data_part_storage_builder->writeFile(file_name, std::min<UInt64>(file_size, DBMS_DEFAULT_BUFFER_SIZE), {});
|
2020-04-08 08:41:13 +00:00
|
|
|
HashingWriteBuffer hashing_out(*file_out);
|
2021-05-26 20:37:44 +00:00
|
|
|
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2017-10-06 16:53:55 +00:00
|
|
|
if (blocker.isCancelled())
|
2017-04-01 07:20:54 +00:00
|
|
|
{
|
2020-06-16 02:14:53 +00:00
|
|
|
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
|
|
|
|
/// performing a poll with a not very large timeout.
|
2017-04-01 07:20:54 +00:00
|
|
|
/// And now we check it only between read chunks (in the `copyData` function).
|
2022-04-21 19:19:13 +00:00
|
|
|
data_part_storage_builder->removeRecursive();
|
2017-04-01 07:20:54 +00:00
|
|
|
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
|
|
|
|
}
|
|
|
|
|
2017-06-21 01:24:05 +00:00
|
|
|
MergeTreeDataPartChecksum::uint128 expected_hash;
|
|
|
|
readPODBinary(expected_hash, in);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (expected_hash != hashing_out.getHash())
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
|
|
|
|
"Checksum mismatch for file {} transferred from {}",
|
2022-04-21 19:19:13 +00:00
|
|
|
(fs::path(data_part_storage_builder->getFullPath()) / file_name).string(),
|
2022-02-01 10:00:23 +00:00
|
|
|
replica_path);
|
2017-04-01 07:20:54 +00:00
|
|
|
|
|
|
|
if (file_name != "checksums.txt" &&
|
2020-08-26 15:29:46 +00:00
|
|
|
file_name != "columns.txt" &&
|
|
|
|
file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME)
|
2017-04-01 07:20:54 +00:00
|
|
|
checksums.addFile(file_name, file_size, expected_hash);
|
2020-06-26 21:55:48 +00:00
|
|
|
|
|
|
|
if (sync)
|
|
|
|
hashing_out.sync();
|
2017-04-01 07:20:54 +00:00
|
|
|
}
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
|
|
|
|
const String & part_name,
|
|
|
|
const String & replica_path,
|
|
|
|
bool to_detached,
|
2022-08-09 16:44:51 +00:00
|
|
|
const String & tmp_prefix,
|
2021-02-10 14:12:49 +00:00
|
|
|
bool sync,
|
|
|
|
DiskPtr disk,
|
|
|
|
PooledReadWriteBufferFromHTTP & in,
|
|
|
|
size_t projections,
|
2021-05-26 20:37:44 +00:00
|
|
|
MergeTreeData::DataPart::Checksums & checksums,
|
|
|
|
ThrottlerPtr throttler)
|
2021-02-10 14:12:49 +00:00
|
|
|
{
|
2022-08-09 16:44:51 +00:00
|
|
|
assert(!tmp_prefix.empty());
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
/// We will remove directory if it's already exists. Make precautions.
|
|
|
|
if (tmp_prefix.empty() //-V560
|
|
|
|
|| part_name.empty()
|
|
|
|
|| std::string::npos != tmp_prefix.find_first_of("/.")
|
|
|
|
|| std::string::npos != part_name.find_first_of("/."))
|
|
|
|
throw Exception("Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters.", ErrorCodes::LOGICAL_ERROR);
|
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
String part_dir = tmp_prefix + part_name;
|
|
|
|
String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
|
|
|
|
|
|
|
|
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
|
|
|
|
|
|
|
|
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
|
|
|
|
volume,
|
|
|
|
part_relative_path,
|
|
|
|
part_dir);
|
|
|
|
|
|
|
|
DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
|
|
|
|
volume,
|
|
|
|
part_relative_path,
|
|
|
|
part_dir);
|
2021-02-10 14:12:49 +00:00
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
if (data_part_storage_builder->exists())
|
2021-02-10 14:12:49 +00:00
|
|
|
{
|
|
|
|
LOG_WARNING(log, "Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part.",
|
2022-04-21 19:19:13 +00:00
|
|
|
data_part_storage_builder->getFullPath());
|
|
|
|
data_part_storage_builder->removeRecursive();
|
2021-02-10 14:12:49 +00:00
|
|
|
}
|
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
data_part_storage_builder->createDirectories();
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
SyncGuardPtr sync_guard;
|
|
|
|
if (data.getSettings()->fsync_part_directory)
|
2022-07-08 07:19:59 +00:00
|
|
|
sync_guard = disk->getDirectorySyncGuard(data_part_storage->getRelativePath());
|
2017-04-01 07:20:54 +00:00
|
|
|
|
2021-02-10 14:12:49 +00:00
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
|
|
|
|
|
|
|
|
for (auto i = 0ul; i < projections; ++i)
|
|
|
|
{
|
|
|
|
String projection_name;
|
|
|
|
readStringBinary(projection_name, in);
|
|
|
|
MergeTreeData::DataPart::Checksums projection_checksum;
|
2022-04-21 19:19:13 +00:00
|
|
|
|
|
|
|
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
|
|
|
|
auto projection_part_storage_builder = data_part_storage_builder->getProjection(projection_name + ".proj");
|
|
|
|
|
|
|
|
projection_part_storage_builder->createDirectories();
|
2021-02-10 14:12:49 +00:00
|
|
|
downloadBaseOrProjectionPartToDisk(
|
2022-04-21 19:19:13 +00:00
|
|
|
replica_path, projection_part_storage_builder, sync, in, projection_checksum, throttler);
|
2021-02-10 14:12:49 +00:00
|
|
|
checksums.addFile(
|
|
|
|
projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());
|
|
|
|
}
|
|
|
|
|
|
|
|
// Download the base part
|
2022-04-21 19:19:13 +00:00
|
|
|
downloadBaseOrProjectionPartToDisk(replica_path, data_part_storage_builder, sync, in, checksums, throttler);
|
2021-02-10 14:12:49 +00:00
|
|
|
|
|
|
|
assertEOF(in);
|
2022-04-21 19:19:13 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage);
|
2022-02-15 15:00:45 +00:00
|
|
|
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
|
2020-02-11 23:29:34 +00:00
|
|
|
new_data_part->is_temp = true;
|
2017-08-04 14:00:26 +00:00
|
|
|
new_data_part->modification_time = time(nullptr);
|
2017-08-16 19:24:50 +00:00
|
|
|
new_data_part->loadColumnsChecksumsIndexes(true, false);
|
2017-04-01 07:20:54 +00:00
|
|
|
new_data_part->checksums.checkEqual(checksums, false);
|
|
|
|
return new_data_part;
|
2014-07-22 13:49:52 +00:00
|
|
|
}
|
|
|
|
|
2021-06-24 08:25:05 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
|
2020-10-08 15:45:10 +00:00
|
|
|
const String & part_name,
|
|
|
|
const String & replica_path,
|
|
|
|
bool to_detached,
|
2022-08-09 16:44:51 +00:00
|
|
|
const String & tmp_prefix,
|
2021-07-05 03:32:56 +00:00
|
|
|
DiskPtr disk,
|
2021-05-26 20:37:44 +00:00
|
|
|
PooledReadWriteBufferFromHTTP & in,
|
|
|
|
ThrottlerPtr throttler)
|
2020-10-08 15:45:10 +00:00
|
|
|
{
|
2020-11-03 08:58:26 +00:00
|
|
|
String part_id;
|
|
|
|
readStringBinary(part_id, in);
|
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
|
2020-11-03 08:58:26 +00:00
|
|
|
{
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
|
2020-11-03 08:58:26 +00:00
|
|
|
}
|
2022-02-10 11:15:08 +00:00
|
|
|
|
2021-07-05 03:32:56 +00:00
|
|
|
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
|
|
|
|
part_name, part_id, disk->getName());
|
2020-11-03 08:58:26 +00:00
|
|
|
|
2022-02-14 09:20:27 +00:00
|
|
|
data.lockSharedDataTemporary(part_name, part_id, disk);
|
|
|
|
|
2022-08-09 16:44:51 +00:00
|
|
|
assert(!tmp_prefix.empty());
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
String part_dir = tmp_prefix + part_name;
|
|
|
|
String part_relative_path = data.getRelativeDataPath() + String(to_detached ? "detached/" : "");
|
|
|
|
|
|
|
|
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
auto data_part_storage = std::make_shared<DataPartStorageOnDisk>(
|
|
|
|
volume,
|
|
|
|
part_relative_path,
|
|
|
|
part_dir);
|
|
|
|
|
|
|
|
DataPartStorageBuilderPtr data_part_storage_builder = std::make_shared<DataPartStorageBuilderOnDisk>(
|
|
|
|
volume,
|
|
|
|
part_relative_path,
|
|
|
|
part_dir);
|
|
|
|
|
2022-06-29 13:08:16 +00:00
|
|
|
if (data_part_storage->exists())
|
|
|
|
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Directory {} already exists.", data_part_storage->getFullPath());
|
2020-10-08 15:45:10 +00:00
|
|
|
|
|
|
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};
|
|
|
|
|
2022-06-29 13:08:16 +00:00
|
|
|
volume->getDisk()->createDirectories(data_part_storage->getFullPath());
|
2020-10-08 15:45:10 +00:00
|
|
|
|
|
|
|
size_t files;
|
|
|
|
readBinary(files, in);
|
|
|
|
|
|
|
|
for (size_t i = 0; i < files; ++i)
|
|
|
|
{
|
|
|
|
String file_name;
|
|
|
|
UInt64 file_size;
|
|
|
|
|
|
|
|
readStringBinary(file_name, in);
|
|
|
|
readBinary(file_size, in);
|
|
|
|
|
2022-06-29 13:08:16 +00:00
|
|
|
String metadata_file = fs::path(data_part_storage->getFullPath()) / file_name;
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2020-10-14 15:05:59 +00:00
|
|
|
{
|
2021-01-20 09:48:22 +00:00
|
|
|
auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2020-10-14 15:05:59 +00:00
|
|
|
HashingWriteBuffer hashing_out(*file_out);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2021-05-26 20:37:44 +00:00
|
|
|
copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2020-10-14 15:05:59 +00:00
|
|
|
if (blocker.isCancelled())
|
|
|
|
{
|
|
|
|
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
|
|
|
|
/// performing a poll with a not very large timeout.
|
|
|
|
/// And now we check it only between read chunks (in the `copyData` function).
|
2022-04-21 19:19:13 +00:00
|
|
|
data_part_storage_builder->removeSharedRecursive(true);
|
2022-06-29 13:08:16 +00:00
|
|
|
data_part_storage_builder->commit();
|
2020-10-14 15:05:59 +00:00
|
|
|
throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);
|
|
|
|
}
|
|
|
|
|
|
|
|
MergeTreeDataPartChecksum::uint128 expected_hash;
|
|
|
|
readPODBinary(expected_hash, in);
|
|
|
|
|
|
|
|
if (expected_hash != hashing_out.getHash())
|
|
|
|
{
|
2022-02-01 10:00:23 +00:00
|
|
|
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,
|
|
|
|
"Checksum mismatch for file {} transferred from {}",
|
|
|
|
metadata_file, replica_path);
|
2020-10-14 15:05:59 +00:00
|
|
|
}
|
2020-10-08 15:45:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
assertEOF(in);
|
|
|
|
|
2022-06-29 13:08:16 +00:00
|
|
|
data_part_storage_builder->commit();
|
|
|
|
|
2022-04-21 19:19:13 +00:00
|
|
|
MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, data_part_storage);
|
2022-02-17 21:26:37 +00:00
|
|
|
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
|
2020-10-08 15:45:10 +00:00
|
|
|
new_data_part->is_temp = true;
|
|
|
|
new_data_part->modification_time = time(nullptr);
|
|
|
|
new_data_part->loadColumnsChecksumsIndexes(true, false);
|
|
|
|
|
2022-06-30 20:51:27 +00:00
|
|
|
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
|
2020-10-08 15:45:10 +00:00
|
|
|
|
2022-02-10 11:15:08 +00:00
|
|
|
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.",
|
|
|
|
part_name, part_id, disk->getName());
|
|
|
|
|
2020-10-08 15:45:10 +00:00
|
|
|
return new_data_part;
|
|
|
|
}
|
|
|
|
|
2014-07-22 13:49:52 +00:00
|
|
|
}
|
2016-01-28 01:00:27 +00:00
|
|
|
|
|
|
|
}
|