2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/DataPartsExchange.h>
2021-02-19 12:51:26 +00:00
2021-10-15 20:18:20 +00:00
# include <Formats/NativeWriter.h>
2021-07-05 03:32:56 +00:00
# include <Disks/IDiskRemote.h>
2021-02-19 12:51:26 +00:00
# include <Disks/SingleDiskVolume.h>
# include <Disks/createVolume.h>
# include <IO/HTTPCommon.h>
# include <Server/HTTP/HTMLForm.h>
# include <Server/HTTP/HTTPServerResponse.h>
2020-04-29 17:14:49 +00:00
# include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
# include <Storages/MergeTree/MergedBlockOutputStream.h>
2021-02-19 12:51:26 +00:00
# include <Storages/MergeTree/ReplicatedFetchList.h>
2021-05-26 20:37:44 +00:00
# include <Storages/StorageReplicatedMergeTree.h>
2017-04-01 09:19:00 +00:00
# include <Common/CurrentMetrics.h>
# include <Common/NetException.h>
2020-10-08 15:45:10 +00:00
# include <IO/createReadBufferFromFileBase.h>
2021-10-02 07:13:14 +00:00
# include <base/scope_guard.h>
2017-04-11 14:13:19 +00:00
# include <Poco/Net/HTTPRequest.h>
2021-07-05 03:32:56 +00:00
# include <iterator>
# include <regex>
2014-07-22 13:49:52 +00:00
2021-04-27 00:05:43 +00:00
namespace fs = std : : filesystem ;
2016-10-24 04:06:27 +00:00
/// Metrics referenced by this file. Only declared here (`extern`); the definitions live elsewhere.
namespace CurrentMetrics
{
2017-04-01 07:20:54 +00:00
/// Held via CurrentMetrics::Increment in Service::processQuery for the duration of sending a part.
extern const Metric ReplicatedSend ;
2016-10-24 04:06:27 +00:00
}
2014-07-22 13:49:52 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes used when throwing or inspecting exceptions in this file.
/// These are `extern` declarations only; the values are defined elsewhere.
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int DIRECTORY_ALREADY_EXISTS ;
extern const int NO_SUCH_DATA_PART ;
2017-04-01 07:20:54 +00:00
extern const int ABORTED ;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART ;
2017-08-09 13:31:13 +00:00
extern const int CANNOT_WRITE_TO_OSTREAM ;
2018-11-22 21:19:58 +00:00
extern const int CHECKSUM_DOESNT_MATCH ;
2019-07-31 18:21:13 +00:00
extern const int INSECURE_PATH ;
2020-05-05 01:27:31 +00:00
extern const int CORRUPTED_DATA ;
extern const int LOGICAL_ERROR ;
2020-10-08 15:45:10 +00:00
extern const int S3_ERROR ;
2021-02-26 09:48:57 +00:00
extern const int INCORRECT_PART_TYPE ;
2021-07-05 03:32:56 +00:00
extern const int ZERO_COPY_REPLICATION_ERROR ;
2016-01-11 21:46:36 +00:00
}
2016-01-28 01:00:27 +00:00
namespace DataPartsExchange
{
/// File-local helpers: replication protocol version constants, the endpoint id builder
/// and the progress callback used while fetching a part.
namespace
{
2020-03-09 01:22:33 +00:00
/// Replication protocol versions. The sender compares the requester's version against these
/// constants to decide which fields to write; the receiver does the same with the server's version.
/// >= 1: the total size of the part on disk is sent first.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE = 1 ;
/// >= 2: serialized TTL infos follow the size.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS = 2 ;
2020-05-14 20:08:15 +00:00
/// >= 3: the part type is sent as a string.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE = 3 ;
2020-08-26 15:29:46 +00:00
/// >= 4: the default compression codec file is included in the list of sent files
/// (it is skipped for older requesters in sendPartFromDisk).
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION = 4 ;
2020-10-15 16:17:16 +00:00
/// >= 5: the part UUID is sent.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID = 5 ;
2021-07-05 03:32:56 +00:00
/// >= 6: the sender may answer with remote-disk metadata instead of file contents (zero-copy).
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY = 6 ;
2021-02-10 14:12:49 +00:00
/// >= 7: the number of projections is sent, followed by the projection parts.
/// This is also the version the Fetcher announces and the maximum the Service reports back.
constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION = 7 ;
2021-08-27 12:17:58 +00:00
// Reserved for ALTER PRIMARY KEY
// constexpr auto REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PRIMARY_KEY = 8;
2020-01-30 10:21:40 +00:00
2019-09-04 16:00:20 +00:00
2016-01-28 01:00:27 +00:00
/// Builds the endpoint identifier for `node_id` by prepending a fixed literal prefix.
/// Used by Service::getId and as the "endpoint" query parameter in Fetcher::fetchPart,
/// so both sides derive the same identifier from the same path.
std : : string getEndpointId ( const std : : string & node_id )
{
2017-04-01 07:20:54 +00:00
return " DataPartsExchange: " + node_id ;
2016-01-28 01:00:27 +00:00
}
2020-10-27 13:09:14 +00:00
/// Simple functor for tracking fetch progress in system.replicated_fetches table.
/// Holds a reference (not a copy) to the fetch list entry it updates.
2020-10-26 16:38:35 +00:00
struct ReplicatedFetchReadCallback
{
ReplicatedFetchList : : Entry & replicated_fetch_entry ;
2020-10-27 15:29:06 +00:00
explicit ReplicatedFetchReadCallback ( ReplicatedFetchList : : Entry & replicated_fetch_entry_ )
2020-10-26 16:38:35 +00:00
: replicated_fetch_entry ( replicated_fetch_entry_ )
{ }
/// Stores `bytes_count` as the entry's bytes_read_compressed and, when the total size is known,
/// updates progress to bytes_count / total_size_bytes_compressed.
/// NOTE(review): `store` (not an add) suggests bytes_count is a running total rather than a delta — confirm with the caller.
void operator ( ) ( size_t bytes_count )
{
2020-10-27 12:24:10 +00:00
replicated_fetch_entry - > bytes_read_compressed . store ( bytes_count , std : : memory_order_relaxed ) ;
2020-10-30 08:52:11 +00:00
/// It's possible when we fetch part from very old clickhouse version
/// which doesn't send total size.
/// In that case progress is left untouched, which also avoids dividing by zero.
if ( replicated_fetch_entry - > total_size_bytes_compressed ! = 0 )
{
replicated_fetch_entry - > progress . store (
static_cast < double > ( bytes_count ) / replicated_fetch_entry - > total_size_bytes_compressed ,
std : : memory_order_relaxed ) ;
}
2020-10-26 16:38:35 +00:00
}
} ;
2016-01-28 01:00:27 +00:00
}
2021-05-26 20:37:44 +00:00
/// Binds the service to the replicated table `data_` (kept by reference) and obtains a logger
/// whose name is the table's log name followed by a fixed suffix.
Service : : Service ( StorageReplicatedMergeTree & data_ ) :
data ( data_ ) , log ( & Poco : : Logger : : get ( data . getLogName ( ) + " (Replicated PartsService) " ) ) { }
2016-01-28 01:00:27 +00:00
/// Returns the endpoint identifier for `node_id`; delegates to the file-local getEndpointId.
std : : string Service : : getId ( const std : : string & node_id ) const
{
2017-04-01 07:20:54 +00:00
return getEndpointId ( node_id ) ;
2016-01-28 01:00:27 +00:00
}
2016-01-11 21:46:36 +00:00
2021-02-19 12:51:26 +00:00
/// Serves a request to send a data part.
///
/// Reads the part name, the requester's protocol version and (optionally) its zero-copy
/// capabilities from `params`, writes a version-dependent header followed by the part to `out`,
/// and reports the negotiated protocol version (and, for zero-copy, the disk type) via cookies
/// on `response`. The request body is not used.
///
/// Errors: NetException is rethrown as is. Any other failure is rethrown after the part is
/// reported as broken, except DB::Exception with code ABORTED or CANNOT_WRITE_TO_OSTREAM.
void Service : : processQuery ( const HTMLForm & params , ReadBuffer & /*body*/ , WriteBuffer & out , HTTPServerResponse & response )
2014-07-22 13:49:52 +00:00
{
2020-02-27 10:43:38 +00:00
/// Version announced by the requester; the second argument is the fallback when the parameter is absent.
int client_protocol_version = parse < int > ( params . get ( " client_protocol_version " , " 0 " ) ) ;
2019-09-06 12:18:56 +00:00
2019-09-04 16:00:20 +00:00
String part_name = params . get ( " part " ) ;
2019-05-12 14:57:23 +00:00
2019-08-26 14:24:29 +00:00
const auto data_settings = data . getSettings ( ) ;
2017-04-01 07:20:54 +00:00
2019-08-02 20:19:06 +00:00
/// Validation of the input that may come from malicious replica.
/// The parsed result is discarded; the call is made only for its validation effect.
MergeTreePartInfo : : fromPartName ( part_name , data . format_version ) ;
2020-02-27 10:43:38 +00:00
/// We pretend to work as older server version, to be sure that client will correctly process our version
2021-02-10 14:12:49 +00:00
response . addCookie ( { " server_protocol_version " , toString ( std : : min ( client_protocol_version , REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION ) ) } ) ;
2019-09-06 12:18:56 +00:00
2020-05-23 22:24:01 +00:00
LOG_TRACE ( log , " Sending part {} " , part_name ) ;
2017-04-01 07:20:54 +00:00
2021-02-10 14:12:49 +00:00
/// Declared outside the `try` so that the error handlers below can inspect it.
MergeTreeData : : DataPartPtr part ;
/// For a projection part the parent part is reported; otherwise (including the case
/// where the part was never found) the requested name is reported.
auto report_broken_part = [ & ] ( )
{
if ( part & & part - > isProjectionPart ( ) )
{
data . reportBrokenPart ( part - > getParentPart ( ) - > name ) ;
}
else
{
data . reportBrokenPart ( part_name ) ;
}
} ;
2017-04-01 07:20:54 +00:00
try
{
2021-02-10 14:12:49 +00:00
part = findPart ( part_name ) ;
2017-04-01 07:20:54 +00:00
CurrentMetrics : : Increment metric_increment { CurrentMetrics : : ReplicatedSend } ;
2020-02-27 10:43:38 +00:00
/// Header fields below are written in this fixed order; Fetcher::fetchPart reads them in the same order.
if ( client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE )
2020-04-29 17:14:49 +00:00
writeBinary ( part - > checksums . getTotalSizeOnDisk ( ) , out ) ;
2019-05-12 14:57:23 +00:00
2020-02-27 10:43:38 +00:00
if ( client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS )
2020-01-30 10:21:40 +00:00
{
/// TTL infos are serialized into a string first and sent as one binary string.
WriteBufferFromOwnString ttl_infos_buffer ;
part - > ttl_infos . write ( ttl_infos_buffer ) ;
writeBinary ( ttl_infos_buffer . str ( ) , out ) ;
}
2020-05-14 20:08:15 +00:00
if ( client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE )
writeStringBinary ( part - > getType ( ) . toString ( ) , out ) ;
2017-04-01 07:20:54 +00:00
2020-10-29 16:18:25 +00:00
if ( client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID )
writeUUIDText ( part - > uuid , out ) ;
2021-07-05 03:32:56 +00:00
/// The requester's zero-copy capabilities: a comma-separated list of disk types,
/// split here with a regex that also swallows whitespace around the commas.
String remote_fs_metadata = parse < String > ( params . get ( " remote_fs_metadata " , " " ) ) ;
std : : regex re ( " \\ s*, \\ s* " ) ;
Strings capability (
std : : sregex_token_iterator ( remote_fs_metadata . begin ( ) , remote_fs_metadata . end ( ) , re , - 1 ) ,
std : : sregex_token_iterator ( ) ) ;
2020-10-08 15:45:10 +00:00
2021-07-05 03:32:56 +00:00
if ( data_settings - > allow_remote_fs_zero_copy_replication & &
client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY )
2021-06-24 08:25:05 +00:00
{
2021-07-05 03:32:56 +00:00
auto disk = part - > volume - > getDisk ( ) ;
2021-08-24 22:24:47 +00:00
auto disk_type = toString ( disk - > getType ( ) ) ;
2021-07-05 03:32:56 +00:00
if ( disk - > supportZeroCopyReplication ( ) & & std : : find ( capability . begin ( ) , capability . end ( ) , disk_type ) ! = capability . end ( ) )
2021-02-10 14:12:49 +00:00
{
2021-07-05 03:32:56 +00:00
/// Send metadata if the receiver's capability covers the source disk type.
/// The cookie tells the receiver which disk type the metadata belongs to.
/// Nothing else (projection count, file contents) is written on this path.
response . addCookie ( { " remote_fs_metadata " , disk_type } ) ;
sendPartFromDiskRemoteMeta ( part , out ) ;
return ;
2020-10-08 15:45:10 +00:00
}
2021-02-10 14:12:49 +00:00
}
2021-07-05 03:32:56 +00:00
if ( client_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION )
2021-02-10 14:12:49 +00:00
{
/// Newer requesters get the projection count first, then projections together with the part.
const auto & projections = part - > getProjectionParts ( ) ;
writeBinary ( projections . size ( ) , out ) ;
if ( isInMemoryPart ( part ) )
sendPartFromMemory ( part , out , projections ) ;
else
sendPartFromDisk ( part , out , client_protocol_version , projections ) ;
}
else
{
/// Older requesters: no projection count and no projections are passed to the send functions.
if ( isInMemoryPart ( part ) )
sendPartFromMemory ( part , out ) ;
2020-10-08 15:45:10 +00:00
else
2020-12-16 15:31:13 +00:00
sendPartFromDisk ( part , out , client_protocol_version ) ;
2020-08-26 15:29:46 +00:00
}
2017-04-01 07:20:54 +00:00
}
2018-08-10 04:02:56 +00:00
catch ( const NetException & )
2017-04-01 07:20:54 +00:00
{
2018-03-23 16:33:51 +00:00
/// Network error or error on remote side. No need to enqueue part for check.
2017-04-01 07:20:54 +00:00
throw ;
}
catch ( const Exception & e )
{
2017-08-09 13:31:13 +00:00
/// Cancellation and failure to write to the output stream are not treated as signs of a broken part.
if ( e . code ( ) ! = ErrorCodes : : ABORTED & & e . code ( ) ! = ErrorCodes : : CANNOT_WRITE_TO_OSTREAM )
2021-02-10 14:12:49 +00:00
report_broken_part ( ) ;
2017-04-01 07:20:54 +00:00
throw ;
}
catch ( . . . )
{
2021-02-10 14:12:49 +00:00
report_broken_part ( ) ;
2017-04-01 07:20:54 +00:00
throw ;
}
2014-07-22 13:49:52 +00:00
}
2021-02-10 14:12:49 +00:00
/// Sends an in-memory part to `out`.
///
/// For each entry of `projections` writes: projection name, its checksums, and its block in
/// Native format. Then writes the part's own checksums followed by its block.
/// Throws LOGICAL_ERROR if the part or any of the given projections is not an in-memory part.
///
/// NOTE(review): processQuery also calls this with two arguments, so `projections` presumably
/// has a default in the header — not visible here.
void Service : : sendPartFromMemory (
const MergeTreeData : : DataPartPtr & part , WriteBuffer & out , const std : : map < String , std : : shared_ptr < IMergeTreeDataPart > > & projections )
2020-04-29 17:14:49 +00:00
{
2020-06-26 11:30:23 +00:00
auto metadata_snapshot = data . getInMemoryMetadataPtr ( ) ;
2021-02-10 14:12:49 +00:00
for ( const auto & [ name , projection ] : projections )
{
/// The header of the Native stream is the projection's sample block taken from table metadata.
auto projection_sample_block = metadata_snapshot - > projections . get ( name ) . sample_block ;
auto part_in_memory = asInMemoryPart ( projection ) ;
if ( ! part_in_memory )
throw Exception ( " Projection " + name + " of part " + part - > name + " is not stored in memory " , ErrorCodes : : LOGICAL_ERROR ) ;
writeStringBinary ( name , out ) ;
projection - > checksums . write ( out ) ;
2021-10-08 17:21:19 +00:00
NativeWriter block_out ( out , 0 , projection_sample_block ) ;
2021-02-10 14:12:49 +00:00
block_out . write ( part_in_memory - > block ) ;
}
2020-06-05 20:47:46 +00:00
auto part_in_memory = asInMemoryPart ( part ) ;
2020-04-29 17:14:49 +00:00
if ( ! part_in_memory )
2020-05-05 01:27:31 +00:00
throw Exception ( " Part " + part - > name + " is not stored in memory " , ErrorCodes : : LOGICAL_ERROR ) ;
2021-10-08 17:21:19 +00:00
NativeWriter block_out ( out , 0 , metadata_snapshot - > getSampleBlock ( ) ) ;
2020-05-05 01:27:31 +00:00
/// Checksums go first, then the block; downloadPartToMemory reads them in this order.
part - > checksums . write ( out ) ;
2020-04-29 17:14:49 +00:00
block_out . write ( part_in_memory - > block ) ;
2021-05-26 20:37:44 +00:00
/// Only the main block's bytes are accounted in the sends throttler here.
/// NOTE(review): projection blocks written above are not added to the throttler — confirm this is intended.
data . getSendsThrottler ( ) - > add ( part_in_memory - > block . bytes ( ) ) ;
2020-04-29 17:14:49 +00:00
}
2021-02-10 14:12:49 +00:00
/// Sends the files of an on-disk part to `out` and verifies them against the part's checksums.
///
/// Wire layout: for each projection present in `projections` — its name followed by a recursive
/// sendPartFromDisk of that projection; then the number of files; then for each file — name,
/// size, contents, and the hash computed while sending.
///
/// `client_protocol_version` decides whether the default compression codec file is sent.
/// Returns the checksums computed during sending (a recursive call uses them to describe the
/// projection as a single ".proj" entry of the parent).
/// Throws ABORTED if the transfer was cancelled and BAD_SIZE_OF_FILE_IN_DATA_PART if the number
/// of bytes sent differs from the file size; checkEqual reports a mismatch with stored checksums.
///
/// NOTE(review): the recursive call and one call in processQuery omit `projections`, so it
/// presumably has a default in the header — not visible here.
MergeTreeData : : DataPart : : Checksums Service : : sendPartFromDisk (
const MergeTreeData : : DataPartPtr & part ,
WriteBuffer & out ,
int client_protocol_version ,
const std : : map < String , std : : shared_ptr < IMergeTreeDataPart > > & projections )
2020-04-29 17:14:49 +00:00
{
/// We'll take a list of files from the list of checksums.
MergeTreeData : : DataPart : : Checksums checksums = part - > checksums ;
/// Add files that are not in the checksum list.
2020-08-26 15:29:46 +00:00
auto file_names_without_checksums = part - > getFileNamesWithoutChecksums ( ) ;
for ( const auto & file_name : file_names_without_checksums )
{
2020-10-15 16:17:16 +00:00
/// Requesters older than the DEFAULT_COMPRESSION protocol version do not get the codec file.
if ( client_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_DEFAULT_COMPRESSION & & file_name = = IMergeTreeDataPart : : DEFAULT_COMPRESSION_CODEC_FILE_NAME )
2020-08-26 15:29:46 +00:00
continue ;
2020-10-15 16:17:16 +00:00
2020-08-26 15:29:46 +00:00
/// An empty entry is enough: only the key (file name) is used when iterating below.
checksums . files [ file_name ] = { } ;
}
2020-04-29 17:14:49 +00:00
2020-08-26 15:29:46 +00:00
auto disk = part - > volume - > getDisk ( ) ;
2020-04-29 17:14:49 +00:00
/// Checksums recomputed from what is actually sent; compared with the stored ones at the end.
MergeTreeData : : DataPart : : Checksums data_checksums ;
2021-02-10 14:12:49 +00:00
for ( const auto & [ name , projection ] : part - > getProjectionParts ( ) )
{
// Get rid of projection files
// (the ".proj" entry is removed from the list of plain files to send).
checksums . files . erase ( name + " .proj " ) ;
auto it = projections . find ( name ) ;
if ( it ! = projections . end ( ) )
{
writeStringBinary ( name , out ) ;
MergeTreeData : : DataPart : : Checksums projection_checksum = sendPartFromDisk ( it - > second , out , client_protocol_version ) ;
data_checksums . addFile ( name + " .proj " , projection_checksum . getTotalSizeOnDisk ( ) , projection_checksum . getTotalChecksumUInt128 ( ) ) ;
}
else if ( part - > checksums . has ( name + " .proj " ) )
{
// We don't send this projection, just add our checksum to bypass the following check
const auto & our_checksum = part - > checksums . files . find ( name + " .proj " ) - > second ;
data_checksums . addFile ( name + " .proj " , our_checksum . file_size , our_checksum . file_hash ) ;
}
}
2020-04-29 17:14:49 +00:00
writeBinary ( checksums . files . size ( ) , out ) ;
for ( const auto & it : checksums . files )
{
String file_name = it . first ;
2021-05-05 15:10:14 +00:00
String path = fs : : path ( part - > getFullRelativePath ( ) ) / file_name ;
2020-04-29 17:14:49 +00:00
UInt64 size = disk - > getFileSize ( path ) ;
writeStringBinary ( it . first , out ) ;
writeBinary ( size , out ) ;
auto file_in = disk - > readFile ( path ) ;
/// Wraps `out` so that the hash and byte count of the sent data are computed on the fly.
HashingWriteBuffer hashing_out ( out ) ;
2021-05-26 20:37:44 +00:00
copyDataWithThrottler ( * file_in , hashing_out , blocker . getCounter ( ) , data . getSendsThrottler ( ) ) ;
2020-04-29 17:14:49 +00:00
if ( blocker . isCancelled ( ) )
throw Exception ( " Transferring part to replica was cancelled " , ErrorCodes : : ABORTED ) ;
if ( hashing_out . count ( ) ! = size )
throw Exception ( " Unexpected size of file " + path , ErrorCodes : : BAD_SIZE_OF_FILE_IN_DATA_PART ) ;
writePODBinary ( hashing_out . getHash ( ) , out ) ;
2020-08-26 15:29:46 +00:00
/// Files without stored checksums are sent but excluded from the comparison below.
if ( ! file_names_without_checksums . count ( file_name ) )
2020-04-29 17:14:49 +00:00
data_checksums . addFile ( file_name , hashing_out . count ( ) , hashing_out . getHash ( ) ) ;
}
part - > checksums . checkEqual ( data_checksums , false ) ;
2021-02-10 14:12:49 +00:00
return data_checksums ;
2020-04-29 17:14:49 +00:00
}
2021-06-24 08:25:05 +00:00
/// Sends the local metadata files of a part stored on a disk that supports zero-copy replication.
///
/// Wire layout: the part's unique id; the number of files; then for each file — name, size of the
/// local metadata file, its contents, and the hash computed while sending.
/// The files are read from the local filesystem at <disk path>/<part relative path>/<file name>.
///
/// Throws LOGICAL_ERROR if the disk does not support zero-copy replication, CORRUPTED_DATA if a
/// metadata file is missing or is not a regular file, ABORTED if the transfer was cancelled, and
/// BAD_SIZE_OF_FILE_IN_DATA_PART if the number of bytes sent differs from the file size.
void Service : : sendPartFromDiskRemoteMeta ( const MergeTreeData : : DataPartPtr & part , WriteBuffer & out )
2020-10-08 15:45:10 +00:00
{
/// We'll take a list of files from the list of checksums.
MergeTreeData : : DataPart : : Checksums checksums = part - > checksums ;
/// Add files that are not in the checksum list.
auto file_names_without_checksums = part - > getFileNamesWithoutChecksums ( ) ;
for ( const auto & file_name : file_names_without_checksums )
checksums . files [ file_name ] = { } ;
auto disk = part - > volume - > getDisk ( ) ;
2021-07-05 03:32:56 +00:00
/// processQuery already checks this before calling; here it is a guard against misuse.
if ( ! disk - > supportZeroCopyReplication ( ) )
throw Exception ( fmt : : format ( " disk {} doesn't support zero-copy replication " , disk - > getName ( ) ) , ErrorCodes : : LOGICAL_ERROR ) ;
2020-10-08 15:45:10 +00:00
2021-02-26 09:48:57 +00:00
/// NOTE(review): no matching unlock is visible in this function, including on the exception
/// paths below — confirm where the shared-data lock is released.
part - > storage . lockSharedData ( * part ) ;
2020-10-08 15:45:10 +00:00
2020-11-03 08:58:26 +00:00
String part_id = part - > getUniqueId ( ) ;
writeStringBinary ( part_id , out ) ;
2020-10-08 15:45:10 +00:00
writeBinary ( checksums . files . size ( ) , out ) ;
for ( const auto & it : checksums . files )
{
String file_name = it . first ;
2021-05-05 15:10:14 +00:00
String metadata_file = fs : : path ( disk - > getPath ( ) ) / part - > getFullRelativePath ( ) / file_name ;
2020-10-08 15:45:10 +00:00
2021-04-27 00:05:43 +00:00
fs : : path metadata ( metadata_file ) ;
2020-10-08 15:45:10 +00:00
2021-04-27 00:05:43 +00:00
if ( ! fs : : exists ( metadata ) )
2021-07-05 03:32:56 +00:00
throw Exception ( " Remote metadata ' " + file_name + " ' is not exists " , ErrorCodes : : CORRUPTED_DATA ) ;
2021-04-27 00:05:43 +00:00
if ( ! fs : : is_regular_file ( metadata ) )
2021-07-05 03:32:56 +00:00
throw Exception ( " Remote metadata ' " + file_name + " ' is not a file " , ErrorCodes : : CORRUPTED_DATA ) ;
2021-04-27 00:05:43 +00:00
UInt64 file_size = fs : : file_size ( metadata ) ;
2020-10-08 15:45:10 +00:00
writeStringBinary ( it . first , out ) ;
writeBinary ( file_size , out ) ;
2021-08-16 00:00:32 +00:00
/// The metadata file is read directly from the local filesystem, not through the disk interface.
auto file_in = createReadBufferFromFileBase ( metadata_file , { } , 0 ) ;
2020-10-08 15:45:10 +00:00
HashingWriteBuffer hashing_out ( out ) ;
2021-05-26 20:37:44 +00:00
copyDataWithThrottler ( * file_in , hashing_out , blocker . getCounter ( ) , data . getSendsThrottler ( ) ) ;
2020-10-08 15:45:10 +00:00
if ( blocker . isCancelled ( ) )
throw Exception ( " Transferring part to replica was cancelled " , ErrorCodes : : ABORTED ) ;
if ( hashing_out . count ( ) ! = file_size )
throw Exception ( " Unexpected size of file " + metadata_file , ErrorCodes : : BAD_SIZE_OF_FILE_IN_DATA_PART ) ;
writePODBinary ( hashing_out . getHash ( ) , out ) ;
2020-10-22 09:32:05 +00:00
}
2020-10-08 15:45:10 +00:00
}
2016-01-28 01:00:27 +00:00
/// Looks up the part `name` in the table among PreCommitted, Committed and Outdated parts.
/// Returns the part if found; throws NO_SUCH_DATA_PART otherwise (never returns null).
MergeTreeData : : DataPartPtr Service : : findPart ( const String & name )
{
2017-12-18 17:26:46 +00:00
/// It is important to include PreCommitted and Outdated parts here because remote replicas cannot reliably
/// determine the local state of the part, so queries for the parts in these states are completely normal.
auto part = data . getPartIfExists (
name , { MergeTreeDataPartState : : PreCommitted , MergeTreeDataPartState : : Committed , MergeTreeDataPartState : : Outdated } ) ;
2017-04-01 07:20:54 +00:00
if ( part )
return part ;
2017-10-06 15:17:14 +00:00
throw Exception ( " No part " + name + " in table " , ErrorCodes : : NO_SUCH_DATA_PART ) ;
2016-01-28 01:00:27 +00:00
}
/// Fetches the part `part_name` from another replica over HTTP and returns it as a new data part.
///
/// Parameters, as used in this function:
///  - metadata_snapshot, context: passed to space reservation and to downloadPartToMemory.
///  - part_name: validated with MergeTreePartInfo::fromPartName and sent as the "part" parameter.
///  - replica_path: used to build the "endpoint" parameter via getEndpointId.
///  - host, port, interserver_scheme: components of the request URI.
///  - timeouts, user, password: HTTP connection settings; credentials are set only if `user` is non-empty.
///  - throttler: passed to the download functions.
///  - to_detached, tmp_prefix_: passed to the disk download functions and to the fetch list entry.
///  - tagger_ptr: passed to data.balancedReservation.
///  - try_zero_copy: whether to advertise zero-copy capabilities to the sender.
///  - disk: destination disk; when null, space is reserved and the reservation's disk is used.
///
/// Throws ABORTED if fetching was cancelled before start, LOGICAL_ERROR / INCORRECT_PART_TYPE
/// on an inconsistent zero-copy response. On S3_ERROR or ZERO_COPY_REPLICATION_ERROR from the
/// zero-copy download the fetch is retried once without zero-copy.
MergeTreeData : : MutableDataPartPtr Fetcher : : fetchPart (
2020-06-26 11:30:23 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2021-05-21 16:14:01 +00:00
ContextPtr context ,
2017-04-01 07:20:54 +00:00
const String & part_name ,
const String & replica_path ,
const String & host ,
int port ,
2017-12-27 17:58:52 +00:00
const ConnectionTimeouts & timeouts ,
2018-07-26 15:10:57 +00:00
const String & user ,
const String & password ,
2018-07-30 18:32:21 +00:00
const String & interserver_scheme ,
2021-05-26 20:37:44 +00:00
ThrottlerPtr throttler ,
2018-05-21 13:49:54 +00:00
bool to_detached ,
2020-10-08 15:45:10 +00:00
const String & tmp_prefix_ ,
2021-03-12 09:58:32 +00:00
std : : optional < CurrentlySubmergingEmergingTagger > * tagger_ptr ,
2021-06-24 08:25:05 +00:00
bool try_zero_copy ,
2021-07-05 03:32:56 +00:00
DiskPtr disk )
2014-07-22 13:49:52 +00:00
{
2020-06-16 02:14:53 +00:00
if ( blocker . isCancelled ( ) )
throw Exception ( " Fetching of part was cancelled " , ErrorCodes : : ABORTED ) ;
2019-08-02 20:19:06 +00:00
/// Validation of the input that may come from malicious replica.
2020-10-26 16:38:35 +00:00
auto part_info = MergeTreePartInfo : : fromPartName ( part_name , data . format_version ) ;
2019-08-26 14:24:29 +00:00
const auto data_settings = data . getSettings ( ) ;
2019-08-02 20:19:06 +00:00
2017-04-06 18:32:00 +00:00
Poco : : URI uri ;
2018-07-30 18:32:21 +00:00
uri . setScheme ( interserver_scheme ) ;
2017-04-06 18:32:00 +00:00
uri . setHost ( host ) ;
uri . setPort ( port ) ;
/// We always announce the newest protocol version we understand; the sender answers with
/// min(ours, its own maximum) in the server_protocol_version cookie.
uri . setQueryParameters (
2017-04-01 07:20:54 +00:00
{
2019-09-06 12:18:56 +00:00
{ " endpoint " , getEndpointId ( replica_path ) } ,
{ " part " , part_name } ,
2021-02-10 14:12:49 +00:00
{ " client_protocol_version " , toString ( REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION ) } ,
2019-09-06 12:18:56 +00:00
{ " compress " , " false " }
2017-11-17 20:42:03 +00:00
} ) ;
2017-04-01 07:20:54 +00:00
2021-07-05 03:32:56 +00:00
/// Disk types for which we can accept metadata instead of data (zero-copy).
/// With an explicit destination disk only that disk counts; otherwise all disks of the table.
Strings capability ;
if ( try_zero_copy & & data_settings - > allow_remote_fs_zero_copy_replication )
2020-10-08 15:45:10 +00:00
{
2021-07-05 03:32:56 +00:00
if ( ! disk )
2021-01-14 16:26:56 +00:00
{
2021-08-24 23:05:55 +00:00
Disks disks = data . getDisks ( ) ;
for ( const auto & data_disk : disks )
if ( data_disk - > supportZeroCopyReplication ( ) )
capability . push_back ( toString ( data_disk - > getType ( ) ) ) ;
2021-06-24 08:25:05 +00:00
}
2021-07-05 03:32:56 +00:00
else if ( disk - > supportZeroCopyReplication ( ) )
2021-06-24 08:25:05 +00:00
{
2021-08-24 22:24:47 +00:00
capability . push_back ( toString ( disk - > getType ( ) ) ) ;
2021-01-14 16:26:56 +00:00
}
2020-10-08 15:45:10 +00:00
}
2021-07-05 03:32:56 +00:00
if ( ! capability . empty ( ) )
2020-10-08 15:45:10 +00:00
{
2021-08-24 23:05:55 +00:00
/// Sort + unique: several disks may share a type, send each type once.
std : : sort ( capability . begin ( ) , capability . end ( ) ) ;
capability . erase ( std : : unique ( capability . begin ( ) , capability . end ( ) ) , capability . end ( ) ) ;
2021-07-05 03:32:56 +00:00
const String & remote_fs_metadata = boost : : algorithm : : join ( capability , " , " ) ;
uri . addQueryParameter ( " remote_fs_metadata " , remote_fs_metadata ) ;
2020-10-08 15:45:10 +00:00
}
2021-07-05 03:32:56 +00:00
else
2021-06-24 08:25:05 +00:00
{
2021-07-05 03:32:56 +00:00
/// Nothing to advertise, so a zero-copy answer would be unexpected (checked below).
try_zero_copy = false ;
2020-10-08 15:45:10 +00:00
}
2018-07-26 15:10:57 +00:00
Poco : : Net : : HTTPBasicCredentials creds { } ;
if ( ! user . empty ( ) )
{
creds . setUsername ( user ) ;
creds . setPassword ( password ) ;
}
2018-11-16 13:15:17 +00:00
PooledReadWriteBufferFromHTTP in {
2019-09-06 12:18:56 +00:00
uri ,
2018-11-16 13:15:17 +00:00
Poco : : Net : : HTTPRequest : : HTTP_POST ,
{ } ,
timeouts ,
creds ,
DBMS_DEFAULT_BUFFER_SIZE ,
2019-09-19 07:33:54 +00:00
0 , /* no redirects */
2019-08-13 10:29:31 +00:00
data_settings - > replicated_max_parallel_fetches_for_host
2018-11-16 13:15:17 +00:00
} ;
2017-04-01 07:20:54 +00:00
2020-02-27 10:43:38 +00:00
/// Version the sender actually uses for the response; the fallback applies when the cookie is absent.
int server_protocol_version = parse < int > ( in . getResponseCookie ( " server_protocol_version " , " 0 " ) ) ;
2019-09-09 12:28:28 +00:00
2019-11-27 09:39:44 +00:00
/// The header fields below are read in the same order Service::processQuery writes them.
/// A reservation is made only when no destination disk was given.
ReservationPtr reservation ;
2020-06-26 21:55:48 +00:00
size_t sum_files_size = 0 ;
2020-02-27 10:43:38 +00:00
if ( server_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE )
2019-09-06 12:18:56 +00:00
{
readBinary ( sum_files_size , in ) ;
2020-02-27 10:43:38 +00:00
if ( server_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS )
2020-01-30 10:21:40 +00:00
{
2020-02-20 12:36:55 +00:00
IMergeTreeDataPart : : TTLInfos ttl_infos ;
2020-01-30 10:21:40 +00:00
String ttl_infos_string ;
readBinary ( ttl_infos_string , in ) ;
ReadBufferFromString ttl_infos_buffer ( ttl_infos_string ) ;
assertString ( " ttl format version: 1 \n " , ttl_infos_buffer ) ;
ttl_infos . read ( ttl_infos_buffer ) ;
2021-07-05 03:32:56 +00:00
if ( ! disk )
{
2021-02-18 08:50:31 +00:00
/// Try a balanced reservation first, then fall back to one that prefers TTL rules.
reservation
2021-07-05 03:32:56 +00:00
= data . balancedReservation ( metadata_snapshot , sum_files_size , 0 , part_name , part_info , { } , tagger_ptr , & ttl_infos , true ) ;
if ( ! reservation )
reservation
= data . reserveSpacePreferringTTLRules ( metadata_snapshot , sum_files_size , ttl_infos , std : : time ( nullptr ) , 0 , true ) ;
}
2020-01-30 10:21:40 +00:00
}
2021-07-05 03:32:56 +00:00
else if ( ! disk )
2021-02-18 08:50:31 +00:00
{
/// Size is known but TTL infos are not: reserve without them.
reservation = data . balancedReservation ( metadata_snapshot , sum_files_size , 0 , part_name , part_info , { } , tagger_ptr , nullptr ) ;
if ( ! reservation )
reservation = data . reserveSpace ( sum_files_size ) ;
}
2019-09-06 12:18:56 +00:00
}
2021-07-05 03:32:56 +00:00
else if ( ! disk )
2019-09-06 12:18:56 +00:00
{
2019-09-06 15:09:20 +00:00
/// We don't know real size of part because sender server version is too old
reservation = data . makeEmptyReservationOnLargestDisk ( ) ;
2019-09-06 12:18:56 +00:00
}
2021-07-05 03:32:56 +00:00
/// Every `!disk` branch above assigns `reservation`.
/// NOTE(review): this dereference assumes those calls never return null — confirm.
if ( ! disk )
disk = reservation - > getDisk ( ) ;
2019-09-06 12:18:56 +00:00
2020-07-02 23:41:37 +00:00
/// fsync after fetch only if the setting is non-zero and the part is at least that large.
bool sync = ( data_settings - > min_compressed_bytes_to_fsync_after_fetch
& & sum_files_size > = data_settings - > min_compressed_bytes_to_fsync_after_fetch ) ;
2020-06-26 21:55:48 +00:00
2020-05-14 20:08:15 +00:00
/// Default used when the sender is too old to send the part type.
String part_type = " Wide " ;
2020-06-26 11:38:37 +00:00
if ( server_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE )
2020-05-14 20:08:15 +00:00
readStringBinary ( part_type , in ) ;
2020-11-02 14:38:18 +00:00
UUID part_uuid = UUIDHelpers : : Nil ;
2020-10-29 16:18:25 +00:00
if ( server_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID )
readUUIDText ( part_uuid , in ) ;
2021-07-05 03:32:56 +00:00
/// A non-empty cookie means the sender chose zero-copy and is sending metadata for this disk type.
String remote_fs_metadata = parse < String > ( in . getResponseCookie ( " remote_fs_metadata " , " " ) ) ;
if ( ! remote_fs_metadata . empty ( ) )
{
/// Sanity checks: we must have asked for zero-copy, for this disk type, with a new enough
/// protocol, and the part must not be in-memory.
if ( ! try_zero_copy )
throw Exception ( " Got unexpected 'remote_fs_metadata' cookie " , ErrorCodes : : LOGICAL_ERROR ) ;
if ( std : : find ( capability . begin ( ) , capability . end ( ) , remote_fs_metadata ) = = capability . end ( ) )
throw Exception ( fmt : : format ( " Got 'remote_fs_metadata' cookie {}, expect one from {} " , remote_fs_metadata , fmt : : join ( capability , " , " ) ) , ErrorCodes : : LOGICAL_ERROR ) ;
if ( server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY )
throw Exception ( fmt : : format ( " Got 'remote_fs_metadata' cookie with old protocol version {} " , server_protocol_version ) , ErrorCodes : : LOGICAL_ERROR ) ;
if ( part_type = = " InMemory " )
throw Exception ( " Got 'remote_fs_metadata' cookie for in-memory part " , ErrorCodes : : INCORRECT_PART_TYPE ) ;
try
{
return downloadPartToDiskRemoteMeta ( part_name , replica_path , to_detached , tmp_prefix_ , disk , in , throttler ) ;
}
catch ( const Exception & e )
{
/// Only these two error codes trigger the fallback; everything else propagates.
if ( e . code ( ) ! = ErrorCodes : : S3_ERROR & & e . code ( ) ! = ErrorCodes : : ZERO_COPY_REPLICATION_ERROR )
throw ;
/// NOTE(review): e.message() becomes part of the format string here; if it contains
/// "{}" it may be interpreted as a placeholder — confirm against the logging macro.
LOG_WARNING ( log , e . message ( ) + " Will retry fetching part without zero-copy. " ) ;
/// Try again but without zero-copy
/// The retry keeps the disk chosen above and passes no tagger.
return fetchPart ( metadata_snapshot , context , part_name , replica_path , host , port , timeouts ,
user , password , interserver_scheme , throttler , to_detached , tmp_prefix_ , nullptr , false , disk ) ;
}
}
2020-10-27 12:47:42 +00:00
/// Regular (non zero-copy) download: register the fetch and track progress while reading.
auto storage_id = data . getStorageID ( ) ;
2021-07-05 03:32:56 +00:00
String new_part_path = part_type = = " InMemory " ? " memory " : fs : : path ( data . getFullPathOnDisk ( disk ) ) / part_name / " " ;
2021-04-10 23:33:54 +00:00
auto entry = data . getContext ( ) - > getReplicatedFetchList ( ) . insert (
2020-10-27 12:47:42 +00:00
storage_id . getDatabaseName ( ) , storage_id . getTableName ( ) ,
part_info . partition_id , part_name , new_part_path ,
replica_path , uri , to_detached , sum_files_size ) ;
2020-10-27 13:00:40 +00:00
/// The callback keeps a reference to the entry, which stays alive until this function returns.
in . setNextCallback ( ReplicatedFetchReadCallback ( * entry ) ) ;
2020-10-27 12:47:42 +00:00
2021-02-10 14:12:49 +00:00
/// Number of projection parts that precede the main part in the stream; 0 for older senders.
size_t projections = 0 ;
if ( server_protocol_version > = REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION )
readBinary ( projections , in ) ;
MergeTreeData : : DataPart : : Checksums checksums ;
return part_type = = " InMemory "
2021-07-05 03:32:56 +00:00
? downloadPartToMemory ( part_name , part_uuid , metadata_snapshot , context , disk , in , projections , throttler )
: downloadPartToDisk ( part_name , replica_path , to_detached , tmp_prefix_ , sync , disk , in , projections , checksums , throttler ) ;
2020-04-29 17:14:49 +00:00
}
/// Reads an in-memory part from `in` and builds a new MergeTreeDataPartInMemory from it.
///
/// Expected stream layout (mirrors Service::sendPartFromMemory): `projections` times
/// { projection name, checksums, Native block }, then { checksums, Native block } of the part itself.
/// Every received block is written through MergedBlockOutputStream and the resulting checksums
/// are compared with the received ones. Block sizes are added to `throttler`.
///
/// Returns the new part with `uuid` = part_uuid and `is_temp` = true.
/// Throws CORRUPTED_DATA if checksums cannot be deserialized; checkEqual reports mismatches.
MergeTreeData : : MutableDataPartPtr Fetcher : : downloadPartToMemory (
const String & part_name ,
2020-10-29 16:18:25 +00:00
const UUID & part_uuid ,
2020-06-26 11:30:23 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2021-05-21 16:14:01 +00:00
ContextPtr context ,
2021-07-05 03:32:56 +00:00
DiskPtr disk ,
2021-02-10 14:12:49 +00:00
PooledReadWriteBufferFromHTTP & in ,
2021-05-26 20:37:44 +00:00
size_t projections ,
ThrottlerPtr throttler )
2020-04-29 17:14:49 +00:00
{
2021-07-05 03:32:56 +00:00
/// A single-disk volume wrapping `disk`, shared by the part and its projection parts.
auto volume = std : : make_shared < SingleDiskVolume > ( " volume_ " + part_name , disk , 0 ) ;
2021-02-10 14:12:49 +00:00
MergeTreeData : : MutableDataPartPtr new_data_part =
std : : make_shared < MergeTreeDataPartInMemory > ( data , part_name , volume ) ;
for ( auto i = 0ul ; i < projections ; + + i )
{
String projection_name ;
readStringBinary ( projection_name , in ) ;
MergeTreeData : : DataPart : : Checksums checksums ;
if ( ! checksums . read ( in ) )
throw Exception ( " Cannot deserialize checksums " , ErrorCodes : : CORRUPTED_DATA ) ;
2021-10-08 17:21:19 +00:00
NativeReader block_in ( in , 0 ) ;
2021-02-10 14:12:49 +00:00
auto block = block_in . read ( ) ;
2021-05-26 20:37:44 +00:00
/// NOTE(review): `throttler` is dereferenced without a null check — confirm callers always pass one.
throttler - > add ( block . bytes ( ) ) ;
2021-02-10 14:12:49 +00:00
/// Projection parts get a fixed part info and an empty partition; the raw parent pointer
/// ties the projection to `new_data_part`.
MergeTreePartInfo new_part_info ( " all " , 0 , 0 , 0 ) ;
MergeTreeData : : MutableDataPartPtr new_projection_part =
std : : make_shared < MergeTreeDataPartInMemory > ( data , projection_name , new_part_info , volume , projection_name , new_data_part . get ( ) ) ;
new_projection_part - > is_temp = false ;
new_projection_part - > setColumns ( block . getNamesAndTypesList ( ) ) ;
MergeTreePartition partition { } ;
new_projection_part - > partition = std : : move ( partition ) ;
2021-09-16 21:19:58 +00:00
new_projection_part - > minmax_idx = std : : make_shared < IMergeTreeDataPart : : MinMaxIndex > ( ) ;
2021-02-10 14:12:49 +00:00
MergedBlockOutputStream part_out (
new_projection_part ,
metadata_snapshot - > projections . get ( projection_name ) . metadata ,
block . getNamesAndTypesList ( ) ,
{ } ,
2021-05-14 21:45:13 +00:00
CompressionCodecFactory : : instance ( ) . get ( " NONE " , { } ) ,
new_data_part - > serialization_info ) ;
2021-02-10 14:12:49 +00:00
part_out . write ( block ) ;
part_out . writeSuffixAndFinalizePart ( new_projection_part ) ;
/// Verify that what we built matches what the sender announced.
new_projection_part - > checksums . checkEqual ( checksums , /* have_uncompressed = */ true ) ;
new_data_part - > addProjectionPart ( projection_name , std : : move ( new_projection_part ) ) ;
}
2020-05-05 01:27:31 +00:00
/// The main part follows the projections: checksums first, then the block.
MergeTreeData : : DataPart : : Checksums checksums ;
if ( ! checksums . read ( in ) )
throw Exception ( " Cannot deserialize checksums " , ErrorCodes : : CORRUPTED_DATA ) ;
2021-10-08 17:21:19 +00:00
NativeReader block_in ( in , 0 ) ;
2020-06-03 18:59:18 +00:00
auto block = block_in . read ( ) ;
2021-05-26 20:37:44 +00:00
throttler - > add ( block . bytes ( ) ) ;
2020-06-03 13:27:54 +00:00
2020-10-29 16:18:25 +00:00
new_data_part - > uuid = part_uuid ;
2020-04-29 17:14:49 +00:00
new_data_part - > is_temp = true ;
new_data_part - > setColumns ( block . getNamesAndTypesList ( ) ) ;
2021-09-16 21:19:58 +00:00
/// Min-max index and partition value are derived from the received block.
new_data_part - > minmax_idx - > update ( block , data . getMinMaxColumnsNames ( metadata_snapshot - > getPartitionKey ( ) ) ) ;
2021-05-21 16:14:01 +00:00
new_data_part - > partition . create ( metadata_snapshot , block , 0 , context ) ;
2020-04-29 17:14:49 +00:00
2021-02-10 14:12:49 +00:00
MergedBlockOutputStream part_out (
2021-10-14 16:44:08 +00:00
new_data_part , metadata_snapshot , block . getNamesAndTypesList ( ) , { } ,
CompressionCodecFactory : : instance ( ) . get ( " NONE " , { } ) ,
new_data_part - > serialization_info ) ;
2021-04-15 21:47:11 +00:00
2020-04-29 17:14:49 +00:00
part_out . write ( block ) ;
part_out . writeSuffixAndFinalizePart ( new_data_part ) ;
2020-05-05 01:27:31 +00:00
new_data_part - > checksums . checkEqual ( checksums , /* have_uncompressed = */ true ) ;
2020-04-29 17:14:49 +00:00
return new_data_part ;
2019-05-12 14:57:23 +00:00
}
2021-02-10 14:12:49 +00:00
void Fetcher : : downloadBaseOrProjectionPartToDisk (
2019-05-12 14:57:23 +00:00
const String & replica_path ,
2021-02-10 14:12:49 +00:00
const String & part_download_path ,
2020-06-26 21:55:48 +00:00
bool sync ,
2021-02-10 14:12:49 +00:00
DiskPtr disk ,
PooledReadWriteBufferFromHTTP & in ,
2021-05-26 20:37:44 +00:00
MergeTreeData : : DataPart : : Checksums & checksums ,
ThrottlerPtr throttler ) const
2019-05-12 14:57:23 +00:00
{
size_t files ;
readBinary ( files , in ) ;
2017-04-01 07:20:54 +00:00
for ( size_t i = 0 ; i < files ; + + i )
{
String file_name ;
UInt64 file_size ;
readStringBinary ( file_name , in ) ;
readBinary ( file_size , in ) ;
2019-07-31 18:21:13 +00:00
/// File must be inside "absolute_part_path" directory.
/// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
2021-05-24 16:03:09 +00:00
String absolute_file_path = fs : : weakly_canonical ( fs : : path ( part_download_path ) / file_name ) ;
if ( ! startsWith ( absolute_file_path , fs : : weakly_canonical ( part_download_path ) . string ( ) ) )
2020-04-08 08:41:13 +00:00
throw Exception ( " File path ( " + absolute_file_path + " ) doesn't appear to be inside part path ( " + part_download_path + " ). "
2019-07-31 18:21:13 +00:00
" This may happen if we are trying to download part from malicious replica or logical error. " ,
ErrorCodes : : INSECURE_PATH ) ;
2021-05-05 15:10:14 +00:00
auto file_out = disk - > writeFile ( fs : : path ( part_download_path ) / file_name ) ;
2020-04-08 08:41:13 +00:00
HashingWriteBuffer hashing_out ( * file_out ) ;
2021-05-26 20:37:44 +00:00
copyDataWithThrottler ( in , hashing_out , file_size , blocker . getCounter ( ) , throttler ) ;
2017-04-01 07:20:54 +00:00
2017-10-06 16:53:55 +00:00
if ( blocker . isCancelled ( ) )
2017-04-01 07:20:54 +00:00
{
2020-06-16 02:14:53 +00:00
/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
/// performing a poll with a not very large timeout.
2017-04-01 07:20:54 +00:00
/// And now we check it only between read chunks (in the `copyData` function).
2020-04-08 08:41:13 +00:00
disk - > removeRecursive ( part_download_path ) ;
2017-04-01 07:20:54 +00:00
throw Exception ( " Fetching of part was cancelled " , ErrorCodes : : ABORTED ) ;
}
2017-06-21 01:24:05 +00:00
MergeTreeDataPartChecksum : : uint128 expected_hash ;
readPODBinary ( expected_hash , in ) ;
2017-04-01 07:20:54 +00:00
if ( expected_hash ! = hashing_out . getHash ( ) )
2021-05-05 15:10:14 +00:00
throw Exception ( " Checksum mismatch for file " + fullPath ( disk , ( fs : : path ( part_download_path ) / file_name ) . string ( ) ) + " transferred from " + replica_path ,
2018-11-22 21:19:58 +00:00
ErrorCodes : : CHECKSUM_DOESNT_MATCH ) ;
2017-04-01 07:20:54 +00:00
if ( file_name ! = " checksums.txt " & &
2020-08-26 15:29:46 +00:00
file_name ! = " columns.txt " & &
file_name ! = IMergeTreeDataPart : : DEFAULT_COMPRESSION_CODEC_FILE_NAME )
2017-04-01 07:20:54 +00:00
checksums . addFile ( file_name , file_size , expected_hash ) ;
2020-06-26 21:55:48 +00:00
if ( sync )
hashing_out . sync ( ) ;
2017-04-01 07:20:54 +00:00
}
2021-02-10 14:12:49 +00:00
}
2017-04-01 07:20:54 +00:00
2021-02-10 14:12:49 +00:00
/// Downloads a part from `in` into a temporary directory on `disk` and returns it as a temporary data part.
///
/// The directory name is `tmp_prefix_` (or the built-in default prefix when `tmp_prefix_` is empty)
/// followed by `part_name`, optionally placed under the detached directory when `to_detached` is set.
/// If that directory already exists it is removed (with a warning) before the download starts.
///
/// `projections` projection parts are read first, each preceded by its name in the stream, and each is
/// added to `checksums` as a single entry. The base part follows, and the stream must then be at EOF.
/// Finally the part is created, its columns/checksums/indexes are loaded and its checksums are
/// compared with the ones collected during the download.
///
/// Throws LOGICAL_ERROR if the prefix or the part name is empty or contains a forbidden character.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    bool sync,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    size_t projections,
    MergeTreeData::DataPart::Checksums & checksums,
    ThrottlerPtr throttler)
{
    static const String TMP_PREFIX = " tmp-fetch_ ";

    String tmp_prefix;
    if (tmp_prefix_.empty())
        tmp_prefix = TMP_PREFIX;
    else
        tmp_prefix = tmp_prefix_;

    /// An existing directory with this name is removed below, so be careful about what the name can be.
    const auto is_unsafe_name = [](const String & name)
    {
        return name.empty() || std::string::npos != name.find_first_of(" /. ");
    };

    if (is_unsafe_name(tmp_prefix) || is_unsafe_name(part_name))
        throw Exception(
            " Logical error: tmp_prefix and part_name cannot be empty or contain '.' or '/' characters. ",
            ErrorCodes::LOGICAL_ERROR);

    String part_relative_path;
    if (to_detached)
        part_relative_path = " detached/ ";
    else
        part_relative_path = " ";
    part_relative_path += tmp_prefix;
    part_relative_path += part_name;

    String part_download_path = data.getRelativeDataPath() + part_relative_path + " / ";

    if (disk->exists(part_download_path))
    {
        LOG_WARNING(
            log,
            " Directory {} already exists, probably result of a failed fetch. Will remove it before fetching part. ",
            fullPath(disk, part_download_path));
        disk->removeRecursive(part_download_path);
    }

    disk->createDirectories(part_download_path);

    /// The guard is only obtained when the `fsync_part_directory` setting is enabled.
    SyncGuardPtr sync_guard;
    if (data.getSettings()->fsync_part_directory)
        sync_guard = disk->getDirectorySyncGuard(part_download_path);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    /// Projection parts come first in the stream, each one preceded by its name.
    for (size_t projection_idx = 0; projection_idx < projections; ++projection_idx)
    {
        String projection_name;
        readStringBinary(projection_name, in);

        const String projection_download_path = part_download_path + projection_name + " .proj/ ";
        disk->createDirectories(projection_download_path);

        MergeTreeData::DataPart::Checksums projection_checksum;
        downloadBaseOrProjectionPartToDisk(
            replica_path, projection_download_path, sync, disk, in, projection_checksum, throttler);

        /// A whole projection is represented by one entry in the checksums of the base part.
        checksums.addFile(
            projection_name + " .proj ",
            projection_checksum.getTotalSizeOnDisk(),
            projection_checksum.getTotalChecksumUInt128());
    }

    /// The base part follows the projections.
    downloadBaseOrProjectionPartToDisk(replica_path, part_download_path, sync, disk, in, checksums, throttler);

    assertEOF(in);

    auto single_disk_volume = std::make_shared<SingleDiskVolume>(" volume_ " + part_name, disk, 0);
    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, single_disk_volume, part_relative_path);

    new_data_part->is_temp = true;
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);

    /// Compare what was loaded from disk with what was collected while downloading.
    new_data_part->checksums.checkEqual(checksums, false);

    return new_data_part;
}
2021-06-24 08:25:05 +00:00
/// Downloads only the metadata files of a part from `in` and returns the part as a temporary data part.
///
/// The stream is read in this order: the part unique id, the number of files, then for every file
/// its name, its size, `file_size` bytes of content and a 128-bit hash of that content. Each file is
/// written to `fullPath(disk, <download directory>/<file name>)` and hashed while it is written.
/// The stream must be at EOF after the last file.
///
/// After the files are received the part is created, its columns/checksums/indexes are loaded and
/// `lockSharedData` is called for it on its storage.
///
/// Throws:
///  - ZERO_COPY_REPLICATION_ERROR if `disk` does not support zero-copy replication or does not
///    recognise the received unique id;
///  - DIRECTORY_ALREADY_EXISTS if the download directory already exists;
///  - INSECURE_PATH if a received file name resolves to a location outside the download directory;
///  - ABORTED if `blocker` reports cancellation after a file was copied (the download directory is removed first);
///  - CHECKSUM_DOESNT_MATCH if a received hash differs from the computed one.
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
    const String & part_name,
    const String & replica_path,
    bool to_detached,
    const String & tmp_prefix_,
    DiskPtr disk,
    PooledReadWriteBufferFromHTTP & in,
    ThrottlerPtr throttler)
{
    String part_id;
    readStringBinary(part_id, in);

    if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
    {
        throw Exception(fmt::format(" Part {} unique id {} doesn't exist on {}. ", part_name, part_id, disk->getName()), ErrorCodes::ZERO_COPY_REPLICATION_ERROR);
    }

    LOG_DEBUG(log, " Downloading Part {} unique id {} metadata onto disk {}. ",
        part_name, part_id, disk->getName());

    static const String TMP_PREFIX = " tmp-fetch_ ";
    String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

    String part_relative_path = String(to_detached ? " detached/ " : " ") + tmp_prefix + part_name;
    String part_download_path = fs::path(data.getRelativeDataPath()) / part_relative_path / " ";

    if (disk->exists(part_download_path))
        throw Exception(" Directory " + fullPath(disk, part_download_path) + " already exists. ", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedFetch};

    disk->createDirectories(part_download_path);

    size_t files;
    readBinary(files, in);

    auto volume = std::make_shared<SingleDiskVolume>(" volume_ " + part_name, disk);

    for (size_t i = 0; i < files; ++i)
    {
        String file_name;
        UInt64 file_size;

        readStringBinary(file_name, in);
        readBinary(file_size, in);

        String data_path = fs::path(part_download_path) / file_name;
        String metadata_file = fullPath(disk, data_path);

        /// `file_name` is received from the remote replica and `metadata_file` is opened for writing below,
        /// so the resolved path must stay inside the download directory.
        /// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.
        const String absolute_file_path = fs::weakly_canonical(fs::path(metadata_file)).string();
        const String absolute_download_path = fs::weakly_canonical(fs::path(fullPath(disk, part_download_path))).string();
        if (!startsWith(absolute_file_path, absolute_download_path))
            throw Exception(
                fmt::format(
                    "File path ({}) doesn't appear to be inside part path ({}). "
                    "This may happen if we are trying to download part from malicious replica or logical error.",
                    absolute_file_path, absolute_download_path),
                ErrorCodes::INSECURE_PATH);

        {
            auto file_out = std::make_unique<WriteBufferFromFile>(metadata_file, DBMS_DEFAULT_BUFFER_SIZE, -1, 0666, nullptr, 0);

            HashingWriteBuffer hashing_out(*file_out);

            copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);

            if (blocker.isCancelled())
            {
                /// NOTE The is_cancelled flag also makes sense to check every time you read over the network,
                /// performing a poll with a not very large timeout.
                /// And now we check it only between read chunks (in the `copyData` function).
                disk->removeSharedRecursive(part_download_path, true);
                throw Exception(" Fetching of part was cancelled ", ErrorCodes::ABORTED);
            }

            /// The sender appends the hash of the content right after the content itself.
            MergeTreeDataPartChecksum::uint128 expected_hash;
            readPODBinary(expected_hash, in);

            if (expected_hash != hashing_out.getHash())
            {
                throw Exception(" Checksum mismatch for file " + metadata_file + " transferred from " + replica_path,
                    ErrorCodes::CHECKSUM_DOESNT_MATCH);
            }
        }
    }

    assertEOF(in);

    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(part_name, volume, part_relative_path);
    new_data_part->is_temp = true;
    new_data_part->modification_time = time(nullptr);
    new_data_part->loadColumnsChecksumsIndexes(true, false);

    new_data_part->storage.lockSharedData(*new_data_part);

    return new_data_part;
}
2014-07-22 13:49:52 +00:00
}
2016-01-28 01:00:27 +00:00
}