Merge pull request #51099 from ClickHouse/fix_use_local_vars_after_exception

Fix for part_names_mutex used after destruction
This commit is contained in:
Alexey Milovidov 2023-06-16 23:52:50 +03:00 committed by GitHub
commit 4e45e39e25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1433,6 +1433,21 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPartWithRetries(
UNREACHABLE();
}
/// Wait for all tasks to finish and rethrow the first exception if any.
/// The tasks access local variables of the caller function, so we can't just rethrow the first exception until all other tasks are finished.
void waitForAllToFinishAndRethrowFirstError(std::vector<std::future<void>> & futures)
{
/// First wait for all tasks to finish.
for (auto & future : futures)
future.wait();
/// Now rethrow the first exception if any.
for (auto & future : futures)
future.get();
futures.clear();
}
std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(PartLoadingTreeNodes & parts_to_load)
{
const size_t num_parts = parts_to_load.size();
@ -1463,10 +1478,8 @@ std::vector<MergeTreeData::LoadPartResult> MergeTreeData::loadDataPartsFromDisk(
if (are_parts_to_load_empty)
{
/// Wait for all scheduled tasks.
/// We have to use .get() method to rethrow any exception that could occur.
for (auto & future: parts_futures)
future.get();
parts_futures.clear();
waitForAllToFinishAndRethrowFirstError(parts_futures);
/// At this point it is possible, that some other parts appeared in the queue for processing (parts_to_load),
/// because we added them from inside the pool.
/// So we need to recheck it.
@ -1660,10 +1673,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
/// For iteration to be completed
/// Any exception will be re-thrown.
for (auto & future : disks_futures)
future.get();
disks_futures.clear();
waitForAllToFinishAndRethrowFirstError(disks_futures);
PartLoadingTree::PartLoadingInfos parts_to_load;
for (auto & disk_parts : parts_to_load_by_disk)
@ -1773,10 +1783,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
/// For for iteration to be completed
/// Any exception will be re-thrown.
for (auto & future : wal_disks_futures)
future.get();
wal_disks_futures.clear();
waitForAllToFinishAndRethrowFirstError(wal_disks_futures);
MutableDataPartsVector parts_from_wal;
for (auto & disk_wal_parts : disks_wal_parts)
@ -1891,9 +1898,7 @@ try
{
/// Wait for every scheduled task
/// In case of any exception it will be re-thrown and server will be terminated.
for (auto & future : parts_futures)
future.get();
parts_futures.clear();
waitForAllToFinishAndRethrowFirstError(parts_futures);
LOG_DEBUG(log,
"Stopped loading outdated data parts because task was canceled. "
@ -2443,10 +2448,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
}, Priority{0}));
}
/// Any exception will be re-thrown.
for (auto & future : parts_to_remove_futures)
future.get();
parts_to_remove_futures.clear();
waitForAllToFinishAndRethrowFirstError(parts_to_remove_futures);
return;
}
@ -2602,10 +2604,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
independent_ranges = split_into_independent_ranges(excluded_parts, /* split_times */ 0);
/// Any exception will be re-thrown.
for (auto & future : part_removal_futures)
future.get();
part_removal_futures.clear();
waitForAllToFinishAndRethrowFirstError(part_removal_futures);
for (size_t i = 0; i < independent_ranges.infos.size(); ++i)
{
@ -2614,10 +2613,7 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
schedule_parts_removal(range, std::move(parts_in_range));
}
/// Any exception will be re-thrown.
for (auto & future : part_removal_futures)
future.get();
part_removal_futures.clear();
waitForAllToFinishAndRethrowFirstError(part_removal_futures);
if (parts_to_remove.size() != sum_of_ranges + excluded_parts.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,