diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 064447c54ad..8c59a1c00bc 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -453,112 +453,122 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( creds.setPassword(password); } - PooledReadWriteBufferFromHTTP in{ - uri, - Poco::Net::HTTPRequest::HTTP_POST, - {}, - timeouts, - creds, - DBMS_DEFAULT_BUFFER_SIZE, - 0, /* no redirects */ - data_settings->replicated_max_parallel_fetches_for_host - }; - - int server_protocol_version = parse(in.getResponseCookie("server_protocol_version", "0")); - - ReservationPtr reservation; - size_t sum_files_size = 0; - if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE) + bool retry_without_zero_copy = false; { - readBinary(sum_files_size, in); - if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) + PooledReadWriteBufferFromHTTP in{ + uri, + Poco::Net::HTTPRequest::HTTP_POST, + {}, + timeouts, + creds, + DBMS_DEFAULT_BUFFER_SIZE, + 0, /* no redirects */ + data_settings->replicated_max_parallel_fetches_for_host + }; + + int server_protocol_version = parse(in.getResponseCookie("server_protocol_version", "0")); + + ReservationPtr reservation; + size_t sum_files_size = 0; + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE) { - IMergeTreeDataPart::TTLInfos ttl_infos; - String ttl_infos_string; - readBinary(ttl_infos_string, in); - ReadBufferFromString ttl_infos_buffer(ttl_infos_string); - assertString("ttl format version: 1\n", ttl_infos_buffer); - ttl_infos.read(ttl_infos_buffer); - if (!disk) + readBinary(sum_files_size, in); + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) { - reservation - = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true); - if (!reservation) + IMergeTreeDataPart::TTLInfos ttl_infos; + String ttl_infos_string; + readBinary(ttl_infos_string, in); + ReadBufferFromString ttl_infos_buffer(ttl_infos_string); + assertString("ttl format version: 1\n", ttl_infos_buffer); + ttl_infos.read(ttl_infos_buffer); + if (!disk) + { reservation - = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true); + = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true); + if (!reservation) + reservation + = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true); + } + } + else if (!disk) + { + reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr); + if (!reservation) + reservation = data.reserveSpace(sum_files_size); } } else if (!disk) { - reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr); - if (!reservation) - reservation = data.reserveSpace(sum_files_size); + /// We don't know real size of part because sender server version is too old + reservation = data.makeEmptyReservationOnLargestDisk(); } - } - else if (!disk) - { - /// We don't know real size of part because sender server version is too old - reservation = data.makeEmptyReservationOnLargestDisk(); - } - if (!disk) - disk = reservation->getDisk(); + if (!disk) + disk = reservation->getDisk(); - bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch - && sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch); + bool sync = (data_settings->min_compressed_bytes_to_fsync_after_fetch + && sum_files_size >= data_settings->min_compressed_bytes_to_fsync_after_fetch); - String part_type = "Wide"; - if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE) - readStringBinary(part_type, in); + String part_type = "Wide"; + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE) + readStringBinary(part_type, in); - UUID part_uuid = UUIDHelpers::Nil; - if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID) - readUUIDText(part_uuid, in); + UUID part_uuid = UUIDHelpers::Nil; + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID) + readUUIDText(part_uuid, in); - String remote_fs_metadata = parse(in.getResponseCookie("remote_fs_metadata", "")); - if (!remote_fs_metadata.empty()) - { - if (!try_zero_copy) - throw Exception("Got unexpected 'remote_fs_metadata' cookie", ErrorCodes::LOGICAL_ERROR); - if (std::find(capability.begin(), capability.end(), remote_fs_metadata) == capability.end()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie {}, expect one from {}", remote_fs_metadata, fmt::join(capability, ", ")); - if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version); - if (part_type == "InMemory") - throw Exception("Got 'remote_fs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE); - - try + String remote_fs_metadata = parse(in.getResponseCookie("remote_fs_metadata", "")); + if (!remote_fs_metadata.empty()) { - return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler); - } - catch (const Exception & e) - { - if (e.code() != ErrorCodes::S3_ERROR && e.code() != ErrorCodes::ZERO_COPY_REPLICATION_ERROR) - throw; + if (!try_zero_copy) + throw Exception("Got unexpected 'remote_fs_metadata' cookie", ErrorCodes::LOGICAL_ERROR); + if (std::find(capability.begin(), capability.end(), remote_fs_metadata) == capability.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie {}, expect one from {}", remote_fs_metadata, fmt::join(capability, ", ")); + if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version); + if (part_type == "InMemory") + throw Exception("Got 'remote_fs_metadata' cookie for in-memory part", ErrorCodes::INCORRECT_PART_TYPE); - LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy.")); - /// Try again but without zero-copy - return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, - user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk); + try + { + return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler); + } + catch (const Exception & e) + { + if (e.code() != ErrorCodes::S3_ERROR && e.code() != ErrorCodes::ZERO_COPY_REPLICATION_ERROR) + throw; + + LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy.")); + /// Try again later but without zero-copy + retry_without_zero_copy = true; + } + } + else + { + auto storage_id = data.getStorageID(); + String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / ""; + auto entry = data.getContext()->getReplicatedFetchList().insert( + storage_id.getDatabaseName(), storage_id.getTableName(), + part_info.partition_id, part_name, new_part_path, + replica_path, uri, to_detached, sum_files_size); + + in.setNextCallback(ReplicatedFetchReadCallback(*entry)); + + size_t projections = 0; + if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION) + readBinary(projections, in); + + MergeTreeData::DataPart::Checksums checksums; + return part_type == "InMemory" + ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler) + : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler); } } - auto storage_id = data.getStorageID(); - String new_part_path = part_type == "InMemory" ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / ""; - auto entry = data.getContext()->getReplicatedFetchList().insert( - storage_id.getDatabaseName(), storage_id.getTableName(), - part_info.partition_id, part_name, new_part_path, - replica_path, uri, to_detached, sum_files_size); - - in.setNextCallback(ReplicatedFetchReadCallback(*entry)); - - size_t projections = 0; - if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION) - readBinary(projections, in); - - MergeTreeData::DataPart::Checksums checksums; - return part_type == "InMemory" - ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler) - : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler); + if (retry_without_zero_copy) + return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, + user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk); + else + throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't fetch part and no retry. It is a bug."); } MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(