Increase retries for test_merge_tree_azure_blob_storage (#54069)

* Increase max_single_download_retries & max_single_read_retries for test_merge_tree_azure_blob_storage

* Updated logs to log exception leading to detaching part

* Updated to catch RequestFailedException azure exception as its the base exception

* Updated isRetryableExcepion to take std::exception_ptr as input

* Fixed style check fails

* Fix build issue & moved NetException & TimeoutException to retryable exceptions

* Fixed style check with catch
This commit is contained in:
SmitaRKulkarni 2023-09-06 19:57:59 +02:00 committed by GitHub
parent 7f756ed613
commit f0146c0ff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 104 additions and 127 deletions

View File

@ -124,11 +124,7 @@ bool ReadBufferFromAzureBlobStorage::nextImpl()
read_settings.remote_throttler->add(bytes_read, ProfileEvents::RemoteReadThrottlerBytes, ProfileEvents::RemoteReadThrottlerSleepMicroseconds);
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Storage::StorageException & e)
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e, i);
}
@ -240,10 +236,6 @@ void ReadBufferFromAzureBlobStorage::initialize()
data_stream = std::move(download_response.Value.BodyStream);
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e,i);

View File

@ -62,10 +62,6 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
func();
break;
}
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e, i);
}
catch (const Azure::Core::RequestFailedException & e)
{
handle_exception(e, i);

View File

@ -35,7 +35,7 @@ TEST(AzureBlobContainerClient, CurlMemoryLeak)
options.Retry.MaxRetries = 0;
auto client = std::make_unique<BlobContainerClient>(BlobContainerClient::CreateFromConnectionString(unavailable_url, container, options));
EXPECT_THROW({ client->ListBlobs(); }, Azure::Core::Http::TransportException);
EXPECT_THROW({ client->ListBlobs(); }, Azure::Core::RequestFailedException);
}
#endif

View File

@ -1244,32 +1244,13 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
.withPartFormatFromDisk()
.build();
}
catch (const Exception & e)
catch (...)
{
/// Don't count the part as broken if there was a retryalbe error
/// during loading, such as "not enough memory" or network error.
if (isRetryableException(e))
if (isRetryableException(std::current_exception()))
throw;
mark_broken();
return res;
}
catch (const Poco::Net::NetException &)
{
throw;
}
catch (const Poco::TimeoutException &)
{
throw;
}
#if USE_AZURE_BLOB_STORAGE
catch (const Azure::Core::Http::TransportException &)
{
throw;
}
#endif
catch (...)
{
LOG_DEBUG(log, "Failed to load data part {}, unknown exception", part_name);
mark_broken();
return res;
}
@ -1294,18 +1275,12 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPart(
{
res.part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (const Exception & e)
catch (...)
{
/// Don't count the part as broken if there was a retryalbe error
/// during loading, such as "not enough memory" or network error.
if (isRetryableException(e))
if (isRetryableException(std::current_exception()))
throw;
mark_broken();
return res;
}
catch (...)
{
mark_broken();
return res;
}
@ -1416,11 +1391,28 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPartWithRetries(
size_t max_backoff_ms,
size_t max_tries)
{
auto handle_exception = [&, this](String exception_message, size_t try_no)
auto handle_exception = [&, this](std::exception_ptr exception_ptr, size_t try_no)
{
if (try_no + 1 == max_tries)
throw;
String exception_message;
try
{
rethrow_exception(exception_ptr);
}
catch (const Exception & e)
{
exception_message = e.message();
}
#if USE_AZURE_BLOB_STORAGE
catch (const Azure::Core::RequestFailedException & e)
{
exception_message = e.Message;
}
#endif
LOG_DEBUG(log, "Failed to load data part {} at try {} with retryable error: {}. Will retry in {} ms",
part_name, try_no, exception_message, initial_backoff_ms);
@ -1434,19 +1426,13 @@ MergeTreeData::LoadPartResult MergeTreeData::loadDataPartWithRetries(
{
return loadDataPart(part_info, part_name, part_disk_ptr, to_state, part_loading_mutex);
}
catch (const Exception & e)
catch (...)
{
if (isRetryableException(e))
handle_exception(e.message(),try_no);
if (isRetryableException(std::current_exception()))
handle_exception(std::current_exception(),try_no);
else
throw;
}
#if USE_AZURE_BLOB_STORAGE
catch (const Azure::Core::Http::TransportException & e)
{
handle_exception(e.Message,try_no);
}
#endif
}
UNREACHABLE();
}
@ -5522,11 +5508,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(cons
{
load_part();
}
catch (const Exception & e)
{
error = std::current_exception();
retryable = isRetryableException(e);
}
catch (const Poco::Net::NetException &)
{
error = std::current_exception();
@ -5540,6 +5521,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartRestoredFromBackup(cons
catch (...)
{
error = std::current_exception();
retryable = isRetryableException(std::current_exception());
}
if (!error)

View File

@ -112,15 +112,10 @@ void MergeTreeReaderCompact::initialize()
compressed_data_buffer = non_cached_buffer.get();
}
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
}
@ -239,18 +234,21 @@ size_t MergeTreeReaderCompact::readRows(
"Cannot read all data in MergeTreeReaderCompact. Rows read: {}. Rows expected: {}.",
read_rows_in_column, rows_to_read);
}
catch (Exception & e)
catch (...)
{
if (!isRetryableException(e))
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
try
{
rethrow_exception(std::current_exception());
}
catch (Exception & e)
{
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
}
throw;
}
}
@ -396,15 +394,10 @@ try
seekToMark(all_mark_ranges.front().begin, 0);
data_buffer->prefetch(priority);
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}

View File

@ -47,15 +47,10 @@ MergeTreeReaderWide::MergeTreeReaderWide(
for (size_t i = 0; i < columns_to_read.size(); ++i)
addStreams(columns_to_read[i], serializations[i], profile_callback_, clock_type_);
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
}
@ -78,15 +73,10 @@ void MergeTreeReaderWide::prefetchBeginOfRange(Priority priority)
/// of range only once so there is no such problem.
/// 4. continue_reading == false, as we haven't read anything yet.
}
catch (const Exception & e)
{
if (!isRetryableException(e))
data_part_info_for_read->reportBroken();
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
throw;
}
}
@ -184,18 +174,21 @@ size_t MergeTreeReaderWide::readRows(
/// In particular, even if for some streams there are no rows to be read,
/// you must ensure that no seeks are skipped and at this point they all point to to_mark.
}
catch (Exception & e)
catch (...)
{
if (!isRetryableException(e))
if (!isRetryableException(std::current_exception()))
data_part_info_for_read->reportBroken();
/// Better diagnostics.
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
throw;
}
catch (...)
{
data_part_info_for_read->reportBroken();
try
{
rethrow_exception(std::current_exception());
}
catch (Exception & e)
{
e.addMessage(getMessageForDiagnosticOfBrokenPart(from_mark, max_rows_to_read));
}
throw;
}

