From 22a92faab649479c69bcc3eab7a7f6bfaef30c45 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 16 Jun 2020 05:14:53 +0300 Subject: [PATCH] Avoid connection to replica when fetches are cancelled --- src/Storages/MergeTree/DataPartsExchange.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index acc3bf38461..6796e630ff2 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -63,8 +63,10 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo static std::atomic_uint total_sends {0}; - if ((data_settings->replicated_max_parallel_sends && total_sends >= data_settings->replicated_max_parallel_sends) - || (data_settings->replicated_max_parallel_sends_for_table && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table)) + if ((data_settings->replicated_max_parallel_sends + && total_sends >= data_settings->replicated_max_parallel_sends) + || (data_settings->replicated_max_parallel_sends_for_table + && data.current_table_sends >= data_settings->replicated_max_parallel_sends_for_table)) { response.setStatus(std::to_string(HTTP_TOO_MANY_REQUESTS)); response.setReason("Too many concurrent fetches, try again later"); @@ -182,6 +184,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( bool to_detached, const String & tmp_prefix_) { + if (blocker.isCancelled()) + throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED); + /// Validation of the input that may come from malicious replica. MergeTreePartInfo::fromPartName(part_name, data.format_version); const auto data_settings = data.getSettings(); @@ -294,7 +299,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPart( if (blocker.isCancelled()) { - /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, performing a poll with a not very large timeout. + /// NOTE The is_cancelled flag also makes sense to check every time you read over the network, + /// performing a poll with a not very large timeout. /// And now we check it only between read chunks (in the `copyData` function). disk->removeRecursive(part_download_path); throw Exception("Fetching of part was cancelled", ErrorCodes::ABORTED);