diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 9bffa9d1322..2470b45bed7 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -157,6 +158,8 @@ namespace ErrorCodes extern const int ZERO_COPY_REPLICATION_ERROR; extern const int NOT_INITIALIZED; extern const int SERIALIZATION_ERROR; + extern const int NETWORK_ERROR; + extern const int SOCKET_TIMEOUT; } @@ -1066,6 +1069,25 @@ static void preparePartForRemoval(const MergeTreeMutableDataPartPtr & part) } } +static bool isRetryableException(const Exception & e) +{ + if (isNotEnoughMemoryErrorCode(e.code())) + return true; + + if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT) + return true; + +#if USE_AWS_S3 + const auto * s3_exception = dynamic_cast(&e); + if (s3_exception && s3_exception->isRetryableError()) + return true; +#endif + + /// In fact, there can be other similar situations. + /// But it is OK, because there is a safety guard against deleting too many parts. + return false; +} + MergeTreeData::LoadPartResult MergeTreeData::loadDataPart( const MergeTreePartInfo & part_info, const String & part_name, @@ -1106,10 +1128,9 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart( } catch (const Exception & e) { - /// Don't count the part as broken if there is not enough memory to load it. - /// In fact, there can be many similar situations. - /// But it is OK, because there is a safety guard against deleting too many parts. - if (isNotEnoughMemoryErrorCode(e.code())) + /// Don't count the part as broken if there was a retryalbe error + /// during loading, such as "not enough memory" or network error. + if (isRetryableException(e)) throw; res.is_broken = true; @@ -1753,7 +1774,8 @@ catch (...) std::terminate(); } -void MergeTreeData::waitForOutdatedPartsToBeLoaded() const +/// No TSA because of std::unique_lock and std::condition_variable. +void MergeTreeData::waitForOutdatedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_ANALYSIS { /// Background tasks are not run if storage is static. if (isStaticStorage()) @@ -1765,7 +1787,11 @@ void MergeTreeData::waitForOutdatedPartsToBeLoaded() const LOG_TRACE(log, "Will wait for outdated data parts to be loaded"); - outdated_data_parts_cv.wait(lock, [this] { return outdated_unloaded_data_parts.empty() || outdated_data_parts_loading_canceled; }); + outdated_data_parts_cv.wait(lock, [this]() TSA_NO_THREAD_SAFETY_ANALYSIS + { + return outdated_unloaded_data_parts.empty() || outdated_data_parts_loading_canceled; + }); + if (outdated_data_parts_loading_canceled) throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of outdated data parts was canceled"); diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index ddf1e523eca..09a757c4419 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1404,8 +1404,8 @@ protected: mutable std::condition_variable outdated_data_parts_cv; BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task; - PartLoadingTreeNodes outdated_unloaded_data_parts; - bool outdated_data_parts_loading_canceled = false; + PartLoadingTreeNodes outdated_unloaded_data_parts TSA_GUARDED_BY(outdated_data_parts_mutex); + bool outdated_data_parts_loading_canceled TSA_GUARDED_BY(outdated_data_parts_mutex) = false; void loadOutdatedDataParts(bool is_async); void startOutdatedDataPartsLoadingTask();