diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 20de0694a6a..365dc1a502b 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1433,6 +1433,21 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPartWithRetries( UNREACHABLE(); } +/// Wait for all tasks to finish and rethrow the first exception if any. +/// The tasks access local variables of the caller function, so we can't just rethrow the first exception until all other tasks are finished. +void waitForAllToFinishAndRethrowFirstError(std::vector> & futures) +{ + /// First wait for all tasks to finish. + for (auto & future : futures) + future.wait(); + + /// Now rethrow the first exception if any. + for (auto & future : futures) + future.get(); + + futures.clear(); +} + std::vector MergeTreeData::loadDataPartsFromDisk(PartLoadingTreeNodes & parts_to_load) { const size_t num_parts = parts_to_load.size(); @@ -1463,10 +1478,8 @@ std::vector MergeTreeData::loadDataPartsFromDisk( if (are_parts_to_load_empty) { /// Wait for all scheduled tasks. - /// We have to use .get() method to rethrow any exception that could occur. - for (auto & future: parts_futures) - future.get(); - parts_futures.clear(); + waitForAllToFinishAndRethrowFirstError(parts_futures); + /// At this point it is possible, that some other parts appeared in the queue for processing (parts_to_load), /// because we added them from inside the pool. /// So we need to recheck it. @@ -1660,10 +1673,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } /// For iteration to be completed - /// Any exception will be re-thrown. - for (auto & future : disks_futures) - future.get(); - disks_futures.clear(); + waitForAllToFinishAndRethrowFirstError(disks_futures); PartLoadingTree::PartLoadingInfos parts_to_load; for (auto & disk_parts : parts_to_load_by_disk) @@ -1773,10 +1783,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } /// For for iteration to be completed - /// Any exception will be re-thrown. - for (auto & future : wal_disks_futures) - future.get(); - wal_disks_futures.clear(); + waitForAllToFinishAndRethrowFirstError(wal_disks_futures); MutableDataPartsVector parts_from_wal; for (auto & disk_wal_parts : disks_wal_parts) @@ -1891,9 +1898,7 @@ try { /// Wait for every scheduled task /// In case of any exception it will be re-thrown and server will be terminated. - for (auto & future : parts_futures) - future.get(); - parts_futures.clear(); + waitForAllToFinishAndRethrowFirstError(parts_futures); LOG_DEBUG(log, "Stopped loading outdated data parts because task was canceled. " @@ -2443,10 +2448,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t }, Priority{0})); } - /// Any exception will be re-thrown. - for (auto & future : parts_to_remove_futures) - future.get(); - parts_to_remove_futures.clear(); + waitForAllToFinishAndRethrowFirstError(parts_to_remove_futures); return; } @@ -2602,10 +2604,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t independent_ranges = split_into_independent_ranges(excluded_parts, /* split_times */ 0); - /// Any exception will be re-thrown. - for (auto & future : part_removal_futures) - future.get(); - part_removal_futures.clear(); + waitForAllToFinishAndRethrowFirstError(part_removal_futures); for (size_t i = 0; i < independent_ranges.infos.size(); ++i) { @@ -2614,10 +2613,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t schedule_parts_removal(range, std::move(parts_in_range)); } - /// Any exception will be re-thrown. - for (auto & future : part_removal_futures) - future.get(); - part_removal_futures.clear(); + waitForAllToFinishAndRethrowFirstError(part_removal_futures); if (parts_to_remove.size() != sum_of_ranges + excluded_parts.size()) throw Exception(ErrorCodes::LOGICAL_ERROR,