Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-22 23:52:03 +00:00
Merge pull request #37598 from ClickHouse/revert-37545-revert-37424-fix_fetching_part_deadlock
Revert "Revert "(only with zero-copy replication, non-production experimental feature not recommended to use) fix possible deadlock during fetching part""
This commit is contained in: commit 9dc81e5cc8
@@ -469,29 +469,28 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
         creds.setPassword(password);
     }
 
-    PooledReadWriteBufferFromHTTP in{
+    std::unique_ptr<PooledReadWriteBufferFromHTTP> in = std::make_unique<PooledReadWriteBufferFromHTTP>(
         uri,
         Poco::Net::HTTPRequest::HTTP_POST,
-        {},
+        nullptr,
         timeouts,
         creds,
         DBMS_DEFAULT_BUFFER_SIZE,
         0, /* no redirects */
-        data_settings->replicated_max_parallel_fetches_for_host
-    };
+        static_cast<uint64_t>(data_settings->replicated_max_parallel_fetches_for_host));
 
-    int server_protocol_version = parse<int>(in.getResponseCookie("server_protocol_version", "0"));
+    int server_protocol_version = parse<int>(in->getResponseCookie("server_protocol_version", "0"));
 
     ReservationPtr reservation;
     size_t sum_files_size = 0;
     if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
     {
-        readBinary(sum_files_size, in);
+        readBinary(sum_files_size, *in);
         if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE_AND_TTL_INFOS)
        {
             IMergeTreeDataPart::TTLInfos ttl_infos;
             String ttl_infos_string;
-            readBinary(ttl_infos_string, in);
+            readBinary(ttl_infos_string, *in);
             ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
             assertString("ttl format version: 1\n", ttl_infos_buffer);
             ttl_infos.read(ttl_infos_buffer);
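A side effect of switching from the braced PooledReadWriteBufferFromHTTP in{...} to std::make_unique in the hunk above is that the constructor arguments are now perfect-forwarded through a function call instead of being converted in place by the braced initializer. That is presumably why the bare {} became nullptr and the setting gained an explicit static_cast<uint64_t>: a braced {} has no type that template argument deduction can work with. A minimal sketch under that assumption; Widget and its constructor are made up for illustration, not ClickHouse code:

#include <cstdint>
#include <functional>
#include <memory>

/// Stand-in for a constructor taking a callback and a numeric limit.
struct Widget
{
    Widget(std::function<void()> /*callback*/, uint64_t /*limit*/) {}
};

int main()
{
    /// Braced initialization converts both arguments in place:
    Widget direct{{}, 42};

    /// auto bad = std::make_unique<Widget>({}, 42);   // does not compile: no type can be deduced for `{}`

    /// Forwarding needs arguments with explicit types:
    auto p = std::make_unique<Widget>(nullptr, static_cast<uint64_t>(42));
}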
@@ -519,7 +518,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     if (!disk)
         disk = reservation->getDisk();
 
-    UInt64 revision = parse<UInt64>(in.getResponseCookie("disk_revision", "0"));
+    UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
     if (revision)
         disk->syncRevision(revision);
 
@@ -528,13 +527,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
 
     String part_type = "Wide";
     if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_TYPE)
-        readStringBinary(part_type, in);
+        readStringBinary(part_type, *in);
 
     UUID part_uuid = UUIDHelpers::Nil;
     if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_UUID)
-        readUUIDText(part_uuid, in);
+        readUUIDText(part_uuid, *in);
 
-    String remote_fs_metadata = parse<String>(in.getResponseCookie("remote_fs_metadata", ""));
+    String remote_fs_metadata = parse<String>(in->getResponseCookie("remote_fs_metadata", ""));
     if (!remote_fs_metadata.empty())
     {
         if (!try_zero_copy)
@@ -548,7 +547,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
 
         try
         {
-            return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, in, throttler);
+            return downloadPartToDiskRemoteMeta(part_name, replica_path, to_detached, tmp_prefix_, disk, *in, throttler);
         }
         catch (const Exception & e)
         {
@@ -556,6 +555,18 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
                 throw;
 
             LOG_WARNING(log, fmt::runtime(e.message() + " Will retry fetching part without zero-copy."));
+
+            /// It's important to release session from HTTP pool. Otherwise it's possible to get deadlock
+            /// on http pool.
+            try
+            {
+                in.reset();
+            }
+            catch (...)
+            {
+                tryLogCurrentException(log);
+            }
+
             /// Try again but without zero-copy
             return fetchPart(metadata_snapshot, context, part_name, replica_path, host, port, timeouts,
                 user, password, interserver_scheme, throttler, to_detached, tmp_prefix_, nullptr, false, disk);
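The block added above is the core of the fix: before fetchPart calls itself to retry without zero-copy, it must hand its HTTP session back to the pool, which the unique_ptr makes possible via in.reset(). A minimal sketch of the deadlock it prevents, with the per-host connection pool modeled as a counting semaphore; the names fetch and PooledConnection are illustrative, not ClickHouse's API:

#include <iostream>
#include <memory>
#include <semaphore>
#include <stdexcept>

/// A per-host "pool" with a single slot, playing the role of the bounded HTTP session pool.
std::counting_semaphore<1> pool{1};

struct PooledConnection
{
    PooledConnection() { pool.acquire(); }   /// blocks until a slot is free
    ~PooledConnection() { pool.release(); }  /// returns the slot to the pool
};

void fetch(bool try_zero_copy)
{
    auto conn = std::make_unique<PooledConnection>();

    if (try_zero_copy)
    {
        try
        {
            throw std::runtime_error("zero-copy fetch failed");
        }
        catch (const std::exception & e)
        {
            std::cout << e.what() << ", retrying without zero-copy\n";
            /// Without this reset, the recursive call below would wait for a pool
            /// slot that this frame still holds: the deadlock the patch fixes.
            conn.reset();
            fetch(/* try_zero_copy = */ false);
            return;
        }
    }

    std::cout << "fetched without zero-copy\n";
}

int main()
{
    fetch(/* try_zero_copy = */ true);
}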
@@ -569,16 +580,16 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
         part_info.partition_id, part_name, new_part_path,
         replica_path, uri, to_detached, sum_files_size);
 
-    in.setNextCallback(ReplicatedFetchReadCallback(*entry));
+    in->setNextCallback(ReplicatedFetchReadCallback(*entry));
 
     size_t projections = 0;
     if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_PROJECTION)
-        readBinary(projections, in);
+        readBinary(projections, *in);
 
     MergeTreeData::DataPart::Checksums checksums;
     return part_type == "InMemory"
-        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, in, projections, throttler)
-        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, in, projections, checksums, throttler);
+        ? downloadPartToMemory(part_name, part_uuid, metadata_snapshot, context, disk, *in, projections, throttler)
+        : downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, sync, disk, *in, projections, checksums, throttler);
 }
 
 MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(