check for network errors while loading parts

This commit is contained in:
Anton Popov 2023-01-10 11:55:08 +00:00
parent 1e5d6e44a5
commit b9bf92fa5d
2 changed files with 34 additions and 8 deletions

View File

@ -18,6 +18,7 @@
#include <Functions/IFunction.h> #include <Functions/IFunction.h>
#include <IO/Operators.h> #include <IO/Operators.h>
#include <IO/WriteBufferFromString.h> #include <IO/WriteBufferFromString.h>
#include <IO/S3Common.h>
#include <Interpreters/Aggregator.h> #include <Interpreters/Aggregator.h>
#include <Interpreters/ExpressionAnalyzer.h> #include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/PartLog.h> #include <Interpreters/PartLog.h>
@ -157,6 +158,8 @@ namespace ErrorCodes
extern const int ZERO_COPY_REPLICATION_ERROR; extern const int ZERO_COPY_REPLICATION_ERROR;
extern const int NOT_INITIALIZED; extern const int NOT_INITIALIZED;
extern const int SERIALIZATION_ERROR; extern const int SERIALIZATION_ERROR;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
} }
@ -1066,6 +1069,25 @@ static void preparePartForRemoval(const MergeTreeMutableDataPartPtr & part)
} }
} }
static bool isRetryableException(const Exception & e)
{
if (isNotEnoughMemoryErrorCode(e.code()))
return true;
if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT)
return true;
#if USE_AWS_S3
const auto * s3_exception = dynamic_cast<const S3Exception *>(&e);
if (s3_exception && s3_exception->isRetryableError())
return true;
#endif
/// In fact, there can be other similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
return false;
}
MergeTreeData::LoadPartResult MergeTreeData::loadDataPart( MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
const MergeTreePartInfo & part_info, const MergeTreePartInfo & part_info,
const String & part_name, const String & part_name,
@ -1106,10 +1128,9 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
} }
catch (const Exception & e) catch (const Exception & e)
{ {
/// Don't count the part as broken if there is not enough memory to load it. /// Don't count the part as broken if there was a retryalbe error
/// In fact, there can be many similar situations. /// during loading, such as "not enough memory" or network error.
/// But it is OK, because there is a safety guard against deleting too many parts. if (isRetryableException(e))
if (isNotEnoughMemoryErrorCode(e.code()))
throw; throw;
res.is_broken = true; res.is_broken = true;
@ -1753,7 +1774,8 @@ catch (...)
std::terminate(); std::terminate();
} }
void MergeTreeData::waitForOutdatedPartsToBeLoaded() const /// No TSA because of std::unique_lock and std::condition_variable.
void MergeTreeData::waitForOutdatedPartsToBeLoaded() const TSA_NO_THREAD_SAFETY_ANALYSIS
{ {
/// Background tasks are not run if storage is static. /// Background tasks are not run if storage is static.
if (isStaticStorage()) if (isStaticStorage())
@ -1765,7 +1787,11 @@ void MergeTreeData::waitForOutdatedPartsToBeLoaded() const
LOG_TRACE(log, "Will wait for outdated data parts to be loaded"); LOG_TRACE(log, "Will wait for outdated data parts to be loaded");
outdated_data_parts_cv.wait(lock, [this] { return outdated_unloaded_data_parts.empty() || outdated_data_parts_loading_canceled; }); outdated_data_parts_cv.wait(lock, [this]() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return outdated_unloaded_data_parts.empty() || outdated_data_parts_loading_canceled;
});
if (outdated_data_parts_loading_canceled) if (outdated_data_parts_loading_canceled)
throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of outdated data parts was canceled"); throw Exception(ErrorCodes::NOT_INITIALIZED, "Loading of outdated data parts was canceled");

View File

@ -1404,8 +1404,8 @@ protected:
mutable std::condition_variable outdated_data_parts_cv; mutable std::condition_variable outdated_data_parts_cv;
BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task; BackgroundSchedulePool::TaskHolder outdated_data_parts_loading_task;
PartLoadingTreeNodes outdated_unloaded_data_parts; PartLoadingTreeNodes outdated_unloaded_data_parts TSA_GUARDED_BY(outdated_data_parts_mutex);
bool outdated_data_parts_loading_canceled = false; bool outdated_data_parts_loading_canceled TSA_GUARDED_BY(outdated_data_parts_mutex) = false;
void loadOutdatedDataParts(bool is_async); void loadOutdatedDataParts(bool is_async);
void startOutdatedDataPartsLoadingTask(); void startOutdatedDataPartsLoadingTask();