View File

@ -377,13 +377,9 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
result.action = ReplicatedCheckResult::DoNothing;
return result;
}
catch (const Exception & e)
catch (...)
{
/// Don't count the part as broken if we got known retryable exception.
/// In fact, there can be other similar situations because not all
/// of the exceptions are classified as retryable/non-retryable. But it is OK,
/// because there is a safety guard against deleting too many parts.
if (isRetryableException(e))
if (isRetryableException(std::current_exception()))
throw;
tryLogCurrentException(log, __PRETTY_FUNCTION__);
@ -395,6 +391,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
result.status = {part_name, false, message};
result.action = ReplicatedCheckResult::TryFetchMissing;
return result;
}
}
else if (part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER < current_time)

View File

@ -15,6 +15,11 @@
#include <IO/HashingReadBuffer.h>
#include <IO/S3Common.h>
#include <Common/CurrentMetrics.h>
#include <Poco/Net/NetException.h>
#if USE_AZURE_BLOB_STORAGE
#include <azure/core/http/http.hpp>
#endif
namespace CurrentMetrics
{
@ -49,19 +54,41 @@ bool isNotEnoughMemoryErrorCode(int code)
|| code == ErrorCodes::CANNOT_MREMAP;
}
bool isRetryableException(const Exception & e)
bool isRetryableException(const std::exception_ptr exception_ptr)
{
if (isNotEnoughMemoryErrorCode(e.code()))
return true;
if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT)
return true;
try
{
rethrow_exception(exception_ptr);
}
#if USE_AWS_S3
const auto * s3_exception = dynamic_cast<const S3Exception *>(&e);
if (s3_exception && s3_exception->isRetryableError())
return true;
catch (const S3Exception & s3_exception)
{
if (s3_exception.isRetryableError())
return true;
}
#endif
#if USE_AZURE_BLOB_STORAGE
catch (const Azure::Core::RequestFailedException &)
{
return true;
}
#endif
catch (const Exception & e)
{
if (isNotEnoughMemoryErrorCode(e.code()))
return true;
if (e.code() == ErrorCodes::NETWORK_ERROR || e.code() == ErrorCodes::SOCKET_TIMEOUT)
return true;
}
catch (const Poco::Net::NetException &)
{
return true;
}
catch (const Poco::TimeoutException &)
{
return true;
}
/// In fact, there can be other similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
@ -321,15 +348,10 @@ IMergeTreeDataPart::Checksums checkDataPart(
require_checksums,
is_cancelled);
}
catch (const Exception & e)
{
if (isRetryableException(e))
throw;
return drop_cache_and_check();
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
return drop_cache_and_check();
}
}

View File

@ -13,6 +13,6 @@ IMergeTreeDataPart::Checksums checkDataPart(
std::function<bool()> is_cancelled = []{ return false; });
bool isNotEnoughMemoryErrorCode(int code);
bool isRetryableException(const Exception & e);
bool isRetryableException(const std::exception_ptr exception_ptr);
}

View File

@ -10,6 +10,8 @@
<account_name>devstoreaccount1</account_name>
<account_key>Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==</account_key>
<max_single_part_upload_size>100000</max_single_part_upload_size>
<max_single_download_retries>10</max_single_download_retries>
<max_single_read_retries>10</max_single_read_retries>
<!-- NOTE: container_already_exists is omitted to:
a) create it
b) ignore if it already exists, since there are two instances, that conflicts with each other