Slightly better fix

This commit is contained in:
alesapin 2022-05-24 14:46:29 +02:00
parent 3ca7a8831b
commit 9a19309e69

View File

@ -453,29 +453,28 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
creds.setPassword(password);
}
PooledReadWriteBufferFromHTTP in{
std::unique_ptr<PooledReadWriteBufferFromHTTP> in = std::make_unique<PooledReadWriteBufferFromHTTP>(
uri,
Poco::Net::HTTPRequest::HTTP_POST,
{},
nullptr,
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
0, /* no redirects */
data_settings->replicated_max_parallel_fetches_for_host
};
static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
ReservationPtr reservation;
size_t sum_files_size = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
{
readBinary(sum_files_size, in);
readBinary(sum_files_size, *in);
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
{
IMergeTreeDataPart::TTLInfos ttl_infos;
String ttl_infos_string;
readBinary(ttl_infos_string, in);
readBinary(ttl_infos_string, *in);
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
@ -508,13 +507,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
String part_type = "Wide";
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
readStringBinary(part_type, in);
readStringBinary(part_type, *in);
UUID part_uuid = UUIDHelpers::Nil;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
readUUIDText(part_uuid, in);
readUUIDText(part_uuid, *in);
String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
if (!remote_fs_metadata.empty())
{
if (!try_zero_copy)
@ -528,7 +527,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
try
{
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, *in, throttler);
}
catch (const Exception & e)
{
@ -536,6 +535,18 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
throw;
LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
/// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
/// on http pool.
try
{
in.reset();
}
catch (...)
{
tryLogCurrentException(log);
}
/// Try again but without zero-copy
return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk);
@ -549,16 +560,16 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
part_info.partition_id, part_name, new_part_path,
replica_path, uri, to_detached, sum_files_size);
in.setNextCallback(ReplicatedFetchReadCallback(*entry));
in->setNextCallback(ReplicatedFetchReadCallback(*entry));
size_t projections = 0;
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
readBinary(projections, in);
readBinary(projections, *in);
MergeTreeData::DataPart::Checksums checksums;
return part_type == "InMemory"
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, *in, projections, checksums, throttler);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(