diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 064447c54ad..0087a8892d4 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -453,29 +453,28 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( creds.setPassword(password); } - PooledReadWriteBufferFromHTTP in{ + std::unique_ptr in = std::make_unique( uri, Poco::Net::HTTPRequest::HTTP_POST, - {}, + nullptr, timeouts, creds, DBMS_DEFAULT_BUFFER_SIZE, 0, /* no redirects */ - data_settings->replicated_max_parallel_fetches_for_host - }; + static_cast(data_settings->replicated_max_parallel_fetches_for_host)); - int server_protocol_version = parse(in.getResponseCookie("server_protocol_version", "0")); + int server_protocol_version = parse(in->getResponseCookie("server_protocol_version", "0")); ReservationPtr reservation; size_t sum_files_size = 0; if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE) { - readBinary(sum_files_size, in); + readBinary(sum_files_size, *in); if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS) { IMergeTreeDataPart::TTLInfos ttl_infos; String ttl_infos_string; - readBinary(ttl_infos_string, in); + readBinary(ttl_infos_string, *in); ReadBufferFromString ttl_infos_buffer(ttl_infos_string); assertString("ttl format version: 1\n", ttl_infos_buffer); ttl_infos.read(ttl_infos_buffer); @@ -508,13 +507,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( String part_type = "Wide"; if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE) - readStringBinary(part_type, in); + readStringBinary(part_type, *in); UUID part_uuid = UUIDHelpers::Nil; if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID) - readUUIDText(part_uuid, in); + readUUIDText(part_uuid, *in); - String remote_fs_metadata = parse(in.getResponseCookie("remote_fs_metadata", "")); + String remote_fs_metadata = parse(in->getResponseCookie("remote_fs_metadata", "")); if (!remote_fs_metadata.empty()) { if (!try_zero_copy) @@ -528,7 +527,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( try { - return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler); + return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, *in, throttler); } catch (const Exception & e) { @@ -536,6 +535,18 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( throw; LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy.")); + + /// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock + /// on http pool. + try + { + in.reset(); + } + catch (...) + { + tryLogCurrentException(log); + } + /// Try again but without zero-copy return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts, user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk); @@ -549,16 +560,16 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( part_info.partition_id, part_name, new_part_path, replica_path, uri, to_detached, sum_files_size); - in.setNextCallback(ReplicatedFetchReadCallback(*entry)); + in->setNextCallback(ReplicatedFetchReadCallback(*entry)); size_t projections = 0; if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION) - readBinary(projections, in); + readBinary(projections, *in); MergeTreeData::DataPart::Checksums checksums; return part_type == "InMemory" - ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler) - : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler); + ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler) + : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, *in, projections, checksums, throttler); } MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